Merge branch 'main' into dom/lifecycle-ids

pull/24376/head
Dom 2022-09-30 14:31:07 +01:00 committed by GitHub
commit 4c3697c5c7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
42 changed files with 2987 additions and 1395 deletions

14
Cargo.lock generated
View File

@ -2020,6 +2020,7 @@ dependencies = [
name = "influxdb_influxql_parser"
version = "0.1.0"
dependencies = [
"assert_matches",
"nom",
"test_helpers",
"workspace-hack",
@ -2272,6 +2273,7 @@ dependencies = [
name = "iox_data_generator"
version = "0.1.0"
dependencies = [
"bytes",
"chrono",
"chrono-english",
"clap",
@ -2281,8 +2283,12 @@ dependencies = [
"humantime",
"influxdb2_client",
"itertools",
"mutable_batch",
"mutable_batch_lp",
"parquet_file",
"rand",
"regex",
"schema",
"serde",
"serde_json",
"snafu",
@ -2656,9 +2662,9 @@ dependencies = [
[[package]]
name = "libc"
version = "0.2.133"
version = "0.2.134"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c0f80d65747a3e43d1596c7c5492d95d5edddaabd45a7fcdb02b95f644164966"
checksum = "329c933548736bc49fd575ee68c89e8be4d260064184389a5b77517cddd99ffb"
[[package]]
name = "libloading"
@ -5093,9 +5099,9 @@ dependencies = [
[[package]]
name = "tikv-jemalloc-sys"
version = "0.5.1+5.3.0-patched"
version = "0.5.2+5.3.0-patched"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "931e876f91fed0827f863a2d153897790da0b24d882c721a79cb3beb0b903261"
checksum = "ec45c14da997d0925c7835883e4d5c181f196fa142f8c19d7643d1e9af2592c3"
dependencies = [
"cc",
"fs_extra",

View File

@ -118,9 +118,14 @@ message Column {
bytes null_mask = 4;
}
service WriteService {
rpc Write (WriteRequest) returns (WriteResponse);
}
// Note there used to be a service that would load this internal protobuf format.
// See https://github.com/influxdata/influxdb_iox/pull/5750 and
// https://github.com/influxdata/influxdb_iox/issues/4866
// for the rationale behind its removal
// service WriteService {
// rpc Write (WriteRequest) returns (WriteResponse);
// }
message WriteRequest {
DatabaseBatch database_batch = 1;

View File

@ -9,3 +9,4 @@ workspace-hack = { path = "../workspace-hack"}
[dev-dependencies] # In alphabetical order
test_helpers = { path = "../test_helpers" }
assert_matches = "1"

View File

@ -1,15 +1,15 @@
#![allow(dead_code)]
use crate::expression::{conditional_expression, Expr};
use crate::expression::conditional::{conditional_expression, ConditionalExpression};
use crate::identifier::{identifier, Identifier};
use crate::internal::{expect, map_fail, ParseResult};
use crate::internal::{expect, ParseResult};
use crate::literal::unsigned_integer;
use core::fmt;
use nom::branch::alt;
use nom::bytes::complete::{tag, tag_no_case};
use nom::character::complete::{char, digit1, multispace1};
use nom::character::complete::{char, multispace0, multispace1};
use nom::combinator::{map, opt, value};
use nom::multi::separated_list1;
use nom::sequence::{pair, preceded, terminated};
use std::fmt::Formatter;
use std::fmt::{Display, Formatter};
/// Represents a fully-qualified measurement name.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
@ -19,6 +19,35 @@ pub struct MeasurementNameExpression {
pub name: Identifier,
}
impl MeasurementNameExpression {
    /// Internal helper that assembles a measurement name from its parts.
    fn from_parts(
        name: Identifier,
        database: Option<Identifier>,
        retention_policy: Option<Identifier>,
    ) -> Self {
        Self {
            database,
            retention_policy,
            name,
        }
    }

    /// Constructs a new `MeasurementNameExpression` with the specified `name`.
    pub fn new(name: Identifier) -> Self {
        Self::from_parts(name, None, None)
    }

    /// Constructs a new `MeasurementNameExpression` with the specified `name` and `database`.
    pub fn new_db(name: Identifier, database: Identifier) -> Self {
        Self::from_parts(name, Some(database), None)
    }

    /// Constructs a new `MeasurementNameExpression` with the specified `name`, `database` and `retention_policy`.
    pub fn new_db_rp(name: Identifier, database: Identifier, retention_policy: Identifier) -> Self {
        Self::from_parts(name, Some(database), Some(retention_policy))
    }
}
impl fmt::Display for MeasurementNameExpression {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
match self {
@ -83,29 +112,24 @@ pub fn measurement_name_expression(i: &str) -> ParseResult<&str, MeasurementName
))
}
/// Parse an unsigned integer.
fn unsigned_number(i: &str) -> ParseResult<&str, u64> {
    // digit1 matches one or more ASCII digits; map_fail converts a
    // str::parse error (e.g. a value that overflows u64) into a parser failure.
    map_fail("unable to parse unsigned integer", digit1, &str::parse)(i)
}
/// Parse a `LIMIT <n>` clause.
///
/// Returns the parsed limit as a `u64`, or an error if the `LIMIT`
/// keyword is not followed by an unsigned integer.
pub fn limit_clause(i: &str) -> ParseResult<&str, u64> {
    preceded(
        // The LIMIT keyword must be followed by at least one whitespace character.
        pair(tag_no_case("LIMIT"), multispace1),
        expect(
            "invalid LIMIT clause, expected unsigned integer",
            unsigned_integer,
        ),
    )(i)
}
/// Parse an `OFFSET <n>` clause.
///
/// Returns the parsed offset as a `u64`, or an error if the `OFFSET`
/// keyword is not followed by an unsigned integer.
pub fn offset_clause(i: &str) -> ParseResult<&str, u64> {
    preceded(
        // The OFFSET keyword must be followed by at least one whitespace character.
        pair(tag_no_case("OFFSET"), multispace1),
        expect(
            "invalid OFFSET clause, expected unsigned integer",
            unsigned_integer,
        ),
    )(i)
}
@ -116,7 +140,7 @@ pub fn statement_terminator(i: &str) -> ParseResult<&str, ()> {
}
/// Parse a `WHERE` clause.
pub fn where_clause(i: &str) -> ParseResult<&str, Expr> {
pub fn where_clause(i: &str) -> ParseResult<&str, ConditionalExpression> {
preceded(
pair(tag_no_case("WHERE"), multispace1),
conditional_expression,
@ -124,7 +148,7 @@ pub fn where_clause(i: &str) -> ParseResult<&str, Expr> {
}
/// Represents an InfluxQL `ORDER BY` clause.
#[derive(Default, Debug, Clone, Eq, PartialEq)]
#[derive(Default, Debug, Clone, Copy, Eq, PartialEq)]
pub enum OrderByClause {
#[default]
Ascending,
@ -185,10 +209,86 @@ pub fn order_by_clause(i: &str) -> ParseResult<&str, OrderByClause> {
)(i)
}
/// Parser is a trait that allows a type to parse itself.
pub trait Parser: Sized {
    /// Parse an instance of `Self` from `i`, returning the remaining
    /// input and the parsed value on success.
    fn parse(i: &str) -> ParseResult<&str, Self>;
}
/// `OneOrMore` is a container for representing a minimum of one `T`.
///
/// `OneOrMore` provides a default implementation of [`fmt::Display`],
/// which displays the contents separated by commas.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct OneOrMore<T: Display + Parser> {
    // Invariant: never empty — enforced by the `new` constructor.
    contents: Vec<T>,
}
impl<T: Display + Parser> Display for OneOrMore<T> {
    /// Writes the contents as a comma-separated list.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let mut items = self.contents.iter();
        // The constructor guarantees at least one element.
        if let Some(head) = items.next() {
            Display::fmt(head, f)?;
        }
        for item in items {
            write!(f, ", {}", item)?;
        }
        Ok(())
    }
}
impl<T: Display + Parser> OneOrMore<T> {
    /// Construct a new `OneOrMore<T>` with `contents`.
    ///
    /// # Panics
    ///
    /// Panics if `contents` is empty.
    pub fn new(contents: Vec<T>) -> Self {
        // assert! reproduces the original panic message exactly, so
        // #[should_panic(expected = "OneOrMore requires elements")] tests still pass.
        assert!(!contents.is_empty(), "OneOrMore requires elements");
        Self { contents }
    }

    /// Returns the first element.
    pub fn first(&self) -> &T {
        // Safe: the constructor guarantees at least one element.
        self.contents.first().unwrap()
    }

    /// Returns any remaining elements.
    pub fn rest(&self) -> &[T] {
        &self.contents[1..]
    }

    /// Returns the total number of elements.
    /// Note that `len` ≥ 1.
    pub fn len(&self) -> usize {
        self.contents.len()
    }

    /// Always `false`, by the type's non-empty invariant; provided for
    /// API completeness alongside `len` (clippy: `len_without_is_empty`).
    pub fn is_empty(&self) -> bool {
        false
    }
}
impl<T: Display + Parser> OneOrMore<T> {
    /// Parse a list of one or more `T`, separated by commas.
    ///
    /// Returns an error using `msg` if `separated_list1` fails to parse any elements.
    pub fn separated_list1<'a>(
        msg: &'static str,
    ) -> impl FnMut(&'a str) -> ParseResult<&'a str, Self> {
        // Returned as a closure so it composes like any other nom parser.
        move |i: &str| {
            map(
                expect(
                    msg,
                    separated_list1(
                        // Whitespace is permitted before both the comma and each element.
                        preceded(multispace0, char(',')),
                        preceded(multispace0, T::parse),
                    ),
                ),
                // Safe to call `new` here: nom's separated_list1 yields at least
                // one element on success, satisfying the non-empty invariant.
                Self::new,
            )(i)
        }
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::assert_expect_error;
use nom::character::complete::alphanumeric1;
#[test]
fn test_measurement_name_expression() {
@ -346,4 +446,38 @@ mod tests {
// Fallible cases
statement_terminator("foo").unwrap_err();
}
impl Parser for String {
fn parse(i: &str) -> ParseResult<&str, Self> {
map(alphanumeric1, &str::to_string)(i)
}
}
type OneOrMoreString = OneOrMore<String>;
#[test]
#[should_panic(expected = "OneOrMore requires elements")]
fn test_one_or_more() {
let (_, got) = OneOrMoreString::separated_list1("Expects one or more")("foo").unwrap();
assert_eq!(got.len(), 1);
assert_eq!(got.first(), "foo");
assert_eq!(format!("{}", got), "foo");
let (_, got) =
OneOrMoreString::separated_list1("Expects one or more")("foo , bar,foobar").unwrap();
assert_eq!(got.len(), 3);
assert_eq!(got.first(), "foo");
assert_eq!(got.rest(), vec!["bar", "foobar"]);
assert_eq!(format!("{}", got), "foo, bar, foobar");
// Fallible cases
assert_expect_error!(
OneOrMoreString::separated_list1("Expects one or more")("+"),
"Expects one or more"
);
// should panic
OneOrMoreString::new(vec![]);
}
}

View File

@ -1,5 +1,5 @@
use crate::common::where_clause;
use crate::expression::Expr;
use crate::expression::conditional::ConditionalExpression;
use crate::internal::{expect, ParseResult};
use crate::simple_from_clause::{delete_from_clause, DeleteFromClause};
use nom::branch::alt;
@ -15,11 +15,11 @@ pub enum DeleteStatement {
/// to restrict which series are deleted.
FromWhere {
from: DeleteFromClause,
condition: Option<Expr>,
condition: Option<ConditionalExpression>,
},
/// A `DELETE` with a conditional expression to restrict which series are deleted.
Where(Expr),
Where(ConditionalExpression),
}
impl Display for DeleteStatement {
@ -46,7 +46,7 @@ pub fn delete_statement(i: &str) -> ParseResult<&str, DeleteStatement> {
preceded(
tag_no_case("DELETE"),
expect(
"invalid DELETE statement, must be followed by FROM or WHERE",
"invalid DELETE statement, expected FROM or WHERE",
preceded(
multispace1,
alt((
@ -85,12 +85,12 @@ mod test {
// Fallible cases
assert_expect_error!(
delete_statement("DELETE"),
"invalid DELETE statement, must be followed by FROM or WHERE"
"invalid DELETE statement, expected FROM or WHERE"
);
assert_expect_error!(
delete_statement("DELETE FOO"),
"invalid DELETE statement, must be followed by FROM or WHERE"
"invalid DELETE statement, expected FROM or WHERE"
);
}
}

View File

@ -15,8 +15,7 @@ pub struct DropMeasurementStatement {
impl Display for DropMeasurementStatement {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "DROP MEASUREMENT {}", self.name)?;
Ok(())
write!(f, "DROP MEASUREMENT {}", self.name)
}
}
@ -24,7 +23,7 @@ pub fn drop_statement(i: &str) -> ParseResult<&str, DropMeasurementStatement> {
preceded(
pair(tag_no_case("DROP"), multispace1),
expect(
"invalid DROP statement, must be followed by MEASUREMENT",
"invalid DROP statement, expected MEASUREMENT",
drop_measurement,
),
)(i)
@ -55,7 +54,7 @@ mod test {
// Fallible cases
assert_expect_error!(
drop_statement("DROP foo"),
"invalid DROP statement, must be followed by MEASUREMENT"
"invalid DROP statement, expected MEASUREMENT"
);
}

View File

@ -1,696 +1,5 @@
#![allow(dead_code)]
use crate::identifier::unquoted_identifier;
use crate::internal::ParseResult;
use crate::literal::literal_regex;
use crate::{
identifier::{identifier, Identifier},
literal::{literal, Literal},
parameter::{parameter, BindParameter},
};
use nom::branch::alt;
use nom::bytes::complete::{tag, tag_no_case};
use nom::character::complete::{char, multispace0};
use nom::combinator::{cut, map, value};
use nom::multi::{many0, separated_list0};
use nom::sequence::{delimited, preceded, separated_pair, tuple};
use std::fmt::{Display, Formatter, Write};
/// An InfluxQL expression of any type.
#[derive(Clone, Debug, PartialEq)]
pub enum Expr {
    /// Identifier name, such as a tag or field key
    Identifier(Identifier),
    /// BindParameter identifier
    BindParameter(BindParameter),
    /// Literal value such as 'foo', 5 or /^(a|b)$/
    Literal(Literal),
    /// Unary operation such as + 5 or - 1h3m
    UnaryOp(UnaryOperator, Box<Expr>),
    /// Function call
    Call {
        /// The name of the function being called.
        name: String,
        /// `None` when the call was written with no arguments, e.g. `FN()`;
        /// see the `call` parser, which maps an empty argument list to `None`.
        args: Option<Vec<Expr>>,
    },
    /// Binary operations, such as the
    /// conditional foo = 'bar' or the arithmetic 1 + 2 expressions.
    BinaryOp {
        /// Left-hand operand.
        lhs: Box<Expr>,
        /// The binary operator.
        op: BinaryOperator,
        /// Right-hand operand.
        rhs: Box<Expr>,
    },
    /// Nested expression, such as (foo = 'bar') or (1)
    Nested(Box<Expr>),
}
impl From<Literal> for Expr {
    /// Wraps a literal value as a literal expression.
    fn from(v: Literal) -> Self {
        Self::Literal(v)
    }
}

impl From<u64> for Expr {
    /// Converts an unsigned integer into a literal expression.
    fn from(v: u64) -> Self {
        Self::Literal(v.into())
    }
}

impl From<f64> for Expr {
    /// Converts a float into a literal expression.
    fn from(v: f64) -> Self {
        Self::Literal(v.into())
    }
}

impl From<u64> for Box<Expr> {
    /// Converts an unsigned integer into a boxed literal expression,
    /// convenient when building `BinaryOp`/`UnaryOp` trees.
    fn from(v: u64) -> Self {
        Self::new(v.into())
    }
}
impl Display for Expr {
    /// Writes the expression back out as InfluxQL source text.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Identifier(v) => write!(f, "{}", v),
            Self::BindParameter(v) => write!(f, "{}", v),
            Self::Literal(v) => write!(f, "{}", v),
            Self::UnaryOp(op, e) => write!(f, "{}{}", op, e),
            Self::BinaryOp { lhs, op, rhs } => write!(f, "{} {} {}", lhs, op, rhs),
            Self::Nested(e) => write!(f, "({})", e),
            Self::Call { name, args } => {
                write!(f, "{}(", name)?;
                // A None or empty argument list prints nothing between the parens.
                if let Some(args) = args {
                    let mut it = args.iter();
                    if let Some(first) = it.next() {
                        write!(f, "{}", first)?;
                        for arg in it {
                            write!(f, ", {}", arg)?;
                        }
                    }
                }
                write!(f, ")")
            }
        }
    }
}
/// An InfluxQL unary operator.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum UnaryOperator {
    /// Unary plus, `+`.
    Plus,
    /// Unary minus (negation), `-`.
    Minus,
}
impl Display for UnaryOperator {
    /// Writes the operator's single-character symbol.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.write_char(match self {
            Self::Plus => '+',
            Self::Minus => '-',
        })
    }
}
/// An InfluxQL binary operator.
#[derive(Clone, Debug, Copy, PartialEq, Eq)]
pub enum BinaryOperator {
    Add,        // +
    Sub,        // -
    Mul,        // *
    Div,        // /
    Mod,        // %
    BitwiseAnd, // &
    BitwiseOr,  // |
    BitwiseXor, // ^
    Eq,         // =
    NotEq,      // !=
    EqRegex,    // =~
    NotEqRegex, // !~
    Lt,         // <
    LtEq,       // <=
    Gt,         // >
    GtEq,       // >=
    In,         // IN
    And,        // AND
    Or,         // OR
}
impl Display for BinaryOperator {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
Self::Add => f.write_char('+')?,
Self::Sub => f.write_char('-')?,
Self::Mul => f.write_char('*')?,
Self::Div => f.write_char('/')?,
Self::Mod => f.write_char('%')?,
Self::BitwiseAnd => f.write_char('&')?,
Self::BitwiseOr => f.write_char('|')?,
Self::BitwiseXor => f.write_char('^')?,
Self::Eq => f.write_char('=')?,
Self::NotEq => f.write_str("!=")?,
Self::EqRegex => f.write_str("=~")?,
Self::NotEqRegex => f.write_str("!~")?,
Self::Lt => f.write_char('<')?,
Self::LtEq => f.write_str("<=")?,
Self::Gt => f.write_char('>')?,
Self::GtEq => f.write_str(">=")?,
Self::In => f.write_str("IN")?,
Self::And => f.write_str("AND")?,
Self::Or => f.write_str("OR")?,
}
Ok(())
}
}
/// Parse a unary expression.
fn unary(i: &str) -> ParseResult<&str, Expr> {
    // Consume the sign, permitting leading whitespace.
    let (i, op) = preceded(
        multispace0,
        alt((
            value(UnaryOperator::Plus, char('+')),
            value(UnaryOperator::Minus, char('-')),
        )),
    )(i)?;
    // The operand is a factor, so unary binds tighter than any binary operator.
    let (i, e) = factor(i)?;
    Ok((i, Expr::UnaryOp(op, e.into())))
}
/// Parse a parenthesis expression.
fn parens(i: &str) -> ParseResult<&str, Expr> {
    delimited(
        preceded(multispace0, char('(')),
        // Wrap in Expr::Nested so Display can reproduce the parentheses.
        map(conditional_expression, |e| Expr::Nested(e.into())),
        preceded(multispace0, char(')')),
    )(i)
}
/// Parse a function call expression.
fn call(i: &str) -> ParseResult<&str, Expr> {
    map(
        separated_pair(
            map(unquoted_identifier, &str::to_string),
            multispace0,
            delimited(
                char('('),
                alt((
                    // A single regular expression to match 0 or more field keys
                    map(preceded(multispace0, literal_regex), |re| vec![re.into()]),
                    // A list of Expr, separated by commas
                    separated_list0(preceded(multispace0, char(',')), arithmetic),
                )),
                // cut: once '(' has been consumed, a missing ')' is a hard failure.
                cut(preceded(multispace0, char(')'))),
            ),
        ),
        // An empty argument list is encoded as None rather than Some(vec![]).
        |(name, args)| match args.is_empty() {
            true => Expr::Call { name, args: None },
            false => Expr::Call {
                name,
                args: Some(args),
            },
        },
    )(i)
}
/// Parse an operand expression, such as a literal, identifier or bind parameter.
fn operand(i: &str) -> ParseResult<&str, Expr> {
    preceded(
        multispace0,
        // alt tries alternatives in order: literals first, so e.g. `true`
        // parses as a boolean literal (see test_conditional_expression).
        alt((
            map(literal, Expr::Literal),
            map(identifier, Expr::Identifier),
            map(parameter, Expr::BindParameter),
        )),
    )(i)
}
/// Parse precedence priority 1 operators.
///
/// These are the highest precedence operators, and include parenthesis and the unary operators.
fn factor(i: &str) -> ParseResult<&str, Expr> {
    // unary is tried first so a leading sign is consumed before operand runs.
    alt((unary, parens, operand))(i)
}
/// Parse arithmetic, precedence priority 2 operators.
///
/// This includes the multiplication, division, bitwise and, and modulus operators.
fn term(i: &str) -> ParseResult<&str, Expr> {
    let (input, left) = factor(i)?;
    // Collect any number of (operator, operand) pairs at this precedence level.
    let (input, remaining) = many0(tuple((
        preceded(
            multispace0,
            alt((
                value(BinaryOperator::Mul, char('*')),
                value(BinaryOperator::Div, char('/')),
                value(BinaryOperator::BitwiseAnd, char('&')),
                value(BinaryOperator::Mod, char('%')),
            )),
        ),
        factor,
    )))(input)?;
    // Left-fold so operators of equal precedence associate to the left.
    Ok((input, reduce_expr(left, remaining)))
}
/// Parse arithmetic, precedence priority 3 operators.
///
/// This includes the addition, subtraction, bitwise or, and bitwise xor operators.
fn arithmetic(i: &str) -> ParseResult<&str, Expr> {
    let (input, left) = term(i)?;
    let (input, remaining) = many0(tuple((
        preceded(
            multispace0,
            alt((
                value(BinaryOperator::Add, char('+')),
                value(BinaryOperator::Sub, char('-')),
                value(BinaryOperator::BitwiseOr, char('|')),
                value(BinaryOperator::BitwiseXor, char('^')),
            )),
        ),
        // cut: after an operator, a missing right operand is a hard failure.
        cut(term),
    )))(input)?;
    Ok((input, reduce_expr(left, remaining)))
}
/// Parse the conditional regular expression operators `=~` and `!~`.
fn conditional_regex(i: &str) -> ParseResult<&str, Expr> {
    let (input, f1) = arithmetic(i)?;
    let (input, exprs) = many0(tuple((
        preceded(
            multispace0,
            alt((
                value(BinaryOperator::EqRegex, tag("=~")),
                value(BinaryOperator::NotEqRegex, tag("!~")),
            )),
        ),
        // The right-hand side must be a regex literal; cut makes anything
        // else a hard failure (see test_regex's fallible cases).
        map(cut(preceded(multispace0, literal_regex)), From::from),
    )))(input)?;
    Ok((input, reduce_expr(f1, exprs)))
}
/// Parse conditional operators.
fn conditional(i: &str) -> ParseResult<&str, Expr> {
    let (input, f1) = conditional_regex(i)?;
    let (input, exprs) = many0(tuple((
        preceded(
            multispace0,
            alt((
                // try longest matches first
                value(BinaryOperator::LtEq, tag("<=")),
                value(BinaryOperator::GtEq, tag(">=")),
                value(BinaryOperator::NotEq, tag("!=")),
                value(BinaryOperator::Lt, char('<')),
                value(BinaryOperator::Gt, char('>')),
                value(BinaryOperator::Eq, char('=')),
            )),
        ),
        // cut: a comparison with no right-hand operand is a hard failure.
        cut(conditional_regex),
    )))(input)?;
    Ok((input, reduce_expr(f1, exprs)))
}
/// Parse conjunction operators, such as `AND`.
fn conjunction(i: &str) -> ParseResult<&str, Expr> {
    let (input, f1) = conditional(i)?;
    let (input, exprs) = many0(tuple((
        value(
            BinaryOperator::And,
            // Case-insensitive: AND, and, And, … are all accepted.
            preceded(multispace0, tag_no_case("and")),
        ),
        // cut: an AND with no right-hand operand is a hard failure.
        cut(conditional),
    )))(input)?;
    Ok((input, reduce_expr(f1, exprs)))
}
/// Parse disjunction operator, such as `OR`.
fn disjunction(i: &str) -> ParseResult<&str, Expr> {
    let (input, f1) = conjunction(i)?;
    let (input, exprs) = many0(tuple((
        value(BinaryOperator::Or, preceded(multispace0, tag_no_case("or"))),
        // cut: an OR with no right-hand operand is a hard failure.
        cut(conjunction),
    )))(input)?;
    Ok((input, reduce_expr(f1, exprs)))
}
/// Parse an InfluxQL conditional expression.
pub fn conditional_expression(i: &str) -> ParseResult<&str, Expr> {
    // Entry point of the precedence-climbing chain; OR binds loosest.
    disjunction(i)
}
/// Folds `expr` and `remainder` into a [Expr::BinaryOp] tree.
///
/// Each `(operator, operand)` pair becomes the right-hand side of a new
/// node whose left-hand side is the tree built so far, which yields
/// left-associative grouping.
fn reduce_expr(expr: Expr, remainder: Vec<(BinaryOperator, Expr)>) -> Expr {
    remainder
        .into_iter()
        .fold(expr, |lhs, (op, rhs)| Expr::BinaryOp {
            lhs: Box::new(lhs),
            op,
            rhs: Box::new(rhs),
        })
}
pub mod arithmetic;
pub mod conditional;
#[cfg(test)]
mod test {
use super::*;
use crate::assert_failure;
/// Constructs an [Expr::Identifier] expression.
macro_rules! ident {
($EXPR: expr) => {
Expr::Identifier($EXPR.into())
};
}
/// Constructs a regular expression [Expr::Literal].
macro_rules! regex {
($EXPR: expr) => {
Expr::Literal(crate::literal::Literal::Regex($EXPR.into()).into())
};
}
/// Constructs a [Expr::BindParameter] expression.
macro_rules! param {
($EXPR: expr) => {
Expr::BindParameter(crate::parameter::BindParameter($EXPR.into()).into())
};
}
/// Constructs a [Expr::Nested] expression.
macro_rules! nested {
($EXPR: expr) => {
<Expr as std::convert::Into<Box<Expr>>>::into(Expr::Nested($EXPR.into()))
};
}
/// Constructs a [Expr::UnaryOp] expression.
macro_rules! unary {
(- $EXPR:expr) => {
<Expr as std::convert::Into<Box<Expr>>>::into(Expr::UnaryOp(
UnaryOperator::Minus,
$EXPR.into(),
))
};
(+ $EXPR:expr) => {
<Expr as std::convert::Into<Box<Expr>>>::into(Expr::UnaryOp(
UnaryOperator::Plus,
$EXPR.into(),
))
};
}
/// Constructs a [Expr::BinaryOp] expression.
macro_rules! binary_op {
($LHS: expr, $OP: ident, $RHS: expr) => {
<Expr as std::convert::Into<Box<Expr>>>::into(Expr::BinaryOp {
lhs: $LHS.into(),
op: BinaryOperator::$OP,
rhs: $RHS.into(),
})
};
}
#[test]
fn test_arithmetic_expression() {
let (_, got) = conditional_expression("5 + 51").unwrap();
assert_eq!(got, *binary_op!(5, Add, 51));
let (_, got) = conditional_expression("5 + $foo").unwrap();
assert_eq!(got, *binary_op!(5, Add, param!("foo")));
// Following two tests validate that operators of higher precedence
// are nested deeper in the AST.
let (_, got) = conditional_expression("5 % -3 | 2").unwrap();
assert_eq!(
got,
*binary_op!(binary_op!(5, Mod, unary!(-3)), BitwiseOr, 2)
);
let (_, got) = conditional_expression("-3 | 2 % 5").unwrap();
assert_eq!(
got,
*binary_op!(unary!(-3), BitwiseOr, binary_op!(2, Mod, 5))
);
let (_, got) = conditional_expression("5 % 2 | -3").unwrap();
assert_eq!(
got,
*binary_op!(binary_op!(5, Mod, 2), BitwiseOr, unary!(-3))
);
let (_, got) = conditional_expression("2 | -3 % 5").unwrap();
assert_eq!(
got,
*binary_op!(2, BitwiseOr, binary_op!(unary!(-3), Mod, 5))
);
let (_, got) = conditional_expression("5 - -(3 | 2)").unwrap();
assert_eq!(
got,
*binary_op!(5, Sub, unary!(-nested!(binary_op!(3, BitwiseOr, 2))))
);
let (_, got) = conditional_expression("2 | 5 % 3").unwrap();
assert_eq!(got, *binary_op!(2, BitwiseOr, binary_op!(5, Mod, 3)));
// Expressions are still valid when unnecessary whitespace is omitted
let (_, got) = conditional_expression("5+51").unwrap();
assert_eq!(got, *binary_op!(5, Add, 51));
let (_, got) = conditional_expression("5+$foo").unwrap();
assert_eq!(got, *binary_op!(5, Add, param!("foo")));
let (_, got) = conditional_expression("5- -(3|2)").unwrap();
assert_eq!(
got,
*binary_op!(5, Sub, unary!(-nested!(binary_op!(3, BitwiseOr, 2))))
);
// whitespace is not significant between unary operators
let (_, got) = conditional_expression("5+-(3|2)").unwrap();
assert_eq!(
got,
*binary_op!(5, Add, unary!(-nested!(binary_op!(3, BitwiseOr, 2))))
);
// Fallible cases
// invalid operator / incomplete expression
assert_failure!(conditional_expression("5 || 3"));
// TODO: skip until https://github.com/influxdata/influxdb_iox/issues/5663 is implemented
// assert_failure!(conditional_expression("5+--(3|2)"));
}
#[test]
fn test_conditional_expression() {
let (_, got) = conditional_expression("foo = 5").unwrap();
assert_eq!(got, *binary_op!(ident!("foo"), Eq, 5));
let (_, got) = conditional_expression("foo != 5").unwrap();
assert_eq!(got, *binary_op!(ident!("foo"), NotEq, 5));
let (_, got) = conditional_expression("foo > 5").unwrap();
assert_eq!(got, *binary_op!(ident!("foo"), Gt, 5));
let (_, got) = conditional_expression("foo >= 5").unwrap();
assert_eq!(got, *binary_op!(ident!("foo"), GtEq, 5));
let (_, got) = conditional_expression("foo < 5").unwrap();
assert_eq!(got, *binary_op!(ident!("foo"), Lt, 5));
let (_, got) = conditional_expression("foo <= 5").unwrap();
assert_eq!(got, *binary_op!(ident!("foo"), LtEq, 5));
let (_, got) = conditional_expression("foo > 5 + 6 ").unwrap();
assert_eq!(got, *binary_op!(ident!("foo"), Gt, binary_op!(5, Add, 6)));
let (_, got) = conditional_expression("5 <= -6").unwrap();
assert_eq!(got, *binary_op!(5, LtEq, unary!(-6)));
// simple expressions
let (_, got) = conditional_expression("true").unwrap();
assert_eq!(got, Expr::Literal(true.into()));
// Expressions are still valid when whitespace is omitted
let (_, got) = conditional_expression("foo>5+6 ").unwrap();
assert_eq!(got, *binary_op!(ident!("foo"), Gt, binary_op!(5, Add, 6)));
let (_, got) = conditional_expression("5<=-6").unwrap();
assert_eq!(got, *binary_op!(5, LtEq, unary!(-6)));
// Fallible cases
// conditional expression must be complete
assert_failure!(conditional_expression("5 <="));
// should not accept a regex literal
assert_failure!(conditional_expression("5 = /regex/"));
}
#[test]
fn test_logical_expression() {
let (_, got) = conditional_expression("5 AND 6").unwrap();
assert_eq!(got, *binary_op!(5, And, 6));
let (_, got) = conditional_expression("5 AND 6 OR 7").unwrap();
assert_eq!(got, *binary_op!(binary_op!(5, And, 6), Or, 7));
let (_, got) = conditional_expression("5 > 3 OR 6 = 7 AND 7 != 1").unwrap();
assert_eq!(
got,
*binary_op!(
binary_op!(5, Gt, 3),
Or,
binary_op!(binary_op!(6, Eq, 7), And, binary_op!(7, NotEq, 1))
)
);
let (_, got) = conditional_expression("5 AND (6 OR 7)").unwrap();
assert_eq!(got, *binary_op!(5, And, nested!(binary_op!(6, Or, 7))));
// Fallible cases
// Expects Expr after operator
assert_failure!(conditional_expression("5 OR -"));
assert_failure!(conditional_expression("5 OR"));
assert_failure!(conditional_expression("5 AND"));
// Can't use "and" as identifier
assert_failure!(conditional_expression("5 AND and OR 5"));
}
#[test]
fn test_regex() {
let (_, got) = conditional_expression("foo =~ /(a > b)/").unwrap();
assert_eq!(got, *binary_op!(ident!("foo"), EqRegex, regex!("(a > b)")));
let (_, got) = conditional_expression("foo !~ /bar/").unwrap();
assert_eq!(got, *binary_op!(ident!("foo"), NotEqRegex, regex!("bar")));
// Expressions are still valid when whitespace is omitted
let (_, got) = conditional_expression("foo=~/(a > b)/").unwrap();
assert_eq!(got, *binary_op!(ident!("foo"), EqRegex, regex!("(a > b)")));
// Fallible cases
// Expects a regex literal after regex conditional operators
assert_failure!(conditional_expression("foo =~ 5"));
assert_failure!(conditional_expression("foo !~ 5"));
}
#[test]
fn test_spacing_and_remaining_input() {
// Validate that the remaining input is returned
let (got, _) = conditional_expression("foo < 1 + 2 LIMIT 10").unwrap();
assert_eq!(got, " LIMIT 10");
// Any whitespace preceding the expression is consumed
conditional_expression(" foo < 1 + 2").unwrap();
// Various whitespace separators are supported between tokens
let (got, _) = conditional_expression("foo\n > 1 \t + \n \t3").unwrap();
assert!(got.is_empty())
}
#[test]
fn test_display_expr() {
let (_, e) = conditional_expression("5 + 51").unwrap();
let got = format!("{}", e);
assert_eq!(got, "5 + 51");
let (_, e) = conditional_expression("5 + -10").unwrap();
let got = format!("{}", e);
assert_eq!(got, "5 + -10");
let (_, e) = conditional_expression("-(5 % 6)").unwrap();
let got = format!("{}", e);
assert_eq!(got, "-(5 % 6)");
// vary spacing
let (_, e) = conditional_expression("( 5 + 6 ) * -( 7+ 8)").unwrap();
let got = format!("{}", e);
assert_eq!(got, "(5 + 6) * -(7 + 8)");
// multiple unary and parenthesis
let (_, e) = conditional_expression("(-(5 + 6) & -+( 7 + 8 ))").unwrap();
let got = format!("{}", e);
assert_eq!(got, "(-(5 + 6) & -+(7 + 8))");
// unquoted identifier
let (_, e) = conditional_expression("foo + 5").unwrap();
let got = format!("{}", e);
assert_eq!(got, "foo + 5");
// bind parameter identifier
let (_, e) = conditional_expression("foo + $0").unwrap();
let got = format!("{}", e);
assert_eq!(got, "foo + $0");
// quoted identifier
let (_, e) = conditional_expression(r#""foo" + 'bar'"#).unwrap();
let got = format!("{}", e);
assert_eq!(got, r#"foo + 'bar'"#);
// Duration
let (_, e) = conditional_expression("- 6h30m").unwrap();
let got = format!("{}", e);
assert_eq!(got, "-6h30m");
// can't parse literal regular expressions as part of an arithmetic expression
assert_failure!(conditional_expression(r#""foo" + /^(no|match)$/"#));
}
#[test]
fn test_call() {
// These tests validate a `Call` expression and also it's Display implementation.
// We don't need to validate Expr trees, as we do that in the conditional and arithmetic
// tests.
// No arguments
let (_, ex) = call("FN()").unwrap();
let got = format!("{}", ex);
assert_eq!(got, "FN()");
// Single argument with surrounding whitespace
let (_, ex) = call("FN ( 1 )").unwrap();
let got = format!("{}", ex);
assert_eq!(got, "FN(1)");
// Multiple arguments with varying whitespace
let (_, ex) = call("FN ( 1,2\n,3,\t4 )").unwrap();
let got = format!("{}", ex);
assert_eq!(got, "FN(1, 2, 3, 4)");
// Arguments as expressions
let (_, ex) = call("FN ( 1 + 2, foo, 'bar' )").unwrap();
let got = format!("{}", ex);
assert_eq!(got, "FN(1 + 2, foo, 'bar')");
// A single regular expression argument
let (_, ex) = call("FN ( /foo/ )").unwrap();
let got = format!("{}", ex);
assert_eq!(got, "FN(/foo/)");
// Fallible cases
call("FN ( 1").unwrap_err();
call("FN ( 1, )").unwrap_err();
call("FN ( 1,, 2 )").unwrap_err();
// Conditionals not supported
call("FN ( 1 = 2 )").unwrap_err();
// Multiple regular expressions not supported
call("FN ( /foo/, /bar/ )").unwrap_err();
}
}
mod test_util;

View File

@ -0,0 +1,637 @@
use crate::identifier::unquoted_identifier;
use crate::internal::{expect, ParseResult};
use crate::literal::literal_regex;
use crate::{
identifier::{identifier, Identifier},
literal::Literal,
parameter::BindParameter,
};
use nom::branch::alt;
use nom::bytes::complete::{tag, tag_no_case};
use nom::character::complete::{char, multispace0};
use nom::combinator::{cut, map, opt, value};
use nom::multi::{many0, separated_list0};
use nom::sequence::{delimited, pair, preceded, separated_pair, tuple};
use std::fmt::{Display, Formatter, Write};
/// An InfluxQL arithmetic expression.
#[derive(Clone, Debug, PartialEq)]
pub enum Expr {
    /// Reference to a tag or field key.
    VarRef {
        /// The name of the tag or field.
        name: Identifier,
        /// An optional data type selection specified using the `::` operator.
        ///
        /// When the `::` operator follows an identifier, it instructs InfluxQL to fetch
        /// only data of the matching data type.
        ///
        /// The `::` operator appears after an [`Identifier`] and may be described using
        /// the following EBNF:
        ///
        /// ```text
        /// variable_ref ::= identifier ( "::" data_type )?
        /// data_type    ::= "float" | "integer" | "boolean" | "string" | "tag" | "field"
        /// ```
        ///
        /// For example:
        ///
        /// ```text
        /// SELECT foo::field, host::tag, usage_idle::integer, idle::boolean FROM cpu
        /// ```
        ///
        /// Specifies the following:
        ///
        /// * `foo::field` will return a field of any data type named `foo`
        /// * `host::tag` will return a tag named `host`
        /// * `usage_idle::integer` will return either a float or integer field named `usage_idle`,
        ///   and casting it to an `integer`
        /// * `idle::boolean` will return a field named `idle` that has a matching data type of
        ///   `boolean`
        data_type: Option<VarRefDataType>,
    },
    /// BindParameter identifier
    BindParameter(BindParameter),
    /// Literal value such as 'foo', 5 or /^(a|b)$/
    Literal(Literal),
    /// A literal wildcard (`*`) with an optional data type selection.
    Wildcard(Option<WildcardType>),
    /// A DISTINCT <identifier> expression.
    Distinct(Identifier),
    /// Unary operation such as + 5 or - 1h3m
    UnaryOp(UnaryOperator, Box<Expr>),
    /// Function call
    Call {
        /// The name of the function being called.
        name: String,
        /// The argument list; may be empty.
        args: Vec<Expr>,
    },
    /// Binary operations, such as `1 + 2`.
    Binary {
        /// Left-hand operand.
        lhs: Box<Expr>,
        /// The binary operator.
        op: BinaryOperator,
        /// Right-hand operand.
        rhs: Box<Expr>,
    },
    /// Nested expression, such as (foo = 'bar') or (1)
    Nested(Box<Expr>),
}
impl From<Literal> for Expr {
    /// Wraps a literal value as a literal expression.
    fn from(v: Literal) -> Self {
        Self::Literal(v)
    }
}

impl From<u64> for Expr {
    /// Converts an unsigned integer into a literal expression.
    fn from(v: u64) -> Self {
        Self::Literal(v.into())
    }
}

impl From<f64> for Expr {
    /// Converts a float into a literal expression.
    fn from(v: f64) -> Self {
        Self::Literal(v.into())
    }
}

impl From<u64> for Box<Expr> {
    /// Converts an unsigned integer into a boxed literal expression,
    /// convenient when building `Binary`/`UnaryOp` trees.
    fn from(v: u64) -> Self {
        Self::new(v.into())
    }
}
impl Display for Expr {
    /// Writes the expression back out as InfluxQL source text.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::VarRef { name, data_type } => {
                write!(f, "{}", name)?;
                // The optional cast is rendered with the `::` operator.
                if let Some(d) = data_type {
                    write!(f, "::{}", d)?;
                }
                Ok(())
            }
            Self::BindParameter(v) => write!(f, "{}", v),
            Self::Literal(v) => write!(f, "{}", v),
            Self::UnaryOp(op, e) => write!(f, "{}{}", op, e),
            Self::Binary { lhs, op, rhs } => write!(f, "{} {} {}", lhs, op, rhs),
            Self::Nested(e) => write!(f, "({})", e),
            Self::Wildcard(Some(dt)) => write!(f, "*::{}", dt),
            Self::Wildcard(None) => f.write_char('*'),
            Self::Distinct(ident) => write!(f, "DISTINCT {}", ident),
            Self::Call { name, args } => {
                write!(f, "{}(", name)?;
                let mut it = args.iter();
                if let Some(first) = it.next() {
                    write!(f, "{}", first)?;
                    for arg in it {
                        write!(f, ", {}", arg)?;
                    }
                }
                write!(f, ")")
            }
        }
    }
}
/// Specifies the data type of a wildcard (`*`) when using the `::` operator.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WildcardType {
    /// A wildcard cast to tags, displayed as `*::tag`.
    Tag,
    /// A wildcard cast to fields, displayed as `*::field`.
    Field,
}
impl Display for WildcardType {
    /// Write the lowercase keyword used after the `::` operator.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let keyword = match self {
            Self::Tag => "tag",
            Self::Field => "field",
        };
        f.write_str(keyword)
    }
}
/// Represents the primitive data types of a [`Expr::VarRef`] when specified
/// using a [cast operation][cast].
///
/// InfluxQL only supports casting between [`Self::Float`] and [`Self::Integer`] types.
///
/// [cast]: https://docs.influxdata.com/influxdb/v1.8/query_language/explore-data/#cast-operations
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum VarRefDataType {
    /// Represents a float.
    Float,
    /// Represents an integer.
    Integer,
    /// Represents a string.
    String,
    /// Represents a boolean.
    Boolean,
    /// Represents a tag.
    Tag,
    /// Represents a field.
    Field,
}
impl Display for VarRefDataType {
    /// Write the lowercase type keyword used after the `::` operator.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let keyword = match self {
            Self::Float => "float",
            Self::Integer => "integer",
            Self::String => "string",
            Self::Boolean => "boolean",
            Self::Tag => "tag",
            Self::Field => "field",
        };
        f.write_str(keyword)
    }
}
/// An InfluxQL unary operator.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum UnaryOperator {
    /// Unary plus (`+`).
    Plus,
    /// Unary minus (`-`).
    Minus,
}
impl Display for UnaryOperator {
    /// Write the single-character operator symbol.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.write_char(match self {
            Self::Plus => '+',
            Self::Minus => '-',
        })
    }
}
/// An InfluxQL binary operator.
#[derive(Clone, Debug, Copy, PartialEq, Eq)]
pub enum BinaryOperator {
    /// Addition (`+`).
    Add,
    /// Subtraction (`-`).
    Sub,
    /// Multiplication (`*`).
    Mul,
    /// Division (`/`).
    Div,
    /// Modulo (`%`).
    Mod,
    /// Bitwise AND (`&`).
    BitwiseAnd,
    /// Bitwise OR (`|`).
    BitwiseOr,
    /// Bitwise XOR (`^`).
    BitwiseXor,
}
impl Display for BinaryOperator {
    /// Write the single-character operator symbol.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.write_char(match self {
            Self::Add => '+',
            Self::Sub => '-',
            Self::Mul => '*',
            Self::Div => '/',
            Self::Mod => '%',
            Self::BitwiseAnd => '&',
            Self::BitwiseOr => '|',
            Self::BitwiseXor => '^',
        })
    }
}
/// Parse a unary expression.
///
/// A unary expression is a `+` or `-` operator followed by a
/// [`factor`]-level operand.
fn unary<T>(i: &str) -> ParseResult<&str, Expr>
where
    T: ArithmeticParsers,
{
    // Consume leading whitespace and the unary operator…
    let (i, op) = preceded(
        multispace0,
        alt((
            value(UnaryOperator::Plus, char('+')),
            value(UnaryOperator::Minus, char('-')),
        )),
    )(i)?;
    // …followed by the operand, which may itself be unary or parenthesised.
    let (i, e) = factor::<T>(i)?;
    Ok((i, Expr::UnaryOp(op, e.into())))
}
/// Parse a parenthesis expression.
///
/// The inner expression is wrapped in [`Expr::Nested`] so the original
/// grouping is preserved when the tree is displayed.
fn parens<T>(i: &str) -> ParseResult<&str, Expr>
where
    T: ArithmeticParsers,
{
    delimited(
        preceded(multispace0, char('(')),
        map(arithmetic::<T>, |e| Expr::Nested(e.into())),
        preceded(multispace0, char(')')),
    )(i)
}
/// Parse a function call expression.
///
/// The name must be an unquoted identifier, followed by a parenthesised
/// argument list that is either a single regular expression or zero or more
/// comma-separated arithmetic expressions.
pub fn call_expression<T>(i: &str) -> ParseResult<&str, Expr>
where
    T: ArithmeticParsers,
{
    map(
        separated_pair(
            // The function name.
            map(unquoted_identifier, &str::to_string),
            multispace0,
            delimited(
                char('('),
                alt((
                    // A single regular expression to match 0 or more field keys
                    map(preceded(multispace0, literal_regex), |re| vec![re.into()]),
                    // A list of Expr, separated by commas
                    separated_list0(preceded(multispace0, char(',')), arithmetic::<T>),
                )),
                // `cut` commits to this branch: a missing closing `)` is a
                // hard failure rather than a recoverable error.
                cut(preceded(multispace0, char(')'))),
            ),
        ),
        |(name, args)| Expr::Call { name, args },
    )(i)
}
/// Parse a variable reference, which is an identifier followed by an optional cast expression.
///
/// When the `::` operator is present, the following data type keyword is
/// mandatory; an unrecognised keyword produces the descriptive error below.
pub fn var_ref(i: &str) -> ParseResult<&str, Expr> {
    map(
        pair(
            identifier,
            opt(preceded(
                tag("::"),
                expect(
                    "invalid data type for tag or field reference, expected float, integer, string, boolean, tag or field",
                    alt((
                        value(VarRefDataType::Float, tag_no_case("float")),
                        value(VarRefDataType::Integer, tag_no_case("integer")),
                        value(VarRefDataType::String, tag_no_case("string")),
                        value(VarRefDataType::Boolean, tag_no_case("boolean")),
                        value(VarRefDataType::Tag, tag_no_case("tag")),
                        value(VarRefDataType::Field, tag_no_case("field"))
                    ))
                )
            )),
        ),
        |(name, data_type)| Expr::VarRef { name, data_type },
    )(i)
}
/// Parse precedence priority 1 operators.
///
/// These are the highest precedence operators, and include parenthesis and the unary operators.
/// A factor is a unary expression, a parenthesised expression, or a bare
/// operand supplied by the [`ArithmeticParsers`] implementation.
fn factor<T>(i: &str) -> ParseResult<&str, Expr>
where
    T: ArithmeticParsers,
{
    alt((unary::<T>, parens::<T>, T::operand))(i)
}
/// Parse arithmetic, precedence priority 2 operators.
///
/// This includes the multiplication, division, bitwise and, and modulus operators.
fn term<T>(i: &str) -> ParseResult<&str, Expr>
where
    T: ArithmeticParsers,
{
    // Parse the left-hand operand…
    let (input, left) = factor::<T>(i)?;
    // …followed by zero or more (operator, operand) pairs at this
    // precedence level.
    let (input, remaining) = many0(tuple((
        preceded(
            multispace0,
            alt((
                value(BinaryOperator::Mul, char('*')),
                value(BinaryOperator::Div, char('/')),
                value(BinaryOperator::BitwiseAnd, char('&')),
                value(BinaryOperator::Mod, char('%')),
            )),
        ),
        // NOTE(review): unlike `arithmetic`, the right-hand operand is not
        // wrapped in `cut`, so a dangling operator backtracks instead of
        // failing — confirm whether this asymmetry is intentional.
        factor::<T>,
    )))(input)?;
    // Fold the pairs into a left-associative binary expression tree.
    Ok((input, reduce_expr(left, remaining)))
}
/// Parse an arithmetic expression.
///
/// This includes the addition, subtraction, bitwise or, and bitwise xor operators.
/// These are the lowest-precedence arithmetic operators, so this is the
/// grammar's entry point; higher-precedence operators bind via [`term`].
pub fn arithmetic<T>(i: &str) -> ParseResult<&str, Expr>
where
    T: ArithmeticParsers,
{
    // Parse the left-hand operand…
    let (input, left) = term::<T>(i)?;
    // …followed by zero or more (operator, operand) pairs at this
    // precedence level.
    let (input, remaining) = many0(tuple((
        preceded(
            multispace0,
            alt((
                value(BinaryOperator::Add, char('+')),
                value(BinaryOperator::Sub, char('-')),
                value(BinaryOperator::BitwiseOr, char('|')),
                value(BinaryOperator::BitwiseXor, char('^')),
            )),
        ),
        // Once an operator is consumed, a missing right-hand operand is a
        // hard failure (`cut`), e.g. `5 +` cannot backtrack.
        cut(term::<T>),
    )))(input)?;
    // Fold the pairs into a left-associative binary expression tree.
    Ok((input, reduce_expr(left, remaining)))
}
/// A trait for customizing arithmetic parsers.
///
/// Implementors choose which operand forms (literals, variable references,
/// bind parameters, calls, …) are valid in their grammar.
pub trait ArithmeticParsers {
    /// Parse an operand of an arithmetic expression.
    fn operand(i: &str) -> ParseResult<&str, Expr>;
}
/// Folds `expr` and `remainder` into a left-associative [Expr::Binary] tree.
fn reduce_expr(expr: Expr, remainder: Vec<(BinaryOperator, Expr)>) -> Expr {
    let mut tree = expr;
    for (op, rhs) in remainder {
        tree = Expr::Binary {
            lhs: Box::new(tree),
            op,
            rhs: Box::new(rhs),
        };
    }
    tree
}
#[cfg(test)]
mod test {
    use super::*;
    use crate::literal::literal_no_regex;
    use crate::parameter::parameter;
    use crate::{assert_expect_error, assert_failure, binary_op, nested, param, unary, var_ref};

    /// Minimal [`ArithmeticParsers`] implementation for exercising the
    /// arithmetic grammar with literals, var refs and bind parameters.
    struct TestParsers;

    impl ArithmeticParsers for TestParsers {
        fn operand(i: &str) -> ParseResult<&str, Expr> {
            preceded(
                multispace0,
                alt((
                    map(literal_no_regex, Expr::Literal),
                    var_ref,
                    map(parameter, Expr::BindParameter),
                )),
            )(i)
        }
    }

    /// Parse an arithmetic expression using the [`TestParsers`] operands.
    fn arithmetic_expression(i: &str) -> ParseResult<&str, Expr> {
        arithmetic::<TestParsers>(i)
    }

    #[test]
    fn test_arithmetic() {
        let (_, got) = arithmetic_expression("5 + 51").unwrap();
        assert_eq!(got, binary_op!(5, Add, 51));

        let (_, got) = arithmetic_expression("5 + $foo").unwrap();
        assert_eq!(got, binary_op!(5, Add, param!("foo")));

        // Following two tests validate that operators of higher precedence
        // are nested deeper in the AST.
        let (_, got) = arithmetic_expression("5 % -3 | 2").unwrap();
        assert_eq!(
            got,
            binary_op!(binary_op!(5, Mod, unary!(-3)), BitwiseOr, 2)
        );

        let (_, got) = arithmetic_expression("-3 | 2 % 5").unwrap();
        assert_eq!(
            got,
            binary_op!(unary!(-3), BitwiseOr, binary_op!(2, Mod, 5))
        );

        let (_, got) = arithmetic_expression("5 % 2 | -3").unwrap();
        assert_eq!(
            got,
            binary_op!(binary_op!(5, Mod, 2), BitwiseOr, unary!(-3))
        );

        let (_, got) = arithmetic_expression("2 | -3 % 5").unwrap();
        assert_eq!(
            got,
            binary_op!(2, BitwiseOr, binary_op!(unary!(-3), Mod, 5))
        );

        let (_, got) = arithmetic_expression("5 - -(3 | 2)").unwrap();
        assert_eq!(
            got,
            binary_op!(5, Sub, unary!(-nested!(binary_op!(3, BitwiseOr, 2))))
        );

        let (_, got) = arithmetic_expression("2 | 5 % 3").unwrap();
        assert_eq!(got, binary_op!(2, BitwiseOr, binary_op!(5, Mod, 3)));

        // Expressions are still valid when unnecessary whitespace is omitted
        let (_, got) = arithmetic_expression("5+51").unwrap();
        assert_eq!(got, binary_op!(5, Add, 51));

        let (_, got) = arithmetic_expression("5+$foo").unwrap();
        assert_eq!(got, binary_op!(5, Add, param!("foo")));

        let (_, got) = arithmetic_expression("5- -(3|2)").unwrap();
        assert_eq!(
            got,
            binary_op!(5, Sub, unary!(-nested!(binary_op!(3, BitwiseOr, 2))))
        );

        // whitespace is not significant between unary operators
        let (_, got) = arithmetic_expression("5+-(3|2)").unwrap();
        assert_eq!(
            got,
            binary_op!(5, Add, unary!(-nested!(binary_op!(3, BitwiseOr, 2))))
        );

        // Fallible cases

        // invalid operator / incomplete expression
        assert_failure!(arithmetic_expression("5 || 3"));

        // TODO: skip until https://github.com/influxdata/influxdb_iox/issues/5663 is implemented
        // assert_failure!(arithmetic("5+--(3|2)"));
    }

    #[test]
    fn test_var_ref() {
        let (_, got) = var_ref("foo").unwrap();
        assert_eq!(got, var_ref!("foo"));

        // with cast operator
        let (_, got) = var_ref("foo::tag").unwrap();
        assert_eq!(got, var_ref!("foo", Tag));

        // Fallible cases
        assert_expect_error!(var_ref("foo::invalid"), "invalid data type for tag or field reference, expected float, integer, string, boolean, tag or field");
    }

    #[test]
    fn test_spacing_and_remaining_input() {
        // Validate that the remaining input is returned
        let (got, _) = arithmetic_expression("foo - 1 + 2 LIMIT 10").unwrap();
        assert_eq!(got, " LIMIT 10");

        // Any whitespace preceding the expression is consumed
        let (got, _) = arithmetic_expression(" foo - 1 + 2").unwrap();
        assert_eq!(got, "");

        // Various whitespace separators are supported between tokens
        let (got, _) = arithmetic_expression("foo\n | 1 \t + \n \t3").unwrap();
        assert!(got.is_empty())
    }

    #[test]
    fn test_display_expr() {
        // Round-trip: parsed expressions display in a normalised form with
        // single spaces around binary operators.
        let (_, e) = arithmetic_expression("5 + 51").unwrap();
        let got = format!("{}", e);
        assert_eq!(got, "5 + 51");

        let (_, e) = arithmetic_expression("5 + -10").unwrap();
        let got = format!("{}", e);
        assert_eq!(got, "5 + -10");

        let (_, e) = arithmetic_expression("-(5 % 6)").unwrap();
        let got = format!("{}", e);
        assert_eq!(got, "-(5 % 6)");

        // vary spacing
        let (_, e) = arithmetic_expression("( 5 + 6 ) * -( 7+ 8)").unwrap();
        let got = format!("{}", e);
        assert_eq!(got, "(5 + 6) * -(7 + 8)");

        // multiple unary and parenthesis
        let (_, e) = arithmetic_expression("(-(5 + 6) & -+( 7 + 8 ))").unwrap();
        let got = format!("{}", e);
        assert_eq!(got, "(-(5 + 6) & -+(7 + 8))");

        // unquoted identifier
        let (_, e) = arithmetic_expression("foo + 5").unwrap();
        let got = format!("{}", e);
        assert_eq!(got, "foo + 5");

        // bind parameter identifier
        let (_, e) = arithmetic_expression("foo + $0").unwrap();
        let got = format!("{}", e);
        assert_eq!(got, "foo + $0");

        // quoted identifier
        let (_, e) = arithmetic_expression(r#""foo" + 'bar'"#).unwrap();
        let got = format!("{}", e);
        assert_eq!(got, r#"foo + 'bar'"#);

        // Duration
        let (_, e) = arithmetic_expression("- 6h30m").unwrap();
        let got = format!("{}", e);
        assert_eq!(got, "-6h30m");

        // Validate other expression types
        assert_eq!(format!("{}", Expr::Wildcard(None)), "*");

        assert_eq!(
            format!("{}", Expr::Wildcard(Some(WildcardType::Field))),
            "*::field"
        );

        assert_eq!(format!("{}", Expr::Distinct("foo".into())), "DISTINCT foo");

        // can't parse literal regular expressions as part of an arithmetic expression
        assert_failure!(arithmetic_expression(r#""foo" + /^(no|match)$/"#));
    }

    /// Test call expressions using `ConditionalExpression`
    fn call(i: &str) -> ParseResult<&str, Expr> {
        call_expression::<TestParsers>(i)
    }

    #[test]
    fn test_call() {
        // These tests validate a `Call` expression and also it's Display implementation.
        // We don't need to validate Expr trees, as we do that in the conditional and arithmetic
        // tests.

        // No arguments
        let (_, ex) = call("FN()").unwrap();
        let got = format!("{}", ex);
        assert_eq!(got, "FN()");

        // Single argument with surrounding whitespace
        let (_, ex) = call("FN ( 1 )").unwrap();
        let got = format!("{}", ex);
        assert_eq!(got, "FN(1)");

        // Multiple arguments with varying whitespace
        let (_, ex) = call("FN ( 1,2\n,3,\t4 )").unwrap();
        let got = format!("{}", ex);
        assert_eq!(got, "FN(1, 2, 3, 4)");

        // Arguments as expressions
        let (_, ex) = call("FN ( 1 + 2, foo, 'bar' )").unwrap();
        let got = format!("{}", ex);
        assert_eq!(got, "FN(1 + 2, foo, 'bar')");

        // A single regular expression argument
        let (_, ex) = call("FN ( /foo/ )").unwrap();
        let got = format!("{}", ex);
        assert_eq!(got, "FN(/foo/)");

        // Fallible cases

        call("FN ( 1").unwrap_err();
        call("FN ( 1, )").unwrap_err();
        call("FN ( 1,, 2 )").unwrap_err();

        // Conditionals not supported
        call("FN ( 1 = 2 )").unwrap_err();

        // Multiple regular expressions not supported
        call("FN ( /foo/, /bar/ )").unwrap_err();
    }

    #[test]
    fn test_var_ref_display() {
        assert_eq!(
            format!(
                "{}",
                Expr::VarRef {
                    name: "foo".into(),
                    data_type: None
                }
            ),
            "foo"
        );
        assert_eq!(
            format!(
                "{}",
                Expr::VarRef {
                    name: "foo".into(),
                    data_type: Some(VarRefDataType::Field)
                }
            ),
            "foo::field"
        );
    }
}

View File

@ -0,0 +1,387 @@
use crate::expression::arithmetic::{
arithmetic, call_expression, var_ref, ArithmeticParsers, Expr,
};
use crate::internal::{verify, ParseResult};
use crate::literal::{literal_no_regex, literal_regex, Literal};
use crate::parameter::parameter;
use nom::branch::alt;
use nom::bytes::complete::{tag, tag_no_case};
use nom::character::complete::{char, multispace0};
use nom::combinator::{cut, map, value};
use nom::multi::many0;
use nom::sequence::{delimited, preceded, tuple};
use std::fmt;
use std::fmt::{Display, Formatter, Write};
/// An InfluxQL conditional operator, including comparison, regex-match and
/// logical conjunction/disjunction operators.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConditionalOperator {
    /// Equality (`=`).
    Eq,
    /// Inequality (`!=`).
    NotEq,
    /// Regular expression match (`=~`).
    EqRegex,
    /// Regular expression non-match (`!~`).
    NotEqRegex,
    /// Less than (`<`).
    Lt,
    /// Less than or equal (`<=`).
    LtEq,
    /// Greater than (`>`).
    Gt,
    /// Greater than or equal (`>=`).
    GtEq,
    /// Membership (`IN`).
    In,
    /// Logical conjunction (`AND`).
    And,
    /// Logical disjunction (`OR`).
    Or,
}
impl Display for ConditionalOperator {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
Self::Eq => f.write_char('='),
Self::NotEq => f.write_str("!="),
Self::EqRegex => f.write_str("=~"),
Self::NotEqRegex => f.write_str("!~"),
Self::Lt => f.write_char('<'),
Self::LtEq => f.write_str("<="),
Self::Gt => f.write_char('>'),
Self::GtEq => f.write_str(">="),
Self::In => f.write_str("IN"),
Self::And => f.write_str("AND"),
Self::Or => f.write_str("OR"),
}
}
}
/// Represents an InfluxQL conditional expression tree.
#[derive(Debug, Clone, PartialEq)]
pub enum ConditionalExpression {
    /// Represents an arithmetic expression.
    Expr(Box<Expr>),
    /// Binary operations, such as `foo = 'bar'` or `true AND false`.
    Binary {
        lhs: Box<ConditionalExpression>,
        op: ConditionalOperator,
        rhs: Box<ConditionalExpression>,
    },
    /// Represents a conditional expression enclosed in parenthesis.
    Grouped(Box<ConditionalExpression>),
}
impl Display for ConditionalExpression {
    /// Write the InfluxQL text form of the conditional expression.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        match self {
            // Delegate to the wrapped arithmetic expression.
            Self::Expr(v) => write!(f, "{}", v),
            Self::Binary { lhs, op, rhs } => write!(f, "{} {} {}", lhs, op, rhs),
            Self::Grouped(v) => write!(f, "({})", v),
        }
    }
}
impl From<Literal> for ConditionalExpression {
    /// Wrap a [`Literal`] as an arithmetic sub-expression.
    fn from(v: Literal) -> Self {
        Self::Expr(Box::new(Expr::Literal(v)))
    }
}
/// Parse a parenthesis expression.
///
/// The inner expression is wrapped in [`ConditionalExpression::Grouped`] so
/// the original grouping is preserved when the tree is displayed.
fn parens(i: &str) -> ParseResult<&str, ConditionalExpression> {
    delimited(
        preceded(multispace0, char('(')),
        map(conditional_expression, |e| {
            ConditionalExpression::Grouped(e.into())
        }),
        preceded(multispace0, char(')')),
    )(i)
}
/// Parse either an arithmetic expression or a parenthesised conditional
/// expression, the two operand forms of a comparison.
fn expr_or_group(i: &str) -> ParseResult<&str, ConditionalExpression> {
    alt((
        map(arithmetic_expression, |v| {
            ConditionalExpression::Expr(Box::new(v))
        }),
        parens,
    ))(i)
}
/// Parse the conditional regular expression operators `=~` and `!~`.
fn conditional_regex(i: &str) -> ParseResult<&str, ConditionalExpression> {
    let (input, f1) = expr_or_group(i)?;
    let (input, exprs) = many0(tuple((
        preceded(
            multispace0,
            alt((
                value(ConditionalOperator::EqRegex, tag("=~")),
                value(ConditionalOperator::NotEqRegex, tag("!~")),
            )),
        ),
        // The right-hand side must be a regex literal; `cut` makes anything
        // else a hard failure rather than a backtrack.
        map(cut(preceded(multispace0, literal_regex)), From::from),
    )))(input)?;
    // Fold the pairs into a left-associative binary tree.
    Ok((input, reduce_expr(f1, exprs)))
}
/// Parse conditional operators.
///
/// Comparison operators bind tighter than `AND`/`OR` and looser than the
/// regex operators parsed by [`conditional_regex`].
fn conditional(i: &str) -> ParseResult<&str, ConditionalExpression> {
    let (input, f1) = conditional_regex(i)?;
    let (input, exprs) = many0(tuple((
        preceded(
            multispace0,
            alt((
                // try longest matches first, so e.g. `<=` is not consumed as `<`
                value(ConditionalOperator::LtEq, tag("<=")),
                value(ConditionalOperator::GtEq, tag(">=")),
                value(ConditionalOperator::NotEq, tag("!=")),
                value(ConditionalOperator::Lt, char('<')),
                value(ConditionalOperator::Gt, char('>')),
                value(ConditionalOperator::Eq, char('=')),
            )),
        ),
        // A consumed operator commits us to a right-hand operand.
        cut(conditional_regex),
    )))(input)?;
    Ok((input, reduce_expr(f1, exprs)))
}
/// Parse conjunction operators, such as `AND`.
fn conjunction(i: &str) -> ParseResult<&str, ConditionalExpression> {
    let (input, f1) = conditional(i)?;
    let (input, exprs) = many0(tuple((
        value(
            ConditionalOperator::And,
            // NOTE(review): `tag_no_case("and")` does not require a word
            // boundary after the keyword — confirm inputs like `1 ANDfoo`
            // are rejected elsewhere or are acceptable.
            preceded(multispace0, tag_no_case("and")),
        ),
        // A consumed `AND` commits us to a right-hand operand.
        cut(conditional),
    )))(input)?;
    Ok((input, reduce_expr(f1, exprs)))
}
/// Parse disjunction operator, such as `OR`.
///
/// `OR` has the lowest precedence of the conditional operators.
fn disjunction(i: &str) -> ParseResult<&str, ConditionalExpression> {
    let (input, f1) = conjunction(i)?;
    let (input, exprs) = many0(tuple((
        value(
            ConditionalOperator::Or,
            preceded(multispace0, tag_no_case("or")),
        ),
        // A consumed `OR` commits us to a right-hand operand.
        cut(conjunction),
    )))(input)?;
    Ok((input, reduce_expr(f1, exprs)))
}
/// Parse an InfluxQL conditional expression.
///
/// This is the grammar's entry point: parsing starts at the lowest-precedence
/// operator (`OR`) and recurses down through `AND`, comparisons and regex
/// matches.
pub fn conditional_expression(i: &str) -> ParseResult<&str, ConditionalExpression> {
    disjunction(i)
}
/// Folds `expr` and `remainder` into a left-associative
/// [ConditionalExpression::Binary] tree.
fn reduce_expr(
    expr: ConditionalExpression,
    remainder: Vec<(ConditionalOperator, ConditionalExpression)>,
) -> ConditionalExpression {
    let mut tree = expr;
    for (op, rhs) in remainder {
        tree = ConditionalExpression::Binary {
            lhs: Box::new(tree),
            op,
            rhs: Box::new(rhs),
        };
    }
    tree
}
/// Returns true if `expr` is a valid [`Expr::Call`] expression for the `now` function.
///
/// A valid `now` call has a case-insensitive name of `now` and takes no
/// arguments.
pub fn is_valid_now_call(expr: &Expr) -> bool {
    // `eq_ignore_ascii_case` avoids allocating a lowercased copy of the name
    // on every check, unlike the previous `to_lowercase() == "now"`.
    matches!(expr, Expr::Call { name, args } if name.eq_ignore_ascii_case("now") && args.is_empty())
}
impl ConditionalExpression {
    /// Parse the `now()` function call
    ///
    /// `now` is the only function call permitted within a conditional
    /// expression; any other call fails with the descriptive error below.
    fn call(i: &str) -> ParseResult<&str, Expr> {
        verify(
            "invalid expression, the only valid function call is 'now' with no arguments",
            call_expression::<Self>,
            is_valid_now_call,
        )(i)
    }
}
impl ArithmeticParsers for ConditionalExpression {
    /// Parse the operands permitted within a conditional expression:
    /// non-regex literals, a `now()` call, variable references and bind
    /// parameters.
    fn operand(i: &str) -> ParseResult<&str, Expr> {
        preceded(
            multispace0,
            alt((
                map(literal_no_regex, Expr::Literal),
                // tried before `var_ref`, so `now(` is not consumed as a
                // bare identifier
                Self::call,
                var_ref,
                map(parameter, Expr::BindParameter),
            )),
        )(i)
    }
}
/// Parse an arithmetic expression used by conditional expressions.
///
/// Uses [`ConditionalExpression`]'s operand rules, which additionally permit
/// the `now()` function call.
fn arithmetic_expression(i: &str) -> ParseResult<&str, Expr> {
    arithmetic::<ConditionalExpression>(i)
}
#[cfg(test)]
mod test {
    use super::*;
    use crate::expression::arithmetic::Expr;
    use crate::{
        assert_expect_error, assert_failure, binary_op, call, cond_op, grouped, regex, unary,
        var_ref,
    };

    // Test-only conversions for concisely constructing expression trees.
    impl From<Expr> for ConditionalExpression {
        fn from(v: Expr) -> Self {
            Self::Expr(Box::new(v))
        }
    }

    impl From<u64> for Box<ConditionalExpression> {
        fn from(v: u64) -> Self {
            Self::new(ConditionalExpression::Expr(Box::new(Expr::Literal(
                v.into(),
            ))))
        }
    }

    impl From<Expr> for Box<ConditionalExpression> {
        fn from(v: Expr) -> Self {
            Self::new(ConditionalExpression::Expr(v.into()))
        }
    }

    impl From<Box<Expr>> for Box<ConditionalExpression> {
        fn from(v: Box<Expr>) -> Self {
            Self::new(ConditionalExpression::Expr(v))
        }
    }

    #[test]
    fn test_arithmetic_expression() {
        // now() function call is permitted
        let (_, got) = arithmetic_expression("now() + 3").unwrap();
        assert_eq!(got, binary_op!(call!("now"), Add, 3));

        // Fallible cases

        assert_expect_error!(
            arithmetic_expression("sum(foo)"),
            "invalid expression, the only valid function call is 'now' with no arguments"
        );

        assert_expect_error!(
            arithmetic_expression("now(1)"),
            "invalid expression, the only valid function call is 'now' with no arguments"
        );
    }

    #[test]
    fn test_conditional_expression() {
        // Each comparison operator produces the matching binary node
        let (_, got) = conditional_expression("foo = 5").unwrap();
        assert_eq!(got, *cond_op!(var_ref!("foo"), Eq, 5));

        let (_, got) = conditional_expression("foo != 5").unwrap();
        assert_eq!(got, *cond_op!(var_ref!("foo"), NotEq, 5));

        let (_, got) = conditional_expression("foo > 5").unwrap();
        assert_eq!(got, *cond_op!(var_ref!("foo"), Gt, 5));

        let (_, got) = conditional_expression("foo >= 5").unwrap();
        assert_eq!(got, *cond_op!(var_ref!("foo"), GtEq, 5));

        let (_, got) = conditional_expression("foo < 5").unwrap();
        assert_eq!(got, *cond_op!(var_ref!("foo"), Lt, 5));

        let (_, got) = conditional_expression("foo <= 5").unwrap();
        assert_eq!(got, *cond_op!(var_ref!("foo"), LtEq, 5));

        let (_, got) = conditional_expression("foo > 5 + 6 ").unwrap();
        assert_eq!(got, *cond_op!(var_ref!("foo"), Gt, binary_op!(5, Add, 6)));

        let (_, got) = conditional_expression("5 <= -6").unwrap();
        assert_eq!(got, *cond_op!(5, LtEq, unary!(-6)));

        // simple expressions
        let (_, got) = conditional_expression("true").unwrap();
        assert_eq!(
            got,
            ConditionalExpression::Expr(Box::new(Expr::Literal(true.into())))
        );

        // Expressions are still valid when whitespace is omitted
        let (_, got) = conditional_expression("foo>5+6 ").unwrap();
        assert_eq!(got, *cond_op!(var_ref!("foo"), Gt, binary_op!(5, Add, 6)));

        let (_, got) = conditional_expression("5<=-6").unwrap();
        assert_eq!(got, *cond_op!(5, LtEq, unary!(-6)));

        // var refs with cast operator
        let (_, got) = conditional_expression("foo::integer = 5").unwrap();
        assert_eq!(got, *cond_op!(var_ref!("foo", Integer), Eq, 5));

        // Fallible cases

        // conditional expression must be complete
        assert_failure!(conditional_expression("5 <="));

        // should not accept a regex literal
        assert_failure!(conditional_expression("5 = /regex/"));
    }

    #[test]
    fn test_logical_expression() {
        let (_, got) = conditional_expression("5 AND 6").unwrap();
        assert_eq!(got, *cond_op!(5, And, 6));

        // Logical operators are left-associative
        let (_, got) = conditional_expression("5 AND 6 OR 7").unwrap();
        assert_eq!(got, *cond_op!(cond_op!(5, And, 6), Or, 7));

        // AND binds tighter than OR
        let (_, got) = conditional_expression("5 > 3 OR 6 = 7 AND 7 != 1").unwrap();
        assert_eq!(
            got,
            *cond_op!(
                cond_op!(5, Gt, 3),
                Or,
                cond_op!(cond_op!(6, Eq, 7), And, cond_op!(7, NotEq, 1))
            )
        );

        let (_, got) = conditional_expression("5 AND (6 OR 7)").unwrap();
        assert_eq!(got, *cond_op!(5, And, grouped!(cond_op!(6, Or, 7))));

        // Fallible cases

        // Expects Expr after operator
        assert_failure!(conditional_expression("5 OR -"));
        assert_failure!(conditional_expression("5 OR"));
        assert_failure!(conditional_expression("5 AND"));

        // Can't use "and" as identifier
        assert_failure!(conditional_expression("5 AND and OR 5"));
    }

    #[test]
    fn test_regex() {
        let (_, got) = conditional_expression("foo =~ /(a > b)/").unwrap();
        assert_eq!(got, *cond_op!(var_ref!("foo"), EqRegex, regex!("(a > b)")));

        let (_, got) = conditional_expression("foo !~ /bar/").unwrap();
        assert_eq!(got, *cond_op!(var_ref!("foo"), NotEqRegex, regex!("bar")));

        // Expressions are still valid when whitespace is omitted
        let (_, got) = conditional_expression("foo=~/(a > b)/").unwrap();
        assert_eq!(got, *cond_op!(var_ref!("foo"), EqRegex, regex!("(a > b)")));

        // Fallible cases

        // Expects a regex literal after regex conditional operators
        assert_failure!(conditional_expression("foo =~ 5"));
        assert_failure!(conditional_expression("foo !~ 5"));
    }

    #[test]
    fn test_display_expr() {
        let (_, e) = conditional_expression("foo = 'test'").unwrap();
        let got = format!("{}", e);
        assert_eq!(got, "foo = 'test'");
    }
}

View File

@ -0,0 +1,147 @@
#![cfg(test)]
/// Constructs a [crate::expression::arithmetic::Expr::VarRef] expression.
#[macro_export]
macro_rules! var_ref {
    ($NAME: literal) => {
        $crate::expression::arithmetic::Expr::VarRef {
            name: $NAME.into(),
            data_type: None,
        }
    };
    // With an explicit cast, e.g. `var_ref!("foo", Tag)`.
    ($NAME: literal, $TYPE: ident) => {
        $crate::expression::arithmetic::Expr::VarRef {
            name: $NAME.into(),
            data_type: Some($crate::expression::arithmetic::VarRefDataType::$TYPE),
        }
    };
}

/// Constructs a regular expression [crate::expression::arithmetic::Expr::Literal].
#[macro_export]
macro_rules! regex {
    ($EXPR: expr) => {
        $crate::expression::arithmetic::Expr::Literal(
            $crate::literal::Literal::Regex($EXPR.into()).into(),
        )
    };
}

/// Constructs a [crate::expression::arithmetic::Expr::BindParameter] expression.
#[macro_export]
macro_rules! param {
    ($EXPR: expr) => {
        $crate::expression::arithmetic::Expr::BindParameter(
            $crate::parameter::BindParameter($EXPR.into()).into(),
        )
    };
}

/// Constructs a [crate::expression::conditional::ConditionalExpression::Grouped] expression.
///
/// The fully-qualified `Into` call coerces the result into a boxed
/// conditional expression, so the macro can be used directly as an operand
/// of `cond_op!`.
#[macro_export]
macro_rules! grouped {
    ($EXPR: expr) => {
        <$crate::expression::conditional::ConditionalExpression as std::convert::Into<
            Box<$crate::expression::conditional::ConditionalExpression>,
        >>::into($crate::expression::conditional::ConditionalExpression::Grouped($EXPR.into()))
    };
}

/// Constructs a [crate::expression::arithmetic::Expr::Nested] expression.
///
/// The fully-qualified `Into` call coerces the result into a boxed
/// expression, so the macro can be used directly as a binary operand.
#[macro_export]
macro_rules! nested {
    ($EXPR: expr) => {
        <$crate::expression::arithmetic::Expr as std::convert::Into<
            Box<$crate::expression::arithmetic::Expr>,
        >>::into($crate::expression::arithmetic::Expr::Nested($EXPR.into()))
    };
}

/// Constructs a [crate::expression::arithmetic::Expr::Call] expression.
#[macro_export]
macro_rules! call {
    // A call with no arguments, e.g. `call!("now")`.
    ($NAME:literal) => {
        $crate::expression::arithmetic::Expr::Call {
            name: $NAME.into(),
            args: vec![],
        }
    };
    // A call with one or more arguments.
    ($NAME:literal, $( $ARG:expr ),+) => {
        $crate::expression::arithmetic::Expr::Call {
            name: $NAME.into(),
            args: vec![$( $ARG ),+],
        }
    };
}

/// Constructs a [crate::expression::arithmetic::Expr::UnaryOp] expression.
#[macro_export]
macro_rules! unary {
    (- $EXPR:expr) => {
        $crate::expression::arithmetic::Expr::UnaryOp(
            $crate::expression::arithmetic::UnaryOperator::Minus,
            $EXPR.into(),
        )
    };
    (+ $EXPR:expr) => {
        $crate::expression::arithmetic::Expr::UnaryOp(
            $crate::expression::arithmetic::UnaryOperator::Plus,
            $EXPR.into(),
        )
    };
}

/// Constructs a [crate::expression::arithmetic::Expr::Distinct] expression.
#[macro_export]
macro_rules! distinct {
    ($IDENT:literal) => {
        $crate::expression::arithmetic::Expr::Distinct($IDENT.into())
    };
}

/// Constructs a [crate::expression::arithmetic::Expr::Wildcard] expression.
#[macro_export]
macro_rules! wildcard {
    () => {
        $crate::expression::arithmetic::Expr::Wildcard(None)
    };
    (tag) => {
        $crate::expression::arithmetic::Expr::Wildcard(Some(
            $crate::expression::arithmetic::WildcardType::Tag,
        ))
    };
    (field) => {
        $crate::expression::arithmetic::Expr::Wildcard(Some(
            $crate::expression::arithmetic::WildcardType::Field,
        ))
    };
}

/// Constructs a [crate::expression::arithmetic::Expr::Binary] expression.
#[macro_export]
macro_rules! binary_op {
    ($LHS: expr, $OP: ident, $RHS: expr) => {
        $crate::expression::arithmetic::Expr::Binary {
            lhs: $LHS.into(),
            op: $crate::expression::arithmetic::BinaryOperator::$OP,
            rhs: $RHS.into(),
        }
    };
}

/// Constructs a [crate::expression::conditional::ConditionalExpression::Binary] expression.
///
/// The fully-qualified `Into` call coerces the result into a boxed
/// conditional expression, so `cond_op!` invocations can nest.
#[macro_export]
macro_rules! cond_op {
    ($LHS: expr, $OP: ident, $RHS: expr) => {
        <$crate::expression::conditional::ConditionalExpression as std::convert::Into<
            Box<$crate::expression::conditional::ConditionalExpression>,
        >>::into(
            $crate::expression::conditional::ConditionalExpression::Binary {
                lhs: $LHS.into(),
                op: $crate::expression::conditional::ConditionalOperator::$OP,
                rhs: $RHS.into(),
            },
        )
    };
}

View File

@ -11,8 +11,6 @@
//! [identifier]: https://docs.influxdata.com/influxdb/v1.8/query_language/spec/#identifiers
//! [keywords]: https://docs.influxdata.com/influxdb/v1.8/query_language/spec/#keywords
#![allow(dead_code)]
use crate::internal::ParseResult;
use crate::keywords::sql_keyword;
use crate::string::double_quoted_string;

View File

@ -2,6 +2,7 @@
//!
use nom::error::{ErrorKind as NomErrorKind, ParseError as NomParseError};
use nom::Parser;
use std::borrow::Borrow;
use std::fmt::{Display, Formatter};
/// This trait must be implemented in order to use the [`map_fail`] and
@ -74,6 +75,30 @@ where
}
}
/// Returns the result of `f` if it satisfies `is_valid`; otherwise,
/// returns an error using the specified `message`.
///
/// The output of `f` is checked through a [`Borrow`] of `O2`, so e.g. a
/// `String` output can be validated by a `Fn(&str) -> bool` predicate.
pub fn verify<'a, O1, O2, E: ParseError<'a>, F, G>(
    message: &'static str,
    mut f: F,
    is_valid: G,
) -> impl FnMut(&'a str) -> ParseResult<&'a str, O1, E>
where
    F: Parser<&'a str, O1, E>,
    G: Fn(&O2) -> bool,
    O1: Borrow<O2>,
    O2: ?Sized,
{
    move |i: &str| {
        let (remain, o) = f.parse(i)?;
        if is_valid(o.borrow()) {
            Ok((remain, o))
        } else {
            // The error is raised as a `Failure` anchored at the original
            // input `i`, so enclosing `alt` combinators will not try other
            // branches.
            Err(nom::Err::Failure(E::from_message(i, message)))
        }
    }
}
impl<I> NomParseError<I> for Error<I> {
fn from_error_kind(input: I, kind: NomErrorKind) -> Self {
Self::Nom(input, kind)

View File

@ -2,8 +2,6 @@
//!
//! [keywords]: https://docs.influxdata.com/influxdb/v1.8/query_language/spec/#keywords
#![allow(dead_code)]
use crate::internal::ParseResult;
use nom::branch::alt;
use nom::bytes::complete::{tag, tag_no_case};

View File

@ -35,6 +35,7 @@ mod internal;
mod keywords;
mod literal;
mod parameter;
mod select;
mod show;
mod show_field_keys;
mod show_measurements;
@ -55,8 +56,7 @@ pub struct ParseError {
impl Display for ParseError {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{} at pos {}", self.message, self.pos)?;
Ok(())
write!(f, "{} at pos {}", self.message, self.pos)
}
}

View File

@ -1,14 +1,13 @@
#![allow(dead_code)]
use crate::internal::{map_fail, ParseResult};
use crate::string::{regex, single_quoted_string, Regex};
use crate::write_escaped;
use nom::branch::alt;
use nom::bytes::complete::{tag, tag_no_case};
use nom::character::complete::digit1;
use nom::combinator::{map, recognize, value};
use nom::character::complete::{char, digit1, multispace0};
use nom::combinator::{map, opt, recognize, value};
use nom::multi::fold_many1;
use nom::sequence::{pair, separated_pair};
use nom::sequence::{pair, preceded, separated_pair};
use std::fmt;
use std::fmt::{Display, Formatter, Write};
/// Number of nanoseconds in a microsecond.
@ -87,19 +86,17 @@ impl From<Regex> for Literal {
impl Display for Literal {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
Self::Unsigned(v) => write!(f, "{}", v)?,
Self::Float(v) => write!(f, "{}", v)?,
Self::Unsigned(v) => write!(f, "{}", v),
Self::Float(v) => write!(f, "{}", v),
Self::String(v) => {
f.write_char('\'')?;
write_escaped!(f, v, '\n' => "\\n", '\\' => "\\\\", '\'' => "\\'", '"' => "\\\"");
f.write_char('\'')?;
f.write_char('\'')
}
Self::Boolean(v) => write!(f, "{}", if *v { "true" } else { "false" })?,
Self::Duration(v) => write!(f, "{}", v)?,
Self::Regex(v) => write!(f, "{}", v)?,
Self::Boolean(v) => write!(f, "{}", if *v { "true" } else { "false" }),
Self::Duration(v) => write!(f, "{}", v),
Self::Regex(v) => write!(f, "{}", v),
}
Ok(())
}
}
@ -121,7 +118,7 @@ fn integer(i: &str) -> ParseResult<&str, i64> {
/// ```text
/// INTEGER ::= [0-9]+
/// ```
fn unsigned_integer(i: &str) -> ParseResult<&str, u64> {
pub fn unsigned_integer(i: &str) -> ParseResult<&str, u64> {
map_fail("unable to parse unsigned integer", digit1, &str::parse)(i)
}
@ -141,6 +138,50 @@ fn float(i: &str) -> ParseResult<&str, f64> {
)(i)
}
/// Represents any signed number.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Number {
    /// A signed integer value.
    Integer(i64),
    /// A floating point value.
    Float(f64),
}

impl Display for Number {
    /// Delegate formatting to the wrapped numeric value.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Integer(v) => fmt::Display::fmt(v, f),
            Self::Float(v) => fmt::Display::fmt(v, f),
        }
    }
}

impl From<f64> for Number {
    fn from(v: f64) -> Self {
        Self::Float(v)
    }
}

impl From<i64> for Number {
    fn from(v: i64) -> Self {
        Self::Integer(v)
    }
}
/// Parse a signed [`Number`].
///
/// Whitespace is permitted between an optional leading sign and the digits.
pub fn number(i: &str) -> ParseResult<&str, Number> {
    // An optional `-` or `+` sign, applied to the parsed magnitude below.
    let (remaining, sign) = opt(alt((char('-'), char('+'))))(i)?;
    preceded(
        multispace0,
        alt((
            // floats are tried first so the integer parser does not consume
            // the whole part of a float
            map(float, move |v| {
                Number::Float(v * if let Some('-') = sign { -1.0 } else { 1.0 })
            }),
            // NOTE(review): because the magnitude is parsed as a positive
            // i64 before negation, `i64::MIN` cannot be parsed — confirm
            // this is acceptable.
            map(integer, move |v| {
                Number::Integer(v * if let Some('-') = sign { -1 } else { 1 })
            }),
        )),
    )(remaining)
}
/// Parse the input for an InfluxQL boolean, which must be the value `true` or `false`.
fn boolean(i: &str) -> ParseResult<&str, bool> {
alt((
@ -234,8 +275,8 @@ fn single_duration(i: &str) -> ParseResult<&str, i64> {
)(i)
}
/// Parse the input for an InfluxQL duration and returns the value in nanoseconds.
fn duration(i: &str) -> ParseResult<&str, Duration> {
/// Parse the input for an InfluxQL duration.
pub fn duration(i: &str) -> ParseResult<&str, Duration> {
map(
fold_many1(single_duration, || 0, |acc, fragment| acc + fragment),
Duration,
@ -244,8 +285,8 @@ fn duration(i: &str) -> ParseResult<&str, Duration> {
/// Parse an InfluxQL literal, except a [`Regex`].
///
/// See [`literal_regex`] for parsing literal regular expressions.
pub fn literal(i: &str) -> ParseResult<&str, Literal> {
/// Use [`literal`] for parsing any literals, excluding regular expressions.
pub fn literal_no_regex(i: &str) -> ParseResult<&str, Literal> {
alt((
// NOTE: order is important, as floats should be tested before durations and integers.
map(float, Literal::Float),
@ -256,6 +297,11 @@ pub fn literal(i: &str) -> ParseResult<&str, Literal> {
))(i)
}
/// Parse any InfluxQL literal.
pub fn literal(i: &str) -> ParseResult<&str, Literal> {
alt((literal_no_regex, map(regex, Literal::Regex)))(i)
}
/// Parse an InfluxQL literal regular expression.
pub fn literal_regex(i: &str) -> ParseResult<&str, Literal> {
map(regex, Literal::Regex)(i)
@ -264,34 +310,42 @@ pub fn literal_regex(i: &str) -> ParseResult<&str, Literal> {
#[cfg(test)]
mod test {
use super::*;
use assert_matches::assert_matches;
#[test]
fn test_literal_no_regex() {
    // Each supported literal kind parses to the expected variant.
    let (_, parsed) = literal_no_regex("42").unwrap();
    assert_matches!(parsed, Literal::Unsigned(42));

    let (_, parsed) = literal_no_regex("42.69").unwrap();
    assert_matches!(parsed, Literal::Float(v) if v == 42.69);

    let (_, parsed) = literal_no_regex("'quick draw'").unwrap();
    assert_matches!(parsed, Literal::String(v) if v == "quick draw");

    let (_, parsed) = literal_no_regex("false").unwrap();
    assert_matches!(parsed, Literal::Boolean(false));

    let (_, parsed) = literal_no_regex("true").unwrap();
    assert_matches!(parsed, Literal::Boolean(true));

    let (_, parsed) = literal_no_regex("3h25m").unwrap();
    assert_matches!(parsed, Literal::Duration(v) if v == Duration(3 * NANOS_PER_HOUR + 25 * NANOS_PER_MIN));

    // Fallible case: regular expressions are explicitly excluded.
    literal_no_regex("/foo/").unwrap_err();
}
#[test]
fn test_literal() {
    // Use assert_matches! throughout for consistency with the other tests in
    // this module (test_literal_no_regex, test_literal_regex, test_number)
    // and for clearer failure diagnostics than assert!(matches!(..)).
    let (_, got) = literal("42").unwrap();
    assert_matches!(got, Literal::Unsigned(42));

    let (_, got) = literal("42.69").unwrap();
    assert_matches!(got, Literal::Float(v) if v == 42.69);

    let (_, got) = literal("'quick draw'").unwrap();
    assert_matches!(got, Literal::String(v) if v == "quick draw");

    let (_, got) = literal("false").unwrap();
    assert_matches!(got, Literal::Boolean(false));

    let (_, got) = literal("true").unwrap();
    assert_matches!(got, Literal::Boolean(true));

    let (_, got) = literal("3h25m").unwrap();
    assert_matches!(got, Literal::Duration(v) if v == Duration(3 * NANOS_PER_HOUR + 25 * NANOS_PER_MIN));

    // Unlike literal_no_regex, regular expressions are accepted here.
    let (_, got) = literal("/^(match|this)$/").unwrap();
    assert_matches!(got, Literal::Regex(v) if v == "^(match|this)$".into());
}
#[test]
fn test_literal_regex() {
    // A delimited regular expression parses to the Regex literal variant.
    let (_, parsed) = literal_regex("/^(match|this)$/").unwrap();
    assert_matches!(parsed, Literal::Regex(v) if v == "^(match|this)$".into());
}
#[test]
@ -411,4 +465,39 @@ mod test {
let got = format!("{}", d);
assert_eq!(got, "20w6d13h11m10s9ms8us500ns");
}
#[test]
fn test_number() {
    // Floating point values, with optional sign and whitespace after the sign.
    let (_, parsed) = number("55.3").unwrap();
    assert_matches!(parsed, Number::Float(v) if v == 55.3);
    let (_, parsed) = number("-18.9").unwrap();
    assert_matches!(parsed, Number::Float(v) if v == -18.9);
    let (_, parsed) = number("- 18.9").unwrap();
    assert_matches!(parsed, Number::Float(v) if v == -18.9);
    let (_, parsed) = number("+33.1").unwrap();
    assert_matches!(parsed, Number::Float(v) if v == 33.1);
    let (_, parsed) = number("+ 33.1").unwrap();
    assert_matches!(parsed, Number::Float(v) if v == 33.1);

    // Integer values, with the same sign/whitespace combinations.
    let (_, parsed) = number("42").unwrap();
    assert_matches!(parsed, Number::Integer(v) if v == 42);
    let (_, parsed) = number("-32").unwrap();
    assert_matches!(parsed, Number::Integer(v) if v == -32);
    let (_, parsed) = number("- 32").unwrap();
    assert_matches!(parsed, Number::Integer(v) if v == -32);
    let (_, parsed) = number("+501").unwrap();
    assert_matches!(parsed, Number::Integer(v) if v == 501);
    let (_, parsed) = number("+ 501").unwrap();
    assert_matches!(parsed, Number::Integer(v) if v == 501);
}
}

View File

@ -7,8 +7,6 @@
//! [bind parameter]: https://docs.influxdata.com/influxdb/v1.8/tools/api/#bind-parameters
//! [implementation]: https://github.com/influxdata/influxql/blob/df51a45762be9c1b578f01718fa92d286a843fe9/scanner.go#L57-L62
#![allow(dead_code)]
use crate::internal::ParseResult;
use crate::string::double_quoted_string;
use crate::write_quoted_string;

File diff suppressed because it is too large Load Diff

View File

@ -45,8 +45,7 @@ pub struct ShowDatabasesStatement;
impl Display for ShowDatabasesStatement {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.write_str("SHOW DATABASES")?;
Ok(())
f.write_str("SHOW DATABASES")
}
}
@ -69,7 +68,7 @@ fn show_tag(i: &str) -> ParseResult<&str, Statement> {
preceded(
pair(tag_no_case("TAG"), multispace1),
expect(
"invalid SHOW TAG statement, expected KEYS or VALUES following TAG",
"invalid SHOW TAG statement, expected KEYS or VALUES",
alt((
map(show_tag_keys, |s| Statement::ShowTagKeys(Box::new(s))),
map(show_tag_values, |s| Statement::ShowTagValues(Box::new(s))),
@ -109,7 +108,7 @@ mod test {
assert_expect_error!(
show_statement("SHOW TAG FOO WITH KEY = some_key"),
"invalid SHOW TAG statement, expected KEYS or VALUES following TAG"
"invalid SHOW TAG statement, expected KEYS or VALUES"
);
// Unsupported SHOW

View File

@ -69,7 +69,7 @@ pub fn show_field_keys(i: &str) -> ParseResult<&str, ShowFieldKeysStatement> {
tag_no_case("FIELD"),
multispace1,
expect(
"invalid SHOW FIELD KEYS statement, expect KEYS following FIELD",
"invalid SHOW FIELD KEYS statement, expected KEYS",
tag_no_case("KEYS"),
),
opt(preceded(multispace1, on_clause)),
@ -134,7 +134,7 @@ mod test {
// Fallible cases
assert_expect_error!(
show_field_keys("FIELD ON db"),
"invalid SHOW FIELD KEYS statement, expect KEYS following FIELD"
"invalid SHOW FIELD KEYS statement, expected KEYS"
);
}
}

View File

@ -2,8 +2,6 @@
//!
//! [sql]: https://docs.influxdata.com/influxdb/v1.8/query_language/explore-schema/#show-measurements
#![allow(dead_code)]
use crate::internal::{expect, ParseResult};
use nom::branch::alt;
use nom::bytes::complete::{tag, tag_no_case};
@ -18,7 +16,7 @@ use crate::common::{
limit_clause, measurement_name_expression, offset_clause, where_clause,
MeasurementNameExpression,
};
use crate::expression::Expr;
use crate::expression::conditional::ConditionalExpression;
use crate::identifier::{identifier, Identifier};
use crate::string::{regex, Regex};
@ -66,10 +64,19 @@ fn on_clause(i: &str) -> ParseResult<&str, OnExpression> {
#[derive(Clone, Debug, Default, PartialEq)]
pub struct ShowMeasurementsStatement {
/// Limit the search to databases matching the expression.
pub on_expression: Option<OnExpression>,
/// Limit the search to measurements matching the expression.
pub measurement_expression: Option<MeasurementExpression>,
pub condition: Option<Expr>,
/// A conditional expression to filter the measurement list.
pub condition: Option<ConditionalExpression>,
/// A value to restrict the number of tag keys returned.
pub limit: Option<u64>,
/// A value to specify an offset to start retrieving tag keys.
pub offset: Option<u64>,
}
@ -136,7 +143,7 @@ fn with_measurement_clause(i: &str) -> ParseResult<&str, MeasurementExpression>
tuple((
tag("=~"),
multispace0,
expect("expected regex literal", regex),
expect("expected regular expression literal", regex),
)),
|(_, _, regex)| MeasurementExpression::Regex(regex),
),
@ -193,6 +200,8 @@ pub fn show_measurements(i: &str) -> ParseResult<&str, ShowMeasurementsStatement
mod test {
use super::*;
use crate::assert_expect_error;
use crate::expression::arithmetic::Expr;
use assert_matches::assert_matches;
#[test]
fn test_show_measurements() {
@ -229,7 +238,7 @@ mod test {
name: "bar".into(),
}
)),
condition: Some(Expr::Literal(true.into())),
condition: Some(Expr::Literal(true.into()).into()),
limit: Some(10),
offset: Some(20)
},
@ -247,7 +256,7 @@ mod test {
ShowMeasurementsStatement {
on_expression: Some(OnExpression::Database("foo".into())),
measurement_expression: Some(MeasurementExpression::Regex(Regex("bar".into()))),
condition: Some(Expr::Literal(true.into())),
condition: Some(Expr::Literal(true.into()).into()),
limit: None,
offset: None
},
@ -321,13 +330,10 @@ mod test {
);
let (_, got) = on_clause("ON *").unwrap();
assert!(matches!(got, OnExpression::AllDatabases));
assert_matches!(got, OnExpression::AllDatabases);
let (_, got) = on_clause("ON *.*").unwrap();
assert!(matches!(
got,
OnExpression::AllDatabasesAndRetentionPolicies
));
assert_matches!(got, OnExpression::AllDatabasesAndRetentionPolicies);
assert_expect_error!(
on_clause("ON WHERE cpu = 'test'"),
@ -376,7 +382,7 @@ mod test {
// Must have a regex for equal regex operator
assert_expect_error!(
with_measurement_clause("WITH measurement =~ foo"),
"expected regex literal"
"expected regular expression literal"
);
// Unsupported regex not equal operator

View File

@ -1,5 +1,5 @@
use crate::common::{limit_clause, offset_clause, where_clause};
use crate::expression::Expr;
use crate::expression::conditional::ConditionalExpression;
use crate::identifier::Identifier;
use crate::internal::ParseResult;
use crate::show::on_clause;
@ -23,7 +23,7 @@ pub struct ShowTagKeysStatement {
pub from: Option<ShowFromClause>,
/// A conditional expression to filter the tag keys.
pub condition: Option<Expr>,
pub condition: Option<ConditionalExpression>,
/// A value to restrict the number of tag keys returned.
pub limit: Option<u64>,

View File

@ -1,5 +1,5 @@
use crate::common::{limit_clause, offset_clause, where_clause};
use crate::expression::Expr;
use crate::common::{limit_clause, offset_clause, where_clause, OneOrMore};
use crate::expression::conditional::ConditionalExpression;
use crate::identifier::{identifier, Identifier};
use crate::internal::{expect, ParseResult};
use crate::show::on_clause;
@ -9,10 +9,9 @@ use nom::branch::alt;
use nom::bytes::complete::{tag, tag_no_case};
use nom::character::complete::{char, multispace0, multispace1};
use nom::combinator::{map, opt};
use nom::multi::separated_list1;
use nom::sequence::{delimited, pair, preceded, tuple};
use nom::sequence::{delimited, preceded, tuple};
use std::fmt;
use std::fmt::{Display, Formatter, Write};
use std::fmt::{Display, Formatter};
/// Represents a `SHOW TAG VALUES` InfluxQL statement.
#[derive(Clone, Debug, PartialEq)]
@ -30,7 +29,7 @@ pub struct ShowTagValuesStatement {
pub with_key: WithKeyExpression,
/// A conditional expression to filter the tag keys.
pub condition: Option<Expr>,
pub condition: Option<ConditionalExpression>,
/// A value to restrict the number of tag keys returned.
pub limit: Option<u64>,
@ -39,7 +38,7 @@ pub struct ShowTagValuesStatement {
pub offset: Option<u64>,
}
impl fmt::Display for ShowTagValuesStatement {
impl Display for ShowTagValuesStatement {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
write!(f, "SHOW TAG VALUES")?;
@ -87,7 +86,7 @@ pub fn show_tag_values(i: &str) -> ParseResult<&str, ShowTagValuesStatement> {
opt(preceded(multispace1, on_clause)),
opt(preceded(multispace1, show_from_clause)),
expect(
"invalid SHOW TAG VALUES statement, expect WITH KEY clause",
"invalid SHOW TAG VALUES statement, expected WITH KEY clause",
preceded(multispace1, with_key_clause),
),
opt(preceded(multispace1, where_clause)),
@ -108,6 +107,8 @@ pub fn show_tag_values(i: &str) -> ParseResult<&str, ShowTagValuesStatement> {
))
}
pub type InList = OneOrMore<Identifier>;
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WithKeyExpression {
Eq(Identifier),
@ -115,10 +116,7 @@ pub enum WithKeyExpression {
EqRegex(Regex),
NotEqRegex(Regex),
/// IN expression
In {
first: Identifier,
rest: Option<Vec<Identifier>>,
},
In(InList),
}
impl Display for WithKeyExpression {
@ -126,41 +124,22 @@ impl Display for WithKeyExpression {
f.write_str("WITH KEY ")?;
match self {
Self::Eq(v) => write!(f, "= {}", v)?,
Self::NotEq(v) => write!(f, "!= {}", v)?,
Self::EqRegex(v) => write!(f, "=~ {}", v)?,
Self::NotEqRegex(v) => write!(f, "=! {}", v)?,
Self::In { first, rest } => {
write!(f, "IN ({}", first)?;
if let Some(rest) = rest {
for ident in rest {
write!(f, ", {}", ident)?;
}
}
f.write_char(')')?;
}
};
Ok(())
Self::Eq(v) => write!(f, "= {}", v),
Self::NotEq(v) => write!(f, "!= {}", v),
Self::EqRegex(v) => write!(f, "=~ {}", v),
Self::NotEqRegex(v) => write!(f, "=! {}", v),
Self::In(list) => write!(f, "IN ({})", list),
}
}
}
/// Parse an identifier list, as expected by the `WITH KEY IN` clause.
fn identifier_list(i: &str) -> ParseResult<&str, (Identifier, Option<Vec<Identifier>>)> {
fn identifier_list(i: &str) -> ParseResult<&str, InList> {
delimited(
preceded(multispace0, char('(')),
tuple((
preceded(multispace0, identifier),
opt(preceded(
pair(multispace0, char(',')),
separated_list1(
preceded(multispace0, char(',')),
preceded(multispace0, identifier),
),
)),
)),
InList::separated_list1("invalid IN clause, expected identifier"),
expect(
"invalid identifier list, missing ')'",
"invalid identifier list, expected ')'",
preceded(multispace0, char(')')),
),
)(i)
@ -171,25 +150,28 @@ fn with_key_clause(i: &str) -> ParseResult<&str, WithKeyExpression> {
tuple((
tag_no_case("WITH"),
multispace1,
expect(
"invalid WITH KEY clause, expect KEY following WITH",
tag_no_case("KEY"),
),
expect("invalid WITH KEY clause, expected KEY", tag_no_case("KEY")),
)),
expect(
"invalid WITH KEY clause, expect condition",
"invalid WITH KEY clause, expected condition",
alt((
map(
preceded(
delimited(multispace0, tag("=~"), multispace0),
expect("invalid WITH KEY clause, expect regex following =~", regex),
expect(
"invalid WITH KEY clause, expected regular expression following =~",
regex,
),
),
WithKeyExpression::EqRegex,
),
map(
preceded(
delimited(multispace0, tag("!~"), multispace0),
expect("invalid WITH KEY clause, expect regex following =!", regex),
expect(
"invalid WITH KEY clause, expected regular expression following =!",
regex,
),
),
WithKeyExpression::NotEqRegex,
),
@ -197,7 +179,7 @@ fn with_key_clause(i: &str) -> ParseResult<&str, WithKeyExpression> {
preceded(
delimited(multispace0, char('='), multispace0),
expect(
"invalid WITH KEY clause, expect identifier following =",
"invalid WITH KEY clause, expected identifier following =",
identifier,
),
),
@ -207,7 +189,7 @@ fn with_key_clause(i: &str) -> ParseResult<&str, WithKeyExpression> {
preceded(
delimited(multispace0, tag("!="), multispace0),
expect(
"invalid WITH KEY clause, expect identifier following !=",
"invalid WITH KEY clause, expected identifier following !=",
identifier,
),
),
@ -217,11 +199,11 @@ fn with_key_clause(i: &str) -> ParseResult<&str, WithKeyExpression> {
preceded(
preceded(multispace1, tag("IN")),
expect(
"invalid WITH KEY clause, expect identifier list following IN",
"invalid WITH KEY clause, expected identifier list following IN",
identifier_list,
),
),
|(first, rest)| WithKeyExpression::In { first, rest },
WithKeyExpression::In,
),
)),
),
@ -316,21 +298,16 @@ mod test {
assert_eq!(got, WithKeyExpression::NotEqRegex("foo".into()));
let (_, got) = with_key_clause("WITH KEY IN (foo)").unwrap();
assert_eq!(
got,
WithKeyExpression::In {
first: "foo".into(),
rest: None
}
);
assert_eq!(got, WithKeyExpression::In(InList::new(vec!["foo".into()])));
let (_, got) = with_key_clause("WITH KEY IN (foo, bar, \"foo bar\")").unwrap();
assert_eq!(
got,
WithKeyExpression::In {
first: "foo".into(),
rest: Some(vec!["bar".into(), "foo bar".into()])
}
WithKeyExpression::In(InList::new(vec![
"foo".into(),
"bar".into(),
"foo bar".into()
]))
);
// Expressions are still valid when whitespace is omitted
@ -341,52 +318,52 @@ mod test {
assert_expect_error!(
with_key_clause("WITH = foo"),
"invalid WITH KEY clause, expect KEY following WITH"
"invalid WITH KEY clause, expected KEY"
);
assert_expect_error!(
with_key_clause("WITH KEY"),
"invalid WITH KEY clause, expect condition"
"invalid WITH KEY clause, expected condition"
);
assert_expect_error!(
with_key_clause("WITH KEY foo"),
"invalid WITH KEY clause, expect condition"
"invalid WITH KEY clause, expected condition"
);
assert_expect_error!(
with_key_clause("WITH KEY = /foo/"),
"invalid WITH KEY clause, expect identifier following ="
"invalid WITH KEY clause, expected identifier following ="
);
assert_expect_error!(
with_key_clause("WITH KEY IN = foo"),
"invalid WITH KEY clause, expect identifier list following IN"
"invalid WITH KEY clause, expected identifier list following IN"
);
}
#[test]
fn test_identifier_list() {
let (_, got) = identifier_list("(foo)").unwrap();
assert_eq!(got, ("foo".into(), None));
assert_eq!(got, InList::new(vec!["foo".into()]));
// Test first and rest as well as removing unnecessary whitespace
let (_, got) = identifier_list("( foo, bar,\"foo bar\" )").unwrap();
assert_eq!(
got,
("foo".into(), Some(vec!["bar".into(), "foo bar".into()]))
InList::new(vec!["foo".into(), "bar".into(), "foo bar".into()])
);
// Fallible cases
assert_expect_error!(
identifier_list("(foo"),
"invalid identifier list, missing ')'"
"invalid identifier list, expected ')'"
);
assert_expect_error!(
identifier_list("(foo bar)"),
"invalid identifier list, missing ')'"
"invalid identifier list, expected ')'"
);
}
}

View File

@ -1,20 +1,15 @@
use crate::common::{measurement_name_expression, MeasurementNameExpression};
use crate::common::{measurement_name_expression, MeasurementNameExpression, OneOrMore, Parser};
use crate::identifier::{identifier, Identifier};
use crate::internal::{expect, ParseResult};
use crate::internal::ParseResult;
use crate::string::{regex, Regex};
use nom::branch::alt;
use nom::bytes::complete::tag_no_case;
use nom::character::complete::{char, multispace0, multispace1};
use nom::combinator::{map, opt};
use nom::multi::separated_list1;
use nom::sequence::{pair, preceded, tuple};
use nom::character::complete::multispace1;
use nom::combinator::map;
use nom::sequence::{pair, preceded};
use std::fmt;
use std::fmt::Formatter;
pub trait Parser: Sized {
fn parse(i: &str) -> ParseResult<&str, Self>;
}
/// Represents a single measurement selection found in a `FROM` measurement clause.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum MeasurementSelection<T: Parser> {
@ -22,6 +17,15 @@ pub enum MeasurementSelection<T: Parser> {
Regex(Regex),
}
impl<T: Parser> Parser for MeasurementSelection<T> {
fn parse(i: &str) -> ParseResult<&str, Self> {
alt((
map(T::parse, MeasurementSelection::Name),
map(regex, MeasurementSelection::Regex),
))(i)
}
}
impl<T: fmt::Display + Parser> fmt::Display for MeasurementSelection<T> {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
match self {
@ -39,56 +43,14 @@ impl<T: fmt::Display + Parser> fmt::Display for MeasurementSelection<T> {
/// for measurements names.
///
/// A `FROM` clause for a number of `SHOW` statements can accept a 3-part measurement name or
/// regular expression.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct FromMeasurementClause<T: Parser> {
pub first: MeasurementSelection<T>,
pub rest: Option<Vec<MeasurementSelection<T>>>,
}
pub type FromMeasurementClause<U> = OneOrMore<MeasurementSelection<U>>;
impl<T: fmt::Display + Parser> fmt::Display for FromMeasurementClause<T> {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
fmt::Display::fmt(&self.first, f)?;
if let Some(ref rest) = self.rest {
for arg in rest {
write!(f, ", {}", arg)?;
}
}
Ok(())
}
}
fn measurement_selection<T: Parser>(i: &str) -> ParseResult<&str, MeasurementSelection<T>> {
alt((
map(T::parse, MeasurementSelection::Name),
map(regex, MeasurementSelection::Regex),
))(i)
}
fn from_clause<T: Parser>(i: &str) -> ParseResult<&str, FromMeasurementClause<T>> {
// NOTE: This combinator is optimised to parse
map(
preceded(
pair(tag_no_case("FROM"), multispace1),
expect(
"invalid FROM clause, expected one or more identifiers or regexes",
tuple((
measurement_selection,
opt(preceded(
pair(multispace0, char(',')),
expect(
"invalid FROM clause, expected identifier after ,",
separated_list1(
preceded(multispace0, char(',')),
preceded(multispace0, measurement_selection),
),
),
)),
)),
),
fn from_clause<T: Parser + fmt::Display>(i: &str) -> ParseResult<&str, FromMeasurementClause<T>> {
preceded(
pair(tag_no_case("FROM"), multispace1),
FromMeasurementClause::<T>::separated_list1(
"invalid FROM clause, expected identifier or regular expression",
),
|(first, rest)| FromMeasurementClause { first, rest },
)(i)
}
@ -162,62 +124,38 @@ mod test {
let (_, from) = show_from_clause("FROM c").unwrap();
assert_eq!(
from,
ShowFromClause {
first: Name(MeasurementNameExpression {
database: None,
retention_policy: None,
name: "c".into()
}),
rest: None
}
ShowFromClause::new(vec![Name(MeasurementNameExpression::new("c".into()))])
);
let (_, from) = show_from_clause("FROM a..c").unwrap();
assert_eq!(
from,
ShowFromClause {
first: Name(MeasurementNameExpression {
database: Some("a".into()),
retention_policy: None,
name: "c".into()
}),
rest: None
}
ShowFromClause::new(vec![Name(MeasurementNameExpression::new_db(
"c".into(),
"a".into()
))])
);
let (_, from) = show_from_clause("FROM a.b.c").unwrap();
assert_eq!(
from,
ShowFromClause {
first: Name(MeasurementNameExpression {
database: Some("a".into()),
retention_policy: Some("b".into()),
name: "c".into()
}),
rest: None
}
ShowFromClause::new(vec![Name(MeasurementNameExpression::new_db_rp(
"c".into(),
"a".into(),
"b".into()
))])
);
let (_, from) = show_from_clause("FROM /reg/").unwrap();
assert_eq!(
from,
ShowFromClause {
first: Regex("reg".into()),
rest: None
}
);
assert_eq!(from, ShowFromClause::new(vec![Regex("reg".into())]));
let (_, from) = show_from_clause("FROM c, /reg/").unwrap();
assert_eq!(
from,
ShowFromClause {
first: Name(MeasurementNameExpression {
database: None,
retention_policy: None,
name: "c".into()
}),
rest: Some(vec![Regex("reg".into())]),
}
ShowFromClause::new(vec![
Name(MeasurementNameExpression::new("c".into())),
Regex("reg".into())
])
);
}
@ -226,41 +164,20 @@ mod test {
use crate::simple_from_clause::MeasurementSelection::*;
let (_, from) = delete_from_clause("FROM c").unwrap();
assert_eq!(
from,
DeleteFromClause {
first: Name("c".into()),
rest: None
}
);
assert_eq!(from, DeleteFromClause::new(vec![Name("c".into())]));
let (_, from) = delete_from_clause("FROM /reg/").unwrap();
assert_eq!(
from,
DeleteFromClause {
first: Regex("reg".into()),
rest: None
}
);
assert_eq!(from, DeleteFromClause::new(vec![Regex("reg".into())]));
let (_, from) = delete_from_clause("FROM c, /reg/").unwrap();
assert_eq!(
from,
DeleteFromClause {
first: Name("c".into()),
rest: Some(vec![Regex("reg".into())]),
}
DeleteFromClause::new(vec![Name("c".into()), Regex("reg".into())])
);
// Demonstrate that the 3-part name is not parsed
let (i, from) = delete_from_clause("FROM a.b.c").unwrap();
assert_eq!(
from,
DeleteFromClause {
first: Name("a".into()),
rest: None,
}
);
assert_eq!(from, DeleteFromClause::new(vec![Name("a".into())]));
// The remaining input will fail in a later parser
assert_eq!(i, ".b.c");
}

View File

@ -1,6 +1,7 @@
use crate::delete::{delete_statement, DeleteStatement};
use crate::drop::{drop_statement, DropMeasurementStatement};
use crate::internal::ParseResult;
use crate::select::{select_statement, SelectStatement};
use crate::show::{show_statement, ShowDatabasesStatement};
use crate::show_field_keys::ShowFieldKeysStatement;
use crate::show_measurements::ShowMeasurementsStatement;
@ -18,6 +19,8 @@ pub enum Statement {
Delete(Box<DeleteStatement>),
/// Represents a `DROP MEASUREMENT` statement.
DropMeasurement(Box<DropMeasurementStatement>),
/// Represents a `SELECT` statement.
Select(Box<SelectStatement>),
/// Represents a `SHOW DATABASES` statement.
ShowDatabases(Box<ShowDatabasesStatement>),
/// Represents a `SHOW MEASUREMENTS` statement.
@ -35,17 +38,16 @@ pub enum Statement {
impl Display for Statement {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
Self::Delete(s) => Display::fmt(s, f)?,
Self::DropMeasurement(s) => Display::fmt(s, f)?,
Self::ShowDatabases(s) => Display::fmt(s, f)?,
Self::ShowMeasurements(s) => Display::fmt(s, f)?,
Self::ShowRetentionPolicies(s) => Display::fmt(s, f)?,
Self::ShowTagKeys(s) => Display::fmt(s, f)?,
Self::ShowTagValues(s) => Display::fmt(s, f)?,
Self::ShowFieldKeys(s) => Display::fmt(s, f)?,
};
Ok(())
Self::Delete(s) => Display::fmt(s, f),
Self::DropMeasurement(s) => Display::fmt(s, f),
Self::Select(s) => Display::fmt(s, f),
Self::ShowDatabases(s) => Display::fmt(s, f),
Self::ShowMeasurements(s) => Display::fmt(s, f),
Self::ShowRetentionPolicies(s) => Display::fmt(s, f),
Self::ShowTagKeys(s) => Display::fmt(s, f),
Self::ShowTagValues(s) => Display::fmt(s, f),
Self::ShowFieldKeys(s) => Display::fmt(s, f),
}
}
}
@ -54,6 +56,7 @@ pub fn statement(i: &str) -> ParseResult<&str, Statement> {
alt((
map(delete_statement, |s| Statement::Delete(Box::new(s))),
map(drop_statement, |s| Statement::DropMeasurement(Box::new(s))),
map(select_statement, |s| Statement::Select(Box::new(s))),
show_statement,
))(i)
}
@ -64,15 +67,21 @@ mod test {
#[test]
fn test_statement() {
// validate one of each statement parser is accepted
// Validate one of each statement parser is accepted and that all input is consumed
// delete_statement combinator
statement("DELETE FROM foo").unwrap();
let (got, _) = statement("DELETE FROM foo").unwrap();
assert_eq!(got, "");
// drop_statement combinator
statement("DROP MEASUREMENT foo").unwrap();
let (got, _) = statement("DROP MEASUREMENT foo").unwrap();
assert_eq!(got, "");
let (got, _) = statement("SELECT * FROM foo WHERE time > now() - 5m AND host = 'bar' GROUP BY TIME(5m) FILL(previous) ORDER BY time DESC").unwrap();
assert_eq!(got, "");
// show_statement combinator
statement("SHOW TAG KEYS").unwrap();
let (got, _) = statement("SHOW TAG KEYS").unwrap();
assert_eq!(got, "");
}
}

View File

@ -1,5 +1,3 @@
#![allow(dead_code)]
//! Parse delimited string inputs.
//!
@ -166,8 +164,7 @@ impl Display for Regex {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.write_char('/')?;
write_escaped!(f, self.0, '/' => "\\/");
f.write_char('/')?;
Ok(())
f.write_char('/')
}
}

View File

@ -4,7 +4,7 @@
#[macro_export]
macro_rules! assert_failure {
($RESULT:expr) => {
assert!(matches!($RESULT.unwrap_err(), nom::Err::Failure(_)));
assert_matches::assert_matches!($RESULT.unwrap_err(), nom::Err::Failure(_));
};
}

View File

@ -97,18 +97,23 @@ pub async fn run(config: Config) -> Result<()> {
.as_ref()
.expect("--data-dir is required and has already been checked")
.into();
let subdir = "compactor_data/line_protocol";
let lp_dir = root_dir.join(subdir);
if lp_dir
let compactor_data_dir = root_dir.join("compactor_data");
let parquet_dir = compactor_data_dir.join("parquet");
if compactor_data_dir
.try_exists()
.context(FileExistenceSnafu { path: &lp_dir })?
.context(FileExistenceSnafu {
path: &compactor_data_dir,
})?
{
fs::remove_dir_all(&lp_dir).context(RemoveSnafu { path: &lp_dir })?;
fs::remove_dir_all(&compactor_data_dir).context(RemoveSnafu {
path: &compactor_data_dir,
})?;
}
let spec_location = format!("{subdir}/spec.toml");
let spec_in_root = root_dir.join(&spec_location);
let spec_location = "compactor_data/spec.toml";
let spec_in_root = compactor_data_dir.join("spec.toml");
let Config {
compaction_type,
@ -132,13 +137,13 @@ pub async fn run(config: Config) -> Result<()> {
Arc::clone(&object_store),
config.num_columns.get(),
sampling_interval_ns,
&spec_location,
spec_location,
)
.await?;
let StartEndMinutesAgo { start, end } = start_end;
generate_data(&spec_in_root, &lp_dir, num_rows.get(), start, end)?;
generate_data(&spec_in_root, &parquet_dir, num_rows.get(), start, end)?;
}
Ok(())
@ -199,7 +204,7 @@ async fn write_data_generation_spec(
fn generate_data(
spec_in_root: impl AsRef<OsStr>,
lp_dir: impl AsRef<OsStr>,
parquet_dir: impl AsRef<OsStr>,
num_rows: usize,
start: usize,
end: usize,
@ -211,8 +216,8 @@ fn generate_data(
.arg("--")
.arg("--specification")
.arg(&spec_in_root)
.arg("-o")
.arg(&lp_dir)
.arg("--parquet")
.arg(&parquet_dir)
.arg("--start")
.arg(&format!("{start} minutes ago"))
.arg("--end")

View File

@ -1,6 +1,11 @@
use arrow::record_batch::RecordBatch;
use assert_cmd::Command;
use datafusion::datasource::object_store::ObjectStoreUrl;
use futures::TryStreamExt;
use object_store::{local::LocalFileSystem, path::Path as ObjectStorePath, ObjectStore};
use parquet_to_line_protocol::ParquetFileReader;
use predicates::prelude::*;
use std::fs;
use std::sync::Arc;
use test_helpers_end_to_end::maybe_skip_integration;
#[tokio::test]
@ -22,7 +27,7 @@ async fn compactor_generate_has_defaults() {
.arg(&dir)
.assert()
.success();
let data_generation_spec = dir.join("compactor_data/line_protocol/spec.toml");
let data_generation_spec = dir.join("compactor_data/spec.toml");
assert!(data_generation_spec.exists());
}
@ -74,7 +79,7 @@ async fn compactor_generate_creates_files_and_catalog_entries() {
.assert()
.success();
let data_generation_spec = dir.path().join("compactor_data/line_protocol/spec.toml");
let data_generation_spec = dir.path().join("compactor_data/spec.toml");
assert!(data_generation_spec.exists());
}
@ -96,9 +101,14 @@ async fn running_compactor_generate_twice_overwrites_existing_files() {
.assert()
.success();
let first_run_data =
fs::read_to_string(dir.path().join("compactor_data/line_protocol/data_0.txt")).unwrap();
let first_run_num_lines = first_run_data.lines().count();
let first_run_data_path = dir
.path()
.join("compactor_data/parquet/data_0_measure.parquet");
let first_run_record_batches = read_record_batches(&first_run_data_path).await;
assert_eq!(first_run_record_batches.len(), 1);
let first_run_record_batch = &first_run_record_batches[0];
let first_run_num_lines = first_run_record_batch.num_rows();
Command::cargo_bin("influxdb_iox")
.unwrap()
@ -113,13 +123,31 @@ async fn running_compactor_generate_twice_overwrites_existing_files() {
.assert()
.success();
let second_run_data =
fs::read_to_string(dir.path().join("compactor_data/line_protocol/data_0.txt")).unwrap();
let second_run_num_lines = second_run_data.lines().count();
let second_run_data_path = dir
.path()
.join("compactor_data/parquet/data_0_measure.parquet");
let second_run_record_batches = read_record_batches(&second_run_data_path).await;
assert_eq!(second_run_record_batches.len(), 1);
let second_run_record_batch = &second_run_record_batches[0];
let second_run_num_lines = second_run_record_batch.num_rows();
// If generation is appending instead of overwriting, this will fail.
assert_eq!(first_run_num_lines, second_run_num_lines);
// If generation isn't creating different data every time it's invoked, this will fail.
assert_ne!(first_run_data, second_run_data);
assert_ne!(first_run_record_batch, second_run_record_batch);
}
async fn read_record_batches(path: impl AsRef<std::path::Path>) -> Vec<RecordBatch> {
let object_store_path = ObjectStorePath::from_filesystem_path(path).unwrap();
let object_store = Arc::new(LocalFileSystem::new()) as Arc<dyn ObjectStore>;
let object_store_url = ObjectStoreUrl::local_filesystem();
let object_meta = object_store.head(&object_store_path).await.unwrap();
let reader = ParquetFileReader::try_new(object_store, object_store_url, object_meta)
.await
.unwrap();
reader.read().await.unwrap().try_collect().await.unwrap()
}

View File

@ -6,6 +6,7 @@ edition = "2021"
default-run = "iox_data_generator"
[dependencies]
bytes = "1.2"
chrono = { version = "0.4", default-features = false }
chrono-english = "0.1.4"
clap = { version = "3", features = ["derive", "env", "cargo"] }
@ -14,8 +15,12 @@ handlebars = "4.3.4"
humantime = "2.1.0"
influxdb2_client = { path = "../influxdb2_client" }
itertools = "0.10.5"
mutable_batch_lp = { path = "../mutable_batch_lp" }
mutable_batch = { path = "../mutable_batch" }
parquet_file = { path = "../parquet_file" }
rand = { version = "0.8.3", features = ["small_rng"] }
regex = "1.6"
schema = { path = "../schema" }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0.83"
snafu = "0.7"

View File

@ -1,14 +1,17 @@
use criterion::{criterion_group, criterion_main, Criterion, Throughput};
use iox_data_generator::agent::Agent;
use iox_data_generator::specification::{AgentAssignmentSpec, DatabaseWriterSpec};
use iox_data_generator::{
specification::{AgentSpec, DataSpec, FieldSpec, FieldValueSpec, MeasurementSpec},
agent::Agent,
specification::{
AgentAssignmentSpec, AgentSpec, DataSpec, DatabaseWriterSpec, FieldSpec, FieldValueSpec,
MeasurementSpec,
},
tag_set::GeneratedTagSets,
write::PointsWriterBuilder,
};
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use std::time::Duration;
use std::{
sync::{atomic::AtomicU64, Arc},
time::Duration,
};
pub fn single_agent(c: &mut Criterion) {
let spec = DataSpec {
@ -74,19 +77,22 @@ pub fn single_agent(c: &mut Criterion) {
}
pub fn agent_pre_generated(c: &mut Criterion) {
let spec: DataSpec = toml::from_str(r#"
let spec: DataSpec = toml::from_str(
r#"
name = "storage_cardinality_example"
# Values are automatically generated before the agents are initialized. They generate tag key/value pairs
# with the name of the value as the tag key and the evaluated template as the value. These pairs
# are Arc wrapped so they can be shared across tagsets and used in the agents as pre-generated data.
# Values are automatically generated before the agents are initialized. They generate tag key/value
# pairs with the name of the value as the tag key and the evaluated template as the value. These
# pairs are Arc wrapped so they can be shared across tagsets and used in the agents as
# pre-generated data.
[[values]]
# the name must not have a . in it, which is used to access children later. Otherwise it's open.
name = "role"
# the template can use a number of helpers to get an id, a random string and the name, see below for examples
# the template can use a number of helpers to get an id, a random string and the name, see below
# for examples
template = "storage"
# this number of tag pairs will be generated. If this is > 1, the id or a random character string should be
# used in the template to ensure that the tag key/value pairs are unique.
# this number of tag pairs will be generated. If this is > 1, the id or a random character string
# should be used in the template to ensure that the tag key/value pairs are unique.
cardinality = 1
[[values]]
@ -108,10 +114,11 @@ cardinality = 10
[[values]]
name = "bucket_id"
# a bucket belongs to an org. With this, you would be able to access the org.id or org.value in the template
# a bucket belongs to an org. With this, you would be able to access the org.id or org.value in the
# template
belongs_to = "org_id"
# each bucket will have a unique id, which is used here to guarantee uniqueness even across orgs. We also
# have a random 15 character alphanumeric sequence to pad out the value length.
# each bucket will have a unique id, which is used here to guarantee uniqueness even across orgs.
# We also have a random 15 character alphanumeric sequence to pad out the value length.
template = "{{id}}_{{random 15}}"
# For each org, 3 buckets will be generated
cardinality = 3
@ -121,9 +128,10 @@ name = "partition_id"
template = "{{id}}"
cardinality = 10
# makes a tagset so every bucket appears in every partition. The other tags are descriptive and don't
# increase the cardinality beyond count(bucket) * count(partition). Later this example will use the
# agent and measurement generation to take this base tagset and increase cardinality on a per-agent basis.
# makes a tagset so every bucket appears in every partition. The other tags are descriptive and
# don't increase the cardinality beyond count(bucket) * count(partition). Later this example will
# use the agent and measurement generation to take this base tagset and increase cardinality on a
# per-agent basis.
[[tag_sets]]
name = "bucket_set"
for_each = [
@ -140,7 +148,8 @@ name = "foo"
[[agents.measurements]]
name = "storage_usage_bucket_cardinality"
# each sampling will have all the tag sets from this collection in addition to the tags and tag_pairs specified
# each sampling will have all the tag sets from this collection in addition to the tags and
# tag_pairs specified
tag_set = "bucket_set"
# for each agent, this specific measurement will be decorated with these additional tags.
tag_pairs = [
@ -155,7 +164,9 @@ i64_range = [1, 8147240]
[[database_writers]]
agents = [{name = "foo", sampling_interval = "1s", count = 3}]
"#).unwrap();
"#,
)
.unwrap();
let generated_tag_sets = GeneratedTagSets::from_spec(&spec).unwrap();

View File

@ -148,11 +148,12 @@ impl Agent {
let mut streams = Vec::with_capacity(batch_size);
for _ in 0..batch_size {
let mut s = self.generate().await?;
if s.is_empty() {
if self.finished {
break;
} else {
let mut s = self.generate().await?;
streams.append(&mut s);
}
streams.append(&mut s);
}
for s in &streams {
@ -160,6 +161,10 @@ impl Agent {
total_points += s.line_count();
}
if points_this_batch == 0 && self.finished {
break;
}
points_writer
.write_points(streams.into_iter().flatten())
.await
@ -187,7 +192,7 @@ impl Agent {
/// Generate data points from the configuration in this agent.
pub async fn generate(&mut self) -> Result<Vec<MeasurementLineIterator>> {
debug!(
"[agent {}] generate more? {} current: {}, end: {}",
"[agent {}] finished? {} current: {}, end: {}",
self.id, self.finished, self.current_datetime, self.end_datetime
);

View File

@ -13,8 +13,10 @@
use chrono::prelude::*;
use chrono_english::{parse_date_string, Dialect};
use iox_data_generator::{specification::DataSpec, write::PointsWriterBuilder};
use std::fs::File;
use std::io::{self, BufRead};
use std::{
fs::File,
io::{self, BufRead},
};
use tracing::info;
#[derive(clap::Parser)]
@ -57,14 +59,19 @@ struct Config {
#[clap(long, action)]
print: bool,
/// Runs the generation with agents writing to a sink. Useful for quick stress test to see how much resources the generator will take
/// Runs the generation with agents writing to a sink. Useful for quick stress test to see how
/// much resources the generator will take
#[clap(long, action)]
noop: bool,
/// The filename to write line protocol
/// The directory to write line protocol to
#[clap(long, short, action)]
output: Option<String>,
/// The directory to write Parquet files to
#[clap(long, short, action)]
parquet: Option<String>,
/// The host name part of the API endpoint to write to
#[clap(long, short, action)]
host: Option<String>,
@ -105,7 +112,8 @@ struct Config {
#[clap(long = "continue", action)]
do_continue: bool,
/// Generate this many samplings to batch into a single API call. Good for sending a bunch of historical data in quickly if paired with a start time from long ago.
/// Generate this many samplings to batch into a single API call. Good for sending a bunch of
/// historical data in quickly if paired with a start time from long ago.
#[clap(long, action, default_value = "1")]
batch_size: usize,
@ -142,10 +150,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let data_spec = DataSpec::from_file(&config.specification)?;
// TODO: parquet output
let mut points_writer_builder = if let Some(line_protocol_filename) = config.output {
PointsWriterBuilder::new_file(line_protocol_filename)?
} else if let Some(parquet_directory) = config.parquet {
PointsWriterBuilder::new_parquet(parquet_directory)?
} else if let Some(ref host) = config.host {
let token = config.token.expect("--token must be specified");

View File

@ -31,9 +31,9 @@
use crate::{agent::Agent, tag_set::GeneratedTagSets};
use snafu::{ResultExt, Snafu};
use std::sync::{atomic::AtomicU64, Arc};
use std::{
convert::TryFrom,
sync::{atomic::AtomicU64, Arc},
time::{SystemTime, UNIX_EPOCH},
};
use tracing::info;
@ -154,7 +154,8 @@ pub async fn generate(
.context(CouldNotCreateAgentSnafu)?;
info!(
"Configuring {} agents of \"{}\" to write data to org {} and bucket {} (database {})",
"Configuring {} agents of \"{}\" to write data \
to org {} and bucket {} (database {})",
agent_assignment.count,
agent_assignment.spec.name,
org,
@ -171,7 +172,8 @@ pub async fn generate(
let total_rows = Arc::clone(&total_rows);
handles.push(tokio::task::spawn(async move {
// did this weird hack because otherwise the stdout outputs would be jumbled together garbage
// did this weird hack because otherwise the stdout outputs would be jumbled
// together garbage
if one_agent_at_a_time {
let _l = lock_ref.lock().await;
agent

View File

@ -12,8 +12,13 @@ use tracing::warn;
#[allow(missing_docs)]
pub enum Error {
/// File-related error that may happen while reading a specification
#[snafu(display(r#"Error reading data spec from TOML file: {}"#, source))]
#[snafu(display(
r#"Error reading data spec from TOML file at {}: {}"#,
file_name,
source
))]
ReadFile {
file_name: String,
/// Underlying I/O error that caused this problem
source: std::io::Error,
},
@ -76,7 +81,7 @@ pub struct DataSpec {
impl DataSpec {
/// Given a filename, read the file and parse the specification.
pub fn from_file(file_name: &str) -> Result<Self> {
let spec_toml = fs::read_to_string(file_name).context(ReadFileSnafu)?;
let spec_toml = fs::read_to_string(file_name).context(ReadFileSnafu { file_name })?;
Self::from_str(&spec_toml)
}
@ -89,8 +94,8 @@ impl DataSpec {
let mut start = 0;
// either all database writers must use regex or none of them can. It's either ratio or regex
// for assignment
// either all database writers must use regex or none of them can. It's either ratio or
// regex for assignment
let use_ratio = self.database_writers[0].database_regex.is_none();
for b in &self.database_writers {
if use_ratio && b.database_regex.is_some() {
@ -200,7 +205,8 @@ impl FromStr for DataSpec {
pub struct ValuesSpec {
/// The name of the collection of values
pub name: String,
/// If values not specified this handlebars template will be used to create each value in the collection
/// If values not specified this handlebars template will be used to create each value in the
/// collection
pub template: String,
/// How many of these values should be generated. If belongs_to is
/// specified, each parent will have this many of this value. So
@ -297,8 +303,8 @@ pub struct AgentSpec {
pub has_one: Vec<String>,
/// Specification of tag key/value pairs that get generated once and reused for
/// every sampling. Every measurement (and thus line) will have these tag pairs added onto it.
/// The template can use `{{agent.id}}` to reference the agent's id and `{{guid}}` or `{{random N}}`
/// to generate random strings.
/// The template can use `{{agent.id}}` to reference the agent's id and `{{guid}}` or
/// `{{random N}}` to generate random strings.
#[serde(default)]
pub tag_pairs: Vec<TagPairSpec>,
}
@ -675,7 +681,10 @@ agents = [{name = "foo", sampling_interval = "10s"}]
let field_spec = &a0m0f0.field_value_spec;
assert!(
matches!(field_spec, FieldValueSpec::String { replacements, .. } if replacements.is_empty()),
matches!(
field_spec,
FieldValueSpec::String { replacements, .. } if replacements.is_empty()
),
"expected a String field with empty replacements; was {:?}",
field_spec
);

View File

@ -1,8 +1,12 @@
//! Writing generated points
use crate::measurement::LineToGenerate;
use bytes::Bytes;
use futures::stream;
use influxdb2_client::models::WriteDataPoint;
use mutable_batch_lp::lines_to_batches;
use parquet_file::{metadata::IoxMetadata, serialize};
use schema::selection::Selection;
use snafu::{ensure, ResultExt, Snafu};
#[cfg(test)]
use std::{
@ -10,9 +14,8 @@ use std::{
sync::{Arc, Mutex},
};
use std::{
fs,
fs::{File, OpenOptions},
io::BufWriter,
fs::{self, File, OpenOptions},
io::{BufWriter, Write},
path::{Path, PathBuf},
};
@ -20,7 +23,7 @@ use std::{
#[derive(Snafu, Debug)]
pub enum Error {
/// Error that may happen when writing line protocol to a file
#[snafu(display("Could open line protocol file {}: {}", filename.display(), source))]
#[snafu(display("Couldn't open line protocol file {}: {}", filename.display(), source))]
CantOpenLineProtocolFile {
/// The location of the file we tried to open
filename: PathBuf,
@ -28,6 +31,15 @@ pub enum Error {
source: std::io::Error,
},
/// Error that may happen when writing Parquet to a file
#[snafu(display("Couldn't open Parquet file {}: {}", filename.display(), source))]
CantOpenParquetFile {
/// The location of the file we tried to open
filename: PathBuf,
/// Underlying IO error that caused this problem
source: std::io::Error,
},
/// Error that may happen when writing line protocol to a no-op sink
#[snafu(display("Could not generate line protocol: {}", source))]
CantWriteToNoOp {
@ -42,6 +54,34 @@ pub enum Error {
source: std::io::Error,
},
/// Error that may happen when writing line protocol to a Vec of bytes
#[snafu(display("Could not write to vec: {}", source))]
WriteToVec {
/// Underlying IO error that caused this problem
source: std::io::Error,
},
/// Error that may happen when writing Parquet to a file
#[snafu(display("Could not write Parquet: {}", source))]
WriteToParquetFile {
/// Underlying IO error that caused this problem
source: std::io::Error,
},
/// Error that may happen when converting line protocol to a mutable batch
#[snafu(display("Could not convert to a mutable batch: {}", source))]
ConvertToMutableBatch {
/// Underlying mutable_batch_lp error that caused this problem
source: mutable_batch_lp::Error,
},
/// Error that may happen when converting a mutable batch to an Arrow RecordBatch
#[snafu(display("Could not convert to a record batch: {}", source))]
ConvertToArrow {
/// Underlying mutable_batch error that caused this problem
source: mutable_batch::Error,
},
/// Error that may happen when creating a directory to store files to write
/// to
#[snafu(display("Could not create directory: {}", source))]
@ -81,6 +121,13 @@ pub enum Error {
/// specifying the org ID
#[snafu(display("Could not create a bucket without an `org_id`"))]
OrgIdRequiredToCreateBucket,
/// Error that may happen when serializing to Parquet
#[snafu(display("Could not serialize to Parquet"))]
ParquetSerialization {
/// Underlying `parquet_file` error that caused this problem
source: parquet_file::serialize::CodecError,
},
}
type Result<T, E = Error> = std::result::Result<T, E>;
@ -96,6 +143,7 @@ pub struct PointsWriterBuilder {
enum PointsWriterConfig {
Api(influxdb2_client::Client),
Directory(PathBuf),
ParquetFile(PathBuf),
NoOp {
perform_write: bool,
},
@ -144,6 +192,17 @@ impl PointsWriterBuilder {
})
}
/// Write points to a Parquet file in the directory specified.
pub fn new_parquet<P: AsRef<Path>>(path: P) -> Result<Self> {
fs::create_dir_all(&path).context(CantCreateDirectorySnafu)?;
let metadata = fs::metadata(&path).context(CantGetMetadataSnafu)?;
ensure!(metadata.is_dir(), MustBeDirectorySnafu);
Ok(Self {
config: PointsWriterConfig::ParquetFile(PathBuf::from(path.as_ref())),
})
}
/// Write points to stdout
pub fn new_std_out() -> Self {
Self {
@ -187,6 +246,12 @@ impl PointsWriterBuilder {
InnerPointsWriter::File { file }
}
PointsWriterConfig::ParquetFile(dir_path) => InnerPointsWriter::ParquetFile {
dir_path: dir_path.clone(),
agent_name: name.into(),
},
PointsWriterConfig::NoOp { perform_write } => InnerPointsWriter::NoOp {
perform_write: *perform_write,
},
@ -230,6 +295,10 @@ enum InnerPointsWriter {
File {
file: BufWriter<File>,
},
ParquetFile {
dir_path: PathBuf,
agent_name: String,
},
NoOp {
perform_write: bool,
},
@ -261,6 +330,52 @@ impl InnerPointsWriter {
.context(CantWriteToLineProtocolFileSnafu)?;
}
}
Self::ParquetFile {
dir_path,
agent_name,
} => {
let mut raw_line_protocol = Vec::new();
for point in points {
point
.write_data_point_to(&mut raw_line_protocol)
.context(WriteToVecSnafu)?;
}
let line_protocol = String::from_utf8(raw_line_protocol)
.expect("Generator should be creating valid UTF-8");
let batches_by_measurement =
lines_to_batches(&line_protocol, 0).context(ConvertToMutableBatchSnafu)?;
for (measurement, batch) in batches_by_measurement {
let record_batch = batch
.to_arrow(Selection::All)
.context(ConvertToArrowSnafu)?;
let stream = futures::stream::iter([Ok(record_batch)]);
let meta = IoxMetadata::external(crate::now_ns(), &*measurement);
let (data, _parquet_file_meta) = serialize::to_parquet_bytes(stream, &meta)
.await
.context(ParquetSerializationSnafu)?;
let data = Bytes::from(data);
let mut filename = dir_path.clone();
filename.push(format!("{agent_name}_{measurement}"));
filename.set_extension("parquet");
let file = OpenOptions::new()
.create(true)
.write(true)
.open(&filename)
.context(CantOpenParquetFileSnafu { filename })?;
let mut file = BufWriter::new(file);
file.write_all(&data).context(WriteToParquetFileSnafu)?;
}
}
Self::NoOp { perform_write } => {
if *perform_write {
let mut sink = std::io::sink();

View File

@ -606,7 +606,7 @@ impl TestPartition {
table_catalog_schema
.columns
.get(f.name())
.expect("Column registered")
.unwrap_or_else(|| panic!("Column {} is not registered", f.name()))
.id
}));

View File

@ -124,7 +124,6 @@ where
/// [`GrpcDelegate`]: router::server::grpc::GrpcDelegate
async fn server_grpc(self: Arc<Self>, builder_input: RpcBuilderInput) -> Result<(), RpcError> {
let builder = setup_builder!(builder_input, self);
add_service!(builder, self.server.grpc().write_service());
add_service!(builder, self.server.grpc().schema_service());
add_service!(builder, self.server.grpc().catalog_service());
add_service!(builder, self.server.grpc().object_store_service());
@ -284,7 +283,7 @@ pub async fn create_router_server_type(
// Initialise the shard-mapping gRPC service.
let shard_service = init_shard_service(sharder, write_buffer_config, catalog).await?;
// Initialise the API delegates, sharing the handler stack between them.
// Initialise the API delegates
let handler_stack = Arc::new(handler_stack);
let http = HttpDelegate::new(
common_state.run_config().max_http_request_size,
@ -292,13 +291,7 @@ pub async fn create_router_server_type(
Arc::clone(&handler_stack),
&metrics,
);
let grpc = GrpcDelegate::new(
handler_stack,
schema_catalog,
object_store,
Arc::clone(&metrics),
shard_service,
);
let grpc = GrpcDelegate::new(schema_catalog, object_store, shard_service);
let router_server = RouterServer::new(http, grpc, metrics, common_state.trace_collector());
let server_type = Arc::new(RouterServerType::new(router_server, common_state));

View File

@ -394,6 +394,27 @@ impl IoxMetadata {
})
}
/// Generate metadata for a file generated from some process other than IOx ingesting.
///
/// This metadata will not have valid catalog values; inserting files with this metadata into
/// the catalog should get valid values out-of-band.
pub fn external(creation_timestamp_ns: i64, table_name: impl Into<Arc<str>>) -> Self {
Self {
object_store_id: Default::default(),
creation_timestamp: Time::from_timestamp_nanos(creation_timestamp_ns),
namespace_id: NamespaceId::new(1),
namespace_name: "external".into(),
shard_id: ShardId::new(1),
table_id: TableId::new(1),
table_name: table_name.into(),
partition_id: PartitionId::new(1),
partition_key: "unknown".into(),
max_sequence_number: SequenceNumber::new(1),
compaction_level: CompactionLevel::Initial,
sort_key: None,
}
}
/// verify uuid
pub fn match_object_store_id(&self, uuid: Uuid) -> bool {
uuid == self.object_store_id

View File

@ -1,12 +1,5 @@
//! Code that can convert between parquet files and line protocol
use std::{
io::Write,
path::{Path, PathBuf},
result::Result,
sync::Arc,
};
use datafusion::{
arrow::datatypes::SchemaRef as ArrowSchemaRef,
datasource::{
@ -28,11 +21,15 @@ use object_store::{
};
use parquet_file::metadata::{IoxMetadata, METADATA_KEY};
use schema::Schema;
use snafu::{OptionExt, ResultExt, Snafu};
use std::{
io::Write,
path::{Path, PathBuf},
result::Result,
sync::Arc,
};
mod batch;
use batch::convert_to_lines;
#[derive(Debug, Snafu)]
@ -155,7 +152,7 @@ where
/// Handles the details of interacting with parquet libraries /
/// readers. Tries not to have any IOx specific logic
struct ParquetFileReader {
pub struct ParquetFileReader {
object_store: Arc<dyn ObjectStore>,
object_store_url: ObjectStoreUrl,
/// Name / path information of the object to read
@ -171,7 +168,7 @@ struct ParquetFileReader {
impl ParquetFileReader {
/// Find and open the specified parquet file, and read its metadata / schema
async fn try_new(
pub async fn try_new(
object_store: Arc<dyn ObjectStore>,
object_store_url: ObjectStoreUrl,
object_meta: ObjectMeta,
@ -196,12 +193,12 @@ impl ParquetFileReader {
}
// retrieves the Arrow schema for this file
fn schema(&self) -> ArrowSchemaRef {
pub fn schema(&self) -> ArrowSchemaRef {
Arc::clone(&self.schema)
}
/// read the parquet file as a stream
async fn read(&self) -> Result<SendableRecordBatchStream, Error> {
pub async fn read(&self) -> Result<SendableRecordBatchStream, Error> {
let base_config = FileScanConfig {
object_store_url: self.object_store_url.clone(),
file_schema: self.schema(),

View File

@ -18,7 +18,7 @@ pub struct RouterServer<D, S> {
trace_collector: Option<Arc<dyn TraceCollector>>,
http: HttpDelegate<D>,
grpc: GrpcDelegate<D, S>,
grpc: GrpcDelegate<S>,
}
impl<D, S> RouterServer<D, S> {
@ -26,7 +26,7 @@ impl<D, S> RouterServer<D, S> {
/// handlers.
pub fn new(
http: HttpDelegate<D>,
grpc: GrpcDelegate<D, S>,
grpc: GrpcDelegate<S>,
metrics: Arc<metric::Registry>,
trace_collector: Option<Arc<dyn TraceCollector>>,
) -> Self {
@ -59,7 +59,7 @@ where
}
/// Get a reference to the router grpc delegate.
pub fn grpc(&self) -> &GrpcDelegate<D, S> {
pub fn grpc(&self) -> &GrpcDelegate<S> {
&self.grpc
}
}

View File

@ -3,83 +3,45 @@
pub mod sharder;
use self::sharder::ShardService;
use crate::{
dml_handlers::{DmlError, DmlHandler, PartitionError},
shard::Shard,
};
use crate::shard::Shard;
use ::sharder::Sharder;
use generated_types::{
google::FieldViolation,
influxdata::{
iox::{catalog::v1::*, object_store::v1::*, schema::v1::*, sharder::v1::*},
pbdata::v1::*,
},
use generated_types::influxdata::iox::{
catalog::v1::*, object_store::v1::*, schema::v1::*, sharder::v1::*,
};
use hashbrown::HashMap;
use iox_catalog::interface::Catalog;
use metric::U64Counter;
use mutable_batch::MutableBatch;
use object_store::DynObjectStore;
use observability_deps::tracing::*;
use schema::selection::Selection;
use service_grpc_catalog::CatalogService;
use service_grpc_object_store::ObjectStoreService;
use service_grpc_schema::SchemaService;
use std::sync::Arc;
use tonic::{metadata::AsciiMetadataValue, Request, Response, Status};
use trace::ctx::SpanContext;
use write_summary::WriteSummary;
// HERE BE DRAGONS: Uppercase characters in this constant cause a panic. Insert them and
// investigate the cause if you dare.
const WRITE_TOKEN_GRPC_HEADER: &str = "x-iox-write-token";
/// This type is responsible for managing all gRPC services exposed by `router`.
#[derive(Debug)]
pub struct GrpcDelegate<D, S> {
dml_handler: Arc<D>,
pub struct GrpcDelegate<S> {
catalog: Arc<dyn Catalog>,
object_store: Arc<DynObjectStore>,
metrics: Arc<metric::Registry>,
shard_service: ShardService<S>,
}
impl<D, S> GrpcDelegate<D, S> {
impl<S> GrpcDelegate<S> {
/// Initialise a new gRPC handler, dispatching DML operations to `dml_handler`.
pub fn new(
dml_handler: Arc<D>,
catalog: Arc<dyn Catalog>,
object_store: Arc<DynObjectStore>,
metrics: Arc<metric::Registry>,
shard_service: ShardService<S>,
) -> Self {
Self {
dml_handler,
catalog,
object_store,
metrics,
shard_service,
}
}
}
impl<D, S> GrpcDelegate<D, S>
impl<S> GrpcDelegate<S>
where
D: DmlHandler<WriteInput = HashMap<String, MutableBatch>, WriteOutput = WriteSummary> + 'static,
S: Sharder<(), Item = Arc<Shard>> + Clone + 'static,
{
/// Acquire a [`WriteService`] gRPC service implementation.
///
/// [`WriteService`]: generated_types::influxdata::pbdata::v1::write_service_server::WriteService.
pub fn write_service(
&self,
) -> write_service_server::WriteServiceServer<impl write_service_server::WriteService> {
write_service_server::WriteServiceServer::new(WriteService::new(
Arc::clone(&self.dml_handler),
&*self.metrics,
))
}
/// Acquire a [`SchemaService`] gRPC service implementation.
///
/// [`SchemaService`]: generated_types::influxdata::iox::schema::v1::schema_service_server::SchemaService.
@ -124,241 +86,3 @@ where
shard_service_server::ShardServiceServer::new(self.shard_service.clone())
}
}
#[derive(Debug)]
struct WriteService<D> {
dml_handler: Arc<D>,
write_metric_rows: U64Counter,
write_metric_columns: U64Counter,
write_metric_tables: U64Counter,
}
impl<D> WriteService<D> {
fn new(dml_handler: Arc<D>, metrics: &metric::Registry) -> Self {
let write_metric_rows = metrics
.register_metric::<U64Counter>(
"grpc_write_rows_total",
"cumulative number of rows successfully routed",
)
.recorder(&[]);
let write_metric_columns = metrics
.register_metric::<U64Counter>(
"grpc_write_fields_total",
"cumulative number of fields successfully routed",
)
.recorder(&[]);
let write_metric_tables = metrics
.register_metric::<U64Counter>(
"grpc_write_tables_total",
"cumulative number of tables in each write request",
)
.recorder(&[]);
Self {
dml_handler,
write_metric_rows,
write_metric_columns,
write_metric_tables,
}
}
}
#[tonic::async_trait]
impl<D> write_service_server::WriteService for WriteService<D>
where
D: DmlHandler<WriteInput = HashMap<String, MutableBatch>, WriteOutput = WriteSummary> + 'static,
{
/// Receive a gRPC [`WriteRequest`] and dispatch it to the DML handler.
async fn write(
&self,
request: Request<WriteRequest>,
) -> Result<Response<WriteResponse>, Status> {
let span_ctx: Option<SpanContext> = request.extensions().get().cloned();
let database_batch = request
.into_inner()
.database_batch
.ok_or_else(|| FieldViolation::required("database_batch"))?;
let tables =
mutable_batch_pb::decode::decode_database_batch(&database_batch).map_err(|e| {
FieldViolation {
field: "database_batch".into(),
description: format!("Invalid DatabaseBatch: {}", e),
}
})?;
let (row_count, column_count) =
tables.values().fold((0, 0), |(acc_rows, acc_cols), batch| {
let cols = batch
.schema(Selection::All)
.expect("failed to get schema")
.len();
let rows = batch.rows();
(acc_rows + rows, acc_cols + cols)
});
let namespace = database_batch
.database_name
.try_into()
.map_err(|e| FieldViolation {
field: "database_name".into(),
description: format!("Invalid namespace: {}", e),
})?;
let num_tables = tables.len();
debug!(
num_tables,
%namespace,
"routing grpc write",
);
let summary = self
.dml_handler
.write(&namespace, tables, span_ctx)
.await
.map_err(|e| match e.into() {
e @ DmlError::DatabaseNotFound(_) => Status::not_found(e.to_string()),
e @ DmlError::Schema(_) => Status::aborted(e.to_string()),
e @ (DmlError::Internal(_)
| DmlError::WriteBuffer(_)
| DmlError::NamespaceCreation(_)
| DmlError::Partition(PartitionError::BatchWrite(_))) => {
Status::internal(e.to_string())
}
})?;
self.write_metric_rows.inc(row_count as _);
self.write_metric_columns.inc(column_count as _);
self.write_metric_tables.inc(num_tables as _);
let mut response = Response::new(WriteResponse {});
let metadata = response.metadata_mut();
metadata.insert(
WRITE_TOKEN_GRPC_HEADER,
AsciiMetadataValue::try_from(&summary.to_token()).map_err(|e| {
Status::internal(format!(
"Could not convert WriteSummary token to AsciiMetadataValue: {e}"
))
})?,
);
Ok(response)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::dml_handlers::{mock::MockDmlHandler, DmlError};
use generated_types::influxdata::pbdata::v1::write_service_server::WriteService;
use std::sync::Arc;
fn summary() -> WriteSummary {
WriteSummary::default()
}
#[tokio::test]
async fn test_write_no_batch() {
let metrics = Arc::new(metric::Registry::default());
let handler = Arc::new(MockDmlHandler::default().with_write_return([Ok(summary())]));
let grpc = super::WriteService::new(Arc::clone(&handler), &metrics);
let req = WriteRequest::default();
let err = grpc
.write(Request::new(req))
.await
.expect_err("rpc request should fail");
assert_eq!(err.code(), tonic::Code::InvalidArgument);
assert!(err.message().contains("database_batch"));
}
#[tokio::test]
async fn test_write_no_namespace() {
let metrics = Arc::new(metric::Registry::default());
let handler = Arc::new(MockDmlHandler::default().with_write_return([Ok(summary())]));
let grpc = super::WriteService::new(Arc::clone(&handler), &metrics);
let req = WriteRequest {
database_batch: Some(DatabaseBatch {
database_name: "".to_owned(),
table_batches: vec![],
partition_key: Default::default(),
}),
};
let err = grpc
.write(Request::new(req))
.await
.expect_err("rpc request should fail");
assert_eq!(err.code(), tonic::Code::InvalidArgument);
assert!(err.message().contains("database_name"));
}
#[tokio::test]
async fn test_write_ok() {
let metrics = Arc::new(metric::Registry::default());
let handler = Arc::new(MockDmlHandler::default().with_write_return([Ok(summary())]));
let grpc = super::WriteService::new(Arc::clone(&handler), &metrics);
let req = WriteRequest {
database_batch: Some(DatabaseBatch {
database_name: "bananas".to_owned(),
table_batches: vec![],
partition_key: Default::default(),
}),
};
grpc.write(Request::new(req))
.await
.expect("rpc request should succeed");
}
#[tokio::test]
async fn test_write_ok_with_partition_key() {
let metrics = Arc::new(metric::Registry::default());
let handler = Arc::new(MockDmlHandler::default().with_write_return([Ok(summary())]));
let grpc = super::WriteService::new(Arc::clone(&handler), &metrics);
let req = WriteRequest {
database_batch: Some(DatabaseBatch {
database_name: "bananas".to_owned(),
table_batches: vec![],
partition_key: "platanos".to_owned(),
}),
};
grpc.write(Request::new(req))
.await
.expect("rpc request should succeed");
}
#[tokio::test]
async fn test_write_dml_handler_error() {
let metrics = Arc::new(metric::Registry::default());
let handler = Arc::new(
MockDmlHandler::default()
.with_write_return([Err(DmlError::DatabaseNotFound("nope".to_string()))]),
);
let grpc = super::WriteService::new(Arc::clone(&handler), &metrics);
let req = WriteRequest {
database_batch: Some(DatabaseBatch {
database_name: "bananas".to_owned(),
table_batches: vec![],
partition_key: Default::default(),
}),
};
let err = grpc
.write(Request::new(req))
.await
.expect_err("rpc request should fail");
assert_eq!(err.code(), tonic::Code::NotFound);
assert!(err.message().contains("nope"));
}
}

View File

@ -492,8 +492,11 @@ impl TestServer {
);
}
ServerType::Router => {
if check_write_service_health(server_type, connections.router_grpc_connection())
.await
if check_catalog_service_health(
server_type,
connections.router_grpc_connection(),
)
.await
{
return;
}
@ -521,8 +524,11 @@ impl TestServer {
ServerType::AllInOne => {
// ensure we can write and query
// TODO also check ingester
if check_write_service_health(server_type, connections.router_grpc_connection())
.await
if check_catalog_service_health(
server_type,
connections.router_grpc_connection(),
)
.await
&& check_arrow_service_health(
server_type,
connections.ingester_grpc_connection(),
@ -543,21 +549,25 @@ impl TestServer {
}
}
/// checks write service health, returning false if the service should be checked again
async fn check_write_service_health(server_type: ServerType, connection: Connection) -> bool {
/// checks catalog service health, as a proxy for all gRPC
/// services. Returns false if the service should be checked again
async fn check_catalog_service_health(server_type: ServerType, connection: Connection) -> bool {
let mut health = influxdb_iox_client::health::Client::new(connection);
match health.check("influxdata.pbdata.v1.WriteService").await {
match health
.check("influxdata.iox.catalog.v1.CatalogService")
.await
{
Ok(true) => {
info!("Write service {:?} is running", server_type);
info!("CatalogService service {:?} is running", server_type);
true
}
Ok(false) => {
info!("Write service {:?} is not running", server_type);
info!("CatalogService {:?} is not running", server_type);
true
}
Err(e) => {
info!("Write service {:?} not yet healthy: {:?}", server_type, e);
info!("CatalogService {:?} not yet healthy: {:?}", server_type, e);
false
}
}