Merge pull request #55 from influxdata/duplicate-tags

feat: Error when parsing lines with duplicate tags
pull/24376/head
Paul Dix 2020-04-02 10:28:22 -04:00 committed by GitHub
commit 8ac21314d6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 131 additions and 77 deletions

23
Cargo.lock generated
View File

@ -408,6 +408,7 @@ dependencies = [
"csv 1.1.3 (registry+https://github.com/rust-lang/crates.io-index)",
"dirs 2.0.2 (registry+https://github.com/rust-lang/crates.io-index)",
"dotenv 0.15.0 (registry+https://github.com/rust-lang/crates.io-index)",
"either 1.5.3 (registry+https://github.com/rust-lang/crates.io-index)",
"env_logger 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)",
"http 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
@ -424,6 +425,7 @@ dependencies = [
"serde 1.0.104 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_json 1.0.48 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_urlencoded 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)",
"snafu 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)",
"tonic 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
"tonic-build 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
@ -1555,6 +1557,25 @@ name = "smallvec"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "snafu"
version = "0.6.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"doc-comment 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
"snafu-derive 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "snafu-derive"
version = "0.6.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"proc-macro2 1.0.9 (registry+https://github.com/rust-lang/crates.io-index)",
"quote 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)",
"syn 1.0.16 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "socket2"
version = "0.3.11"
@ -2386,6 +2407,8 @@ dependencies = [
"checksum signal-hook-registry 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "94f478ede9f64724c5d173d7bb56099ec3e2d9fc2774aac65d34b8b890405f41"
"checksum slab 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "c111b5bd5695e56cffe5129854aa230b39c93a305372fdbb2668ca2394eea9f8"
"checksum smallvec 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "5c2fb2ec9bcd216a5b0d0ccf31ab17b5ed1d627960edff65bbe95d3ce221cefc"
"checksum snafu 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)" = "546db9181bce2aa22ed883c33d65603b76335b4c2533a98289f54265043de7a1"
"checksum snafu-derive 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)" = "bdc75da2e0323f297402fd9c8fdba709bb04e4c627cbe31d19a2c91fc8d9f0e2"
"checksum socket2 0.3.11 (registry+https://github.com/rust-lang/crates.io-index)" = "e8b74de517221a2cb01a53349cf54182acdc31a074727d3079068448c0676d85"
"checksum sourcefile 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "4bf77cb82ba8453b42b6ae1d692e4cdc92f9a47beaf89a847c8be83f4e328ad3"
"checksum spin 0.5.2 (registry+https://github.com/rust-lang/crates.io-index)" = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"

View File

@ -38,6 +38,8 @@ croaring = "0.4.2"
http = "0.2.0"
serde_urlencoded = "0.6.1"
nom = "5.1.1"
snafu = "0.6.2"
either = "1.5.3"
[dev-dependencies]
criterion = "0.3"

View File

@ -1,3 +1,4 @@
use either::Either;
use nom::{
branch::alt,
bytes::complete::{tag, take_while1},
@ -7,7 +8,16 @@ use nom::{
sequence::{separated_pair, terminated, tuple},
IResult,
};
use std::{error, fmt};
use snafu::Snafu;
use std::collections::BTreeMap;
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display(r#"Must not contain duplicate tags, but "{}" was repeated"#, tag_key))]
DuplicateTag { tag_key: String },
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
#[derive(Debug, PartialEq, Clone)]
pub struct Point<T> {
@ -18,7 +28,7 @@ pub struct Point<T> {
}
impl<T> Point<T> {
pub fn index_pairs(&self) -> Result<Vec<Pair>, ParseError> {
pub fn index_pairs(&self) -> Result<Vec<Pair>> {
index_pairs(&self.series)
}
}
@ -97,7 +107,7 @@ impl PointType {
}
}
pub fn index_pairs(&self) -> Result<Vec<Pair>, ParseError> {
pub fn index_pairs(&self) -> Result<Vec<Pair>> {
match self {
PointType::I64(p) => p.index_pairs(),
PointType::F64(p) => p.index_pairs(),
@ -109,7 +119,7 @@ impl PointType {
/// index_pairs parses the series key into key value pairs for insertion into the index. In
/// cases where this series is already in the database, this parse step can be skipped entirely.
/// The measurement is represented as a _m key and field as _f.
pub fn index_pairs(key: &str) -> Result<Vec<Pair>, ParseError> {
pub fn index_pairs(key: &str) -> Result<Vec<Pair>> {
let chars = key.chars();
let mut pairs = vec![];
let mut key = "_m".to_string();
@ -153,24 +163,6 @@ pub struct Pair {
pub value: String,
}
#[derive(Debug, Clone)]
pub struct ParseError {
description: String,
}
impl fmt::Display for ParseError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.description)
}
}
impl error::Error for ParseError {
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
// Generic error, underlying cause isn't tracked.
None
}
}
#[derive(Debug)]
struct ParsedLine<'a> {
measurement: &'a str,
@ -186,49 +178,54 @@ enum FieldValue {
}
// TODO: Return an error for invalid inputs
pub fn parse(input: &str) -> Vec<PointType> {
pub fn parse(input: &str) -> Result<Vec<PointType>> {
input
.lines()
.flat_map(|line| match parse_line(line) {
Ok((_remaining, parsed_line)) => {
let ParsedLine {
measurement,
tag_set,
field_set,
timestamp,
} = parsed_line;
let mut tag_set = tag_set.unwrap_or_default();
// TODO: handle duplicates?
tag_set.sort_by(|a, b| a.0.cmp(&b.0));
let tag_set = tag_set;
let timestamp = timestamp.expect("TODO: default timestamp not supported");
let mut series_base = String::from(measurement);
for (tag_key, tag_value) in tag_set {
use std::fmt::Write;
write!(&mut series_base, ",{}={}", tag_key, tag_value)
.expect("Could not append string");
}
let series_base = series_base;
field_set.into_iter().map(move |(field_key, field_value)| {
let series = format!("{}\t{}", series_base, field_key);
match field_value {
FieldValue::I64(value) => PointType::new_i64(series, value, timestamp),
FieldValue::F64(value) => PointType::new_f64(series, value, timestamp),
}
})
}
Err(e) => {
panic!("TODO: Failed to parse: {}", e);
}
Ok((_remaining, parsed_line)) => match line_to_points(parsed_line) {
Ok(i) => Either::Left(i.map(Ok)),
Err(e) => Either::Right(std::iter::once(Err(e))),
},
Err(e) => panic!("TODO: Failed to parse: {}", e),
})
.collect()
}
fn line_to_points(parsed_line: ParsedLine<'_>) -> Result<impl Iterator<Item = PointType> + '_> {
let ParsedLine {
measurement,
tag_set,
field_set,
timestamp,
} = parsed_line;
let mut unique_sorted_tag_set = BTreeMap::new();
for (tag_key, tag_value) in tag_set.unwrap_or_default() {
if unique_sorted_tag_set.insert(tag_key, tag_value).is_some() {
return DuplicateTag { tag_key }.fail();
}
}
let tag_set = unique_sorted_tag_set;
let timestamp = timestamp.expect("TODO: default timestamp not supported");
let mut series_base = String::from(measurement);
for (tag_key, tag_value) in tag_set {
use std::fmt::Write;
write!(&mut series_base, ",{}={}", tag_key, tag_value).expect("Could not append string");
}
let series_base = series_base;
Ok(field_set.into_iter().map(move |(field_key, field_value)| {
let series = format!("{}\t{}", series_base, field_key);
match field_value {
FieldValue::I64(value) => PointType::new_i64(series, value, timestamp),
FieldValue::F64(value) => PointType::new_f64(series, value, timestamp),
}
}))
}
fn parse_line(i: &str) -> IResult<&str, ParsedLine<'_>> {
let tag_set = map(tuple((tag(","), tag_set)), |(_, ts)| ts);
let field_set = map(tuple((tag(" "), field_set)), |(_, fs)| fs);
@ -291,40 +288,49 @@ mod test {
use super::*;
use crate::tests::approximately_equal;
type Error = Box<dyn std::error::Error>;
type Result<T = (), E = Error> = std::result::Result<T, E>;
#[test]
fn parse_single_field_integer() {
fn parse_single_field_integer() -> Result {
let input = "foo asdf=23i 1234";
let vals = parse(input);
let vals = parse(input)?;
assert_eq!(vals[0].series(), "foo\tasdf");
assert_eq!(vals[0].time(), 1234);
assert_eq!(vals[0].i64_value().unwrap(), 23);
Ok(())
}
#[test]
fn parse_single_field_float_no_decimal() {
fn parse_single_field_float_no_decimal() -> Result {
let input = "foo asdf=44 546";
let vals = parse(input);
let vals = parse(input)?;
assert_eq!(vals[0].series(), "foo\tasdf");
assert_eq!(vals[0].time(), 546);
assert!(approximately_equal(vals[0].f64_value().unwrap(), 44.0));
Ok(())
}
#[test]
fn parse_single_field_float_with_decimal() {
fn parse_single_field_float_with_decimal() -> Result {
let input = "foo asdf=3.74 123";
let vals = parse(input);
let vals = parse(input)?;
assert_eq!(vals[0].series(), "foo\tasdf");
assert_eq!(vals[0].time(), 123);
assert!(approximately_equal(vals[0].f64_value().unwrap(), 3.74));
Ok(())
}
#[test]
fn parse_two_fields_integer() {
fn parse_two_fields_integer() -> Result {
let input = "foo asdf=23i,bar=5i 1234";
let vals = parse(input);
let vals = parse(input)?;
assert_eq!(vals[0].series(), "foo\tasdf");
assert_eq!(vals[0].time(), 1234);
@ -333,12 +339,14 @@ mod test {
assert_eq!(vals[1].series(), "foo\tbar");
assert_eq!(vals[1].time(), 1234);
assert_eq!(vals[1].i64_value().unwrap(), 5);
Ok(())
}
#[test]
fn parse_two_fields_float() {
fn parse_two_fields_float() -> Result {
let input = "foo asdf=23.1,bar=5 1234";
let vals = parse(input);
let vals = parse(input)?;
assert_eq!(vals[0].series(), "foo\tasdf");
assert_eq!(vals[0].time(), 1234);
@ -347,12 +355,14 @@ mod test {
assert_eq!(vals[1].series(), "foo\tbar");
assert_eq!(vals[1].time(), 1234);
assert!(approximately_equal(vals[1].f64_value().unwrap(), 5.0));
Ok(())
}
#[test]
fn parse_mixed_float_and_integer() {
fn parse_mixed_float_and_integer() -> Result {
let input = "foo asdf=23.1,bar=5i 1234";
let vals = parse(input);
let vals = parse(input)?;
assert_eq!(vals[0].series(), "foo\tasdf");
assert_eq!(vals[0].time(), 1234);
@ -361,22 +371,41 @@ mod test {
assert_eq!(vals[1].series(), "foo\tbar");
assert_eq!(vals[1].time(), 1234);
assert_eq!(vals[1].i64_value().unwrap(), 5);
Ok(())
}
#[test]
fn parse_tag_set_included_in_series() {
fn parse_tag_set_included_in_series() -> Result {
let input = "foo,tag1=1,tag2=2 value=1 123";
let vals = parse(input);
let vals = parse(input)?;
assert_eq!(vals[0].series(), "foo,tag1=1,tag2=2\tvalue");
Ok(())
}
#[test]
fn parse_tag_set_unsorted() {
fn parse_tag_set_unsorted() -> Result {
let input = "foo,tag2=2,tag1=1 value=1 123";
let vals = parse(input);
let vals = parse(input)?;
assert_eq!(vals[0].series(), "foo,tag1=1,tag2=2\tvalue");
Ok(())
}
#[test]
fn parse_tag_set_duplicate_tags() -> Result {
let input = "foo,tag=1,tag=2 value=1 123";
let err = parse(input).expect_err("Parsing duplicate tags should fail");
assert_eq!(
err.to_string(),
r#"Must not contain duplicate tags, but "tag" was repeated"#
);
Ok(())
}
#[test]

View File

@ -93,7 +93,7 @@ async fn write(req: hyper::Request<Body>, app: Arc<App>) -> Result<Body, Applica
let body = body.freeze();
let body = str::from_utf8(&body).unwrap();
let mut points = line_parser::parse(body);
let mut points = line_parser::parse(body).expect("TODO: Unable to parse lines");
app.db
.write_points(write_info.org_id, &bucket, &mut points)

View File

@ -1,5 +1,5 @@
use crate::delorean::{Node, Predicate, TimestampRange};
use crate::line_parser::{ParseError, Point, PointType};
use crate::line_parser::{self, Point, PointType};
use crate::storage::inverted_index::{InvertedIndex, SeriesFilter};
use crate::storage::predicate::{Evaluate, EvaluateVisitor};
use crate::storage::series_store::{ReadPoint, SeriesStore};
@ -173,7 +173,7 @@ impl SeriesMap {
}
}
fn insert_series(&mut self, point: &mut PointType) -> Result<(), ParseError> {
fn insert_series(&mut self, point: &mut PointType) -> line_parser::Result<()> {
if let Some(id) = self.series_key_to_id.get(point.series()) {
point.set_series_id(*id);
return Ok(());