feat: Improve line protocol parser error recovery, avoid infinite loop (#152)
* feat: Improve line protocol parser error recovery, avoid infinite loop feat: port splitLines logic to rust line protocol parser fix: consume trailing optional whitespace after timestamp test: Add tests for same * fix: Apply suggestions from code review Co-authored-by: Carol (Nichols || Goulding) <193874+carols10cents@users.noreply.github.com> Co-authored-by: Carol (Nichols || Goulding) <193874+carols10cents@users.noreply.github.com>pull/24376/head
parent
cf248f2143
commit
94f1968deb
|
@ -1,3 +1,12 @@
|
|||
//! This module contains a pure rust implementation of a parser for InfluxDB Line Protocol
|
||||
//! https://v2.docs.influxdata.com/v2.0/reference/syntax/line-protocol/
|
||||
//!
|
||||
//! This implementation is intended to be compatible with the Go implementation,
|
||||
//! https://github.com/influxdata/influxdb/blob/217eddc87e14a79b01d0c22994fc139f530094a2/models/points_parser.go
|
||||
//!
|
||||
//! However, this implementation uses a nom combinator based parser
|
||||
//! rather than attempting to port the imperative Go logic.
|
||||
|
||||
#![deny(rust_2018_idioms)]
|
||||
#![warn(
|
||||
missing_copy_implementations,
|
||||
|
@ -52,6 +61,12 @@ pub enum Error {
|
|||
))]
|
||||
EndsWithBackslash,
|
||||
|
||||
#[snafu(display(
|
||||
"Could not parse entire line. Found trailing content: '{}'",
|
||||
trailing_content
|
||||
))]
|
||||
CannotParseEntireLine { trailing_content: String },
|
||||
|
||||
// TODO: Replace this with specific failures.
|
||||
#[snafu(display(r#"A generic parsing error occurred: {:?}"#, kind))]
|
||||
GenericParsingError {
|
||||
|
@ -289,9 +304,9 @@ impl PartialEq<EscapedStr<'_>> for &str {
|
|||
}
|
||||
}
|
||||
|
||||
pub fn parse_lines(mut i: &str) -> impl Iterator<Item = Result<ParsedLine<'_>>> {
|
||||
std::iter::from_fn(move || {
|
||||
i = trim_leading(i);
|
||||
pub fn parse_lines(input: &str) -> impl Iterator<Item = Result<ParsedLine<'_>>> {
|
||||
split_lines(input).filter_map(|line| {
|
||||
let i = trim_leading(line);
|
||||
|
||||
if i.is_empty() {
|
||||
return None;
|
||||
|
@ -299,8 +314,17 @@ pub fn parse_lines(mut i: &str) -> impl Iterator<Item = Result<ParsedLine<'_>>>
|
|||
|
||||
match parse_line(i) {
|
||||
Ok((remaining, line)) => {
|
||||
i = remaining;
|
||||
Some(Ok(line))
|
||||
// should have parsed the whole input line, if any
|
||||
// data remains it is a parse error for this line
|
||||
// corresponding Go logic:
|
||||
// https://github.com/influxdata/influxdb/blob/217eddc87e14a79b01d0c22994fc139f530094a2/models/points_parser.go#L259-L266
|
||||
if !remaining.is_empty() {
|
||||
Some(Err(Error::CannotParseEntireLine {
|
||||
trailing_content: String::from(remaining),
|
||||
}))
|
||||
} else {
|
||||
Some(Ok(line))
|
||||
}
|
||||
}
|
||||
Err(nom::Err::Error(e)) | Err(nom::Err::Failure(e)) => Some(Err(e)),
|
||||
Err(nom::Err::Incomplete(_)) => unreachable!("Cannot have incomplete data"), // Only streaming parsers have this
|
||||
|
@ -308,9 +332,76 @@ pub fn parse_lines(mut i: &str) -> impl Iterator<Item = Result<ParsedLine<'_>>>
|
|||
})
|
||||
}
|
||||
|
||||
/// Split `input` into invidividual lines to be parsed, based on the
|
||||
/// rules of the Line Protocol format.
|
||||
///
|
||||
/// This code is more or less a direct port of the [Go implementation of
|
||||
/// `scanLine`](https://github.com/influxdata/influxdb/blob/217eddc87e14a79b01d0c22994fc139f530094a2/models/points.go#L1078)
|
||||
///
|
||||
/// While this choice of implementation definitely means there is
|
||||
/// logic duplication for scanning fields, duplicating it also means
|
||||
/// we can be more sure of the compatibility of the rust parser and
|
||||
/// the canonical Go parser.
|
||||
fn split_lines(input: &str) -> impl Iterator<Item = &str> {
|
||||
// NB: This is ported as closely as possibly from the original Go code:
|
||||
let mut quoted = false;
|
||||
let mut fields = false;
|
||||
|
||||
// tracks how many '=' and commas we've seen
|
||||
// this duplicates some of the functionality in scanFields
|
||||
let mut equals = 0;
|
||||
let mut commas = 0;
|
||||
|
||||
let mut in_escape = false;
|
||||
input.split(move |c| {
|
||||
// skip past escaped characters
|
||||
if in_escape {
|
||||
in_escape = false;
|
||||
return false;
|
||||
}
|
||||
|
||||
if c == '\\' {
|
||||
in_escape = true;
|
||||
return false;
|
||||
}
|
||||
|
||||
if c == ' ' {
|
||||
fields = true;
|
||||
return false;
|
||||
}
|
||||
|
||||
// If we see a double quote, makes sure it is not escaped
|
||||
if fields {
|
||||
if !quoted && c == '=' {
|
||||
equals += 1;
|
||||
return false;
|
||||
} else if !quoted && c == ',' {
|
||||
commas += 1;
|
||||
return false;
|
||||
} else if c == '"' && equals > commas {
|
||||
quoted = !quoted;
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
if c == '\n' && !quoted {
|
||||
// reset all the state -- we found a line
|
||||
quoted = false;
|
||||
fields = false;
|
||||
equals = 0;
|
||||
commas = 0;
|
||||
assert!(!in_escape);
|
||||
in_escape = false;
|
||||
return true;
|
||||
}
|
||||
|
||||
false
|
||||
})
|
||||
}
|
||||
|
||||
fn parse_line(i: &str) -> IResult<&str, ParsedLine<'_>> {
|
||||
let field_set = preceded(whitespace, field_set);
|
||||
let timestamp = preceded(whitespace, timestamp);
|
||||
let timestamp = preceded(whitespace, terminated(timestamp, opt(whitespace)));
|
||||
|
||||
let line = tuple((series, field_set, opt(timestamp)));
|
||||
|
||||
|
@ -761,6 +852,46 @@ mod test {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_split_lines() -> Result {
|
||||
assert_eq!(split_lines("").collect::<Vec<_>>(), vec![""]);
|
||||
assert_eq!(split_lines("foo").collect::<Vec<_>>(), vec!["foo"]);
|
||||
assert_eq!(
|
||||
split_lines("foo\nbar").collect::<Vec<_>>(),
|
||||
vec!["foo", "bar"]
|
||||
);
|
||||
assert_eq!(
|
||||
split_lines("foo\nbar\nbaz").collect::<Vec<_>>(),
|
||||
vec!["foo", "bar", "baz"]
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
split_lines("foo\\nbar\nbaz").collect::<Vec<_>>(),
|
||||
vec!["foo\\nbar", "baz"]
|
||||
);
|
||||
assert_eq!(
|
||||
split_lines("meas tag=val field=1\nnext\n").collect::<Vec<_>>(),
|
||||
vec!["meas tag=val field=1", "next", ""]
|
||||
);
|
||||
assert_eq!(
|
||||
split_lines("meas tag=val field=\"\nval\"\nnext").collect::<Vec<_>>(),
|
||||
vec!["meas tag=val field=\"\nval\"", "next"]
|
||||
);
|
||||
assert_eq!(
|
||||
split_lines("meas tag=val field=\\\"\nval\"\nnext").collect::<Vec<_>>(),
|
||||
vec!["meas tag=val field=\\\"", "val\"", "next"]
|
||||
);
|
||||
assert_eq!(
|
||||
split_lines("meas tag=val field=1,field=\"\nval\"\nnext").collect::<Vec<_>>(),
|
||||
vec!["meas tag=val field=1,field=\"\nval\"", "next"]
|
||||
);
|
||||
assert_eq!(
|
||||
split_lines("meas tag=val field=1,field=\\\"\nval\"\nnext").collect::<Vec<_>>(),
|
||||
vec!["meas tag=val field=1,field=\\\"", "val\"", "next"]
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn escaped_str_multi_to_string() -> Result {
|
||||
let (_, es) = measurement("Foo\\aBar")?;
|
||||
|
@ -1378,6 +1509,45 @@ her"#,
|
|||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_advance_after_error() -> Result {
|
||||
// Note that the first line has an error (23.1.22 is not a number)
|
||||
// but there is valid data afterwrds,
|
||||
let input = "foo,tag0=value1 asdf=23.1.22,jkl=4\n\
|
||||
foo,tag0=value2 asdf=22.1,jkl=5";
|
||||
|
||||
let vals: Vec<_> = super::parse_lines(input).collect();
|
||||
|
||||
assert_eq!(vals.len(), 2);
|
||||
assert!(vals[0].is_err());
|
||||
assert_eq!(
|
||||
format!("{:?}", &vals[0]),
|
||||
"Err(CannotParseEntireLine { trailing_content: \".22,jkl=4\" })"
|
||||
);
|
||||
|
||||
assert!(vals[1].is_ok());
|
||||
let parsed_line = vals[1].as_ref().expect("second line succeeded");
|
||||
assert_eq!(parsed_line.series.measurement, "foo");
|
||||
assert_eq!(parsed_line.series.tag_set.as_ref().unwrap()[0].0, "tag0");
|
||||
assert_eq!(parsed_line.series.tag_set.as_ref().unwrap()[0].1, "value2");
|
||||
|
||||
assert_eq!(parsed_line.timestamp, None);
|
||||
|
||||
assert_eq!(parsed_line.field_set[0].0, "asdf");
|
||||
assert!(approximately_equal(
|
||||
parsed_line.field_set[0].1.unwrap_f64(),
|
||||
22.1
|
||||
));
|
||||
|
||||
assert_eq!(parsed_line.field_set[1].0, "jkl");
|
||||
assert!(approximately_equal(
|
||||
parsed_line.field_set[1].1.unwrap_f64(),
|
||||
5.
|
||||
));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[allow(clippy::op_ref)]
|
||||
// Clippy disabled because it wascomplaining about uselessly
|
||||
|
|
Loading…
Reference in New Issue