refactor: Consolidate end to end normalization code (#6848)

Andrew Lamb 2023-02-06 20:47:14 +01:00 committed by GitHub
parent f1f475d552
commit ec42aa0ba9
3 changed files with 243 additions and 212 deletions

File: snapshot_comparison.rs
@@ -3,7 +3,6 @@ mod queries;
 use crate::snapshot_comparison::queries::TestQueries;
 use crate::{run_influxql, run_sql, MiniCluster};
-use arrow_util::{display::pretty_format_batches, test_util::sort_record_batch};
 use snafu::{OptionExt, ResultExt, Snafu};
 use std::fmt::{Display, Formatter};
 use std::{
@@ -11,7 +10,6 @@ use std::{
     path::{Path, PathBuf},
 };

-use self::normalization::normalize_results;
 use self::queries::Query;

 #[derive(Debug, Snafu)]
@@ -98,19 +96,7 @@ pub async fn run(
     for q in queries.iter() {
         output.push(format!("-- {}: {}", language, q.text()));
-        if q.sorted_compare() {
-            output.push("-- Results After Sorting".into())
-        }
-        if q.normalized_uuids() {
-            output.push("-- Results After Normalizing UUIDs".into())
-        }
-        if q.normalized_metrics() {
-            output.push("-- Results After Normalizing Metrics".into())
-        }
-        if q.normalized_filters() {
-            output.push("-- Results After Normalizing Filters".into())
-        }
-
+        q.add_description(&mut output);
         let results = run_query(cluster, q, language).await?;
         output.extend(results);
     }

@@ -233,7 +219,7 @@ async fn run_query(
 ) -> Result<Vec<String>> {
     let query_text = query.text();

-    let mut results = match language {
+    let results = match language {
         Language::Sql => {
             run_sql(
                 query_text,
@@ -252,22 +238,5 @@
         }
     };

-    // compare against sorted results, if requested
-    if query.sorted_compare() && !results.is_empty() {
-        let schema = results[0].schema();
-        let batch =
-            arrow::compute::concat_batches(&schema, &results).expect("concatenating batches");
-        results = vec![sort_record_batch(batch)];
-    }
-
-    let current_results = pretty_format_batches(&results)
-        .unwrap()
-        .trim()
-        .lines()
-        .map(|s| s.to_string())
-        .collect::<Vec<_>>();
-
-    let current_results = normalize_results(query, current_results);
-
-    Ok(current_results)
+    Ok(query.normalize_results(results))
 }
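
After this change, `run_query` no longer concatenates, sorts, or pretty-prints record batches itself; that whole pipeline moves behind `Query::normalize_results`. Below is a minimal, self-contained sketch of the concat-then-format step that moved, using the upstream arrow crate's pretty printer rather than IOx's arrow_util wrappers; the schema, data, and `main` scaffolding are illustrative only:

use std::sync::Arc;

use arrow::array::Int64Array;
use arrow::compute::concat_batches;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use arrow::util::pretty::pretty_format_batches;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![Field::new("time", DataType::Int64, false)]));
    let results = vec![
        RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(Int64Array::from(vec![3, 1]))])?,
        RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(Int64Array::from(vec![2]))])?,
    ];

    // Combine all batches into one so row order is well defined before
    // snapshotting, then render to the line-oriented text that snapshot
    // comparisons operate on.
    let combined = concat_batches(&schema, &results)?;
    let lines: Vec<String> = pretty_format_batches(&[combined])?
        .to_string()
        .trim()
        .lines()
        .map(|s| s.to_string())
        .collect();
    println!("{}", lines.join("\n"));
    Ok(())
}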

File: snapshot_comparison/normalization.rs
@@ -1,9 +1,28 @@
-use crate::snapshot_comparison::queries::Query;
+use arrow::record_batch::RecordBatch;
+use arrow_util::{display::pretty_format_batches, test_util::sort_record_batch};
 use once_cell::sync::Lazy;
 use regex::{Captures, Regex};
 use std::{borrow::Cow, collections::HashMap};
 use uuid::Uuid;

+/// Match the parquet UUID
+///
+/// For example, given
+/// `32/51/216/13452/1d325760-2b20-48de-ab48-2267b034133d.parquet`
+///
+/// matches `1d325760-2b20-48de-ab48-2267b034133d`
+static REGEX_UUID: Lazy<Regex> = Lazy::new(|| {
+    Regex::new("[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}").expect("UUID regex")
+});
+
+/// Match the parquet directory names
+/// For example, given
+/// `32/51/216/13452/1d325760-2b20-48de-ab48-2267b034133d.parquet`
+///
+/// matches `32/51/216/13452`
+static REGEX_DIRS: Lazy<Regex> =
+    Lazy::new(|| Regex::new(r#"[0-9]+/[0-9]+/[0-9]+/[0-9]+"#).expect("directory regex"));
+
 /// Replace table row separators of flexible width with fixed width. This is required
 /// because the original timing values may differ in "printed width", so the table
 /// cells have different widths and hence the separators / borders. E.g.:
@@ -22,23 +41,79 @@ static REGEX_LINESEP: Lazy<Regex> = Lazy::new(|| Regex::new(r#"[+-]{6,}"#).expec
 /// ` |` -> ` |`
 static REGEX_COL: Lazy<Regex> = Lazy::new(|| Regex::new(r#"\s+\|"#).expect("col regex"));

+/// Matches lines like `metrics=[foo=1, bar=2]`
+static REGEX_METRICS: Lazy<Regex> =
+    Lazy::new(|| Regex::new(r#"metrics=\[([^\]]*)\]"#).expect("metrics regex"));
+
+/// Matches things like `1s`, `1.2ms` and `10.2μs`
+static REGEX_TIMING: Lazy<Regex> =
+    Lazy::new(|| Regex::new(r#"[0-9]+(\.[0-9]+)?.s"#).expect("timing regex"));
+
+/// Matches things like `FilterExec: time@2 < -9223372036854775808 OR time@2 > 1640995204240217000`
+static REGEX_FILTER: Lazy<Regex> =
+    Lazy::new(|| Regex::new("FilterExec: .*").expect("filter regex"));
+
 fn normalize_for_variable_width(s: Cow<str>) -> String {
     let s = REGEX_LINESEP.replace_all(&s, "----------");
     REGEX_COL.replace_all(&s, " |").to_string()
 }

-pub(crate) fn normalize_results(query: &Query, mut current_results: Vec<String>) -> Vec<String> {
-    // normalize UUIDs, if requested
-    if query.normalized_uuids() {
-        let regex_uuid = Regex::new("[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}")
-            .expect("UUID regex");
-        let regex_dirs = Regex::new(r#"[0-9]+/[0-9]+/[0-9]+/[0-9]+"#).expect("directory regex");
-
+/// Describes how to normalize the results of a query run
+#[derive(Debug, PartialEq, Eq, Default)]
+pub struct Normalizer {
+    /// If true, results are sorted first
+    pub sorted_compare: bool,
+
+    /// If true, replace UUIDs with static placeholders.
+    pub normalized_uuids: bool,
+
+    /// If true, normalize timings in queries by replacing them with
+    /// static placeholders, for example:
+    ///
+    /// `1s` -> `1.234ms`
+    pub normalized_metrics: bool,
+
+    /// If true, normalize filter predicates for explain plans
+    /// `FilterExec: <REDACTED>`
+    pub normalized_filters: bool,
+}
+
+impl Normalizer {
+    #[cfg(test)]
+    pub fn new() -> Self {
+        Default::default()
+    }
+
+    /// Take the output of running the query and apply the specified normalizations to it
+    pub fn normalize_results(&self, mut results: Vec<RecordBatch>) -> Vec<String> {
+        // compare against sorted results, if requested
+        if self.sorted_compare && !results.is_empty() {
+            let schema = results[0].schema();
+            let batch =
+                arrow::compute::concat_batches(&schema, &results).expect("concatenating batches");
+            results = vec![sort_record_batch(batch)];
+        }
+
+        let mut current_results = pretty_format_batches(&results)
+            .unwrap()
+            .trim()
+            .lines()
+            .map(|s| s.to_string())
+            .collect::<Vec<_>>();
+
+        // normalize UUIDs, if requested
+        if self.normalized_uuids {
             let mut seen: HashMap<String, u128> = HashMap::new();
             current_results = current_results
                 .into_iter()
                 .map(|s| {
-                let s = regex_uuid.replace_all(&s, |s: &Captures| {
+                    // Rewrite parquet directory names like
+                    // `51/216/13452/1d325760-2b20-48de-ab48-2267b034133d.parquet`
+                    //
+                    // to:
+                    // `1/1/1/1/00000000-0000-0000-0000-000000000000.parquet`
+                    let s = REGEX_UUID.replace_all(&s, |s: &Captures| {
                         let next = seen.len() as u128;
                         Uuid::from_u128(
                             *seen
@@ -49,19 +124,13 @@ pub(crate) fn normalize_results(query: &Query, mut current_results: Vec<String>)
                     });
                     let s = normalize_for_variable_width(s);
-                regex_dirs.replace_all(&s, "1/1/1/1").to_string()
+                    REGEX_DIRS.replace_all(&s, "1/1/1/1").to_string()
                 })
                 .collect();
         }

         // normalize metrics, if requested
-    if query.normalized_metrics() {
-        // Parse regex once and apply to all rows. See description around the `replace...` calls on
-        // why/how the regexes are used.
-        let regex_metrics = Regex::new(r#"metrics=\[([^\]]*)\]"#).expect("metrics regex");
-        let regex_timing = Regex::new(r#"[0-9]+(\.[0-9]+)?.s"#).expect("timing regex");
-
+        if self.normalized_metrics {
             current_results = current_results
                 .into_iter()
                 .map(|s| {
@@ -70,7 +139,7 @@ pub(crate) fn normalize_results(query: &Query, mut current_results: Vec<String>)
                     // `1s` -> `1.234ms`
                     // `1.2ms` -> `1.234ms`
                     // `10.2μs` -> `1.234ms`
-                let s = regex_timing.replace_all(&s, "1.234ms");
+                    let s = REGEX_TIMING.replace_all(&s, "1.234ms");

                     let s = normalize_for_variable_width(s);
@@ -80,7 +149,7 @@ pub(crate) fn normalize_results(query: &Query, mut current_results: Vec<String>)
                     // `metrics=[]` => `metrics=[]`
                     // `metrics=[foo=1, bar=2]` => `metrics=[bar=2, foo=1]`
                     // `metrics=[foo=2, bar=1]` => `metrics=[bar=1, foo=2]`
-                regex_metrics
+                    REGEX_METRICS
                         .replace_all(&s, |c: &Captures| {
                             let mut metrics: Vec<_> = c[1].split(", ").collect();
                             metrics.sort();
@@ -98,12 +167,11 @@ pub(crate) fn normalize_results(query: &Query, mut current_results: Vec<String>)
         //
         // to
         // FilterExec: <REDACTED>
-    if query.normalized_filters() {
-        let filter_regex = Regex::new("FilterExec: .*").expect("filter regex");
+        if self.normalized_filters {
             current_results = current_results
                 .into_iter()
                 .map(|s| {
-                filter_regex
+                    REGEX_FILTER
                         .replace_all(&s, |_: &Captures| "FilterExec: <REDACTED>")
                         .to_string()
                 })
@@ -112,3 +180,20 @@ pub(crate) fn normalize_results(query: &Query, mut current_results: Vec<String>)

         current_results
     }
+
+    /// Adds information on what normalizations were applied to the input
+    pub fn add_description(&self, output: &mut Vec<String>) {
+        if self.sorted_compare {
+            output.push("-- Results After Sorting".into())
+        }
+        if self.normalized_uuids {
+            output.push("-- Results After Normalizing UUIDs".into())
+        }
+        if self.normalized_metrics {
+            output.push("-- Results After Normalizing Metrics".into())
+        }
+        if self.normalized_filters {
+            output.push("-- Results After Normalizing Filters".into())
+        }
+    }
+}
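
The UUID replacement consolidated above is deterministic: each distinct UUID maps to the next counter value, so the first UUID seen always becomes 00000000-0000-0000-0000-000000000000 and later repeats of it normalize identically. A standalone sketch of the same counter-plus-regex technique using the regex and uuid crates (the input line is invented):

use std::collections::HashMap;

use regex::{Captures, Regex};
use uuid::Uuid;

fn main() {
    let re = Regex::new("[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}").unwrap();
    let mut seen: HashMap<String, u128> = HashMap::new();

    let line = "51/216/13452/1d325760-2b20-48de-ab48-2267b034133d.parquet and \
                52/217/13453/1d325760-2b20-48de-ab48-2267b034133d.parquet";

    // Each distinct UUID gets the next counter value; a UUID seen before
    // reuses its assigned value, keeping the output deterministic.
    let normalized = re.replace_all(line, |c: &Captures| {
        let next = seen.len() as u128;
        Uuid::from_u128(*seen.entry(c[0].to_string()).or_insert(next)).to_string()
    });

    // Both occurrences become 00000000-0000-0000-0000-000000000000
    println!("{normalized}");
}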

File: snapshot_comparison/queries.rs
@@ -1,22 +1,12 @@
+use arrow::record_batch::RecordBatch;
+
+use super::normalization::Normalizer;
+
 /// A query to run with optional annotations
 #[derive(Debug, PartialEq, Eq, Default)]
 pub struct Query {
-    /// If true, results are sorted first prior to comparison, meaning that differences in the
-    /// output order compared with expected order do not cause a diff
-    sorted_compare: bool,
-
-    /// If true, replace UUIDs with static placeholders.
-    normalized_uuids: bool,
-
-    /// If true, normalize timings in queries by replacing them with
-    /// static placeholders, for example:
-    ///
-    /// `1s` -> `1.234ms`
-    normalized_metrics: bool,
-
-    /// if true, normalize filter predicates for explain plans
-    /// `FilterExec: <REDACTED>`
-    normalized_filters: bool,
+    /// Describes how the query results should be normalized
+    normalizer: Normalizer,

     /// The query string
     text: String,
@@ -27,49 +17,49 @@ impl Query {
     fn new(text: impl Into<String>) -> Self {
         let text = text.into();
         Self {
-            sorted_compare: false,
-            normalized_uuids: false,
-            normalized_metrics: false,
-            normalized_filters: false,
+            normalizer: Normalizer::new(),
             text,
         }
     }

-    #[cfg(test)]
-    fn with_sorted_compare(mut self) -> Self {
-        self.sorted_compare = true;
+    pub fn text(&self) -> &str {
+        &self.text
+    }
+
+    pub fn with_sorted_compare(mut self) -> Self {
+        self.normalizer.sorted_compare = true;
         self
     }

-    /// Get a reference to the query text.
-    pub fn text(&self) -> &str {
-        self.text.as_ref()
+    pub fn with_normalized_uuids(mut self) -> Self {
+        self.normalizer.normalized_uuids = true;
+        self
     }

-    /// Get the query's sorted compare.
-    pub fn sorted_compare(&self) -> bool {
-        self.sorted_compare
+    pub fn with_normalize_metrics(mut self) -> Self {
+        self.normalizer.normalized_metrics = true;
+        self
     }

-    /// Get queries normalized UUID
-    pub fn normalized_uuids(&self) -> bool {
-        self.normalized_uuids
+    pub fn with_normalize_filters(mut self) -> Self {
+        self.normalizer.normalized_filters = true;
+        self
     }

-    /// Use normalized timing values
-    pub fn normalized_metrics(&self) -> bool {
-        self.normalized_metrics
+    /// Take the output of running the query and apply the specified normalizations to it
+    pub fn normalize_results(&self, results: Vec<RecordBatch>) -> Vec<String> {
+        self.normalizer.normalize_results(results)
     }

-    /// Use normalized filter plans
-    pub fn normalized_filters(&self) -> bool {
-        self.normalized_filters
+    /// Adds information on what normalizations were applied to the input
+    pub fn add_description(&self, output: &mut Vec<String>) {
+        self.normalizer.add_description(output)
     }
 }

 #[derive(Debug, Default)]
 struct QueryBuilder {
-    query: Query,
+    pub query: Query,
 }

 impl QueryBuilder {
@@ -85,22 +75,6 @@ impl QueryBuilder {
         self.query.text.push(c)
     }

-    fn sorted_compare(&mut self) {
-        self.query.sorted_compare = true;
-    }
-
-    fn normalized_uuids(&mut self) {
-        self.query.normalized_uuids = true;
-    }
-
-    fn normalize_metrics(&mut self) {
-        self.query.normalized_metrics = true;
-    }
-
-    fn normalize_filters(&mut self) {
-        self.query.normalized_filters = true;
-    }
-
     fn is_empty(&self) -> bool {
         self.query.text.is_empty()
     }
@@ -125,9 +99,10 @@ impl TestQueries {
         S: AsRef<str>,
     {
         let mut queries = vec![];
-        let mut builder = QueryBuilder::new();
-
-        lines.into_iter().for_each(|line| {
+
+        let mut builder = lines
+            .into_iter()
+            .fold(QueryBuilder::new(), |mut builder, line| {
                 let line = line.as_ref().trim();
                 const COMPARE_STR: &str = "-- IOX_COMPARE: ";
                 if line.starts_with(COMPARE_STR) {
@@ -136,16 +111,16 @@ impl TestQueries {
                     let option = option.trim();
                     match option {
                         "sorted" => {
-                            builder.sorted_compare();
+                            builder.query = builder.query.with_sorted_compare();
                         }
                         "uuid" => {
-                            builder.normalized_uuids();
+                            builder.query = builder.query.with_normalized_uuids();
                         }
                         "metrics" => {
-                            builder.normalize_metrics();
+                            builder.query = builder.query.with_normalize_metrics();
                         }
                         "filters" => {
-                            builder.normalize_filters();
+                            builder.query = builder.query.with_normalize_filters();
                         }
                         _ => {}
                     }
@@ -153,10 +128,10 @@ impl TestQueries {
                 }

                 if line.starts_with("--") {
-                    return;
+                    return builder;
                 }
                 if line.is_empty() {
-                    return;
+                    return builder;
                 }

                 // replace newlines
@@ -171,8 +146,10 @@ impl TestQueries {
                         queries.push(q);
                     }
                 }
+
+                builder
             });
         // get last one, if any
         if let Some(q) = builder.build_and_reset() {
            queries.push(q);
        }
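
The parser rewrite above swaps a `for_each` over the input lines for a `fold`, which is why the early exits now read `return builder;`: the closure owns the accumulator and must hand it back on every path. A standalone sketch of the same pattern with simplified stand-in types (the `Builder` struct here is hypothetical, not the crate's `QueryBuilder`):

#[derive(Debug, Default)]
struct Builder {
    sorted: bool,
    text: String,
}

fn main() {
    let lines = ["-- IOX_COMPARE: sorted", "SELECT * FROM cpu;"];

    let builder = lines.iter().fold(Builder::default(), |mut builder, line| {
        let line = line.trim();
        if let Some(option) = line.strip_prefix("-- IOX_COMPARE: ") {
            if option.trim() == "sorted" {
                builder.sorted = true;
            }
            // fall through: annotation lines also match the comment check below
        }
        if line.starts_with("--") || line.is_empty() {
            return builder; // comments and blanks contribute no query text
        }
        builder.text.push_str(line);
        builder
    });

    assert!(builder.sorted);
    assert_eq!(builder.text, "SELECT * FROM cpu;");
}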