fix: handle LP lines with duplicated fields/tags
This commit allows the LP consumer to correctly handle line protocol
writes that duplicate one or more fields or tags within a single line:
table v=2,bananas=42,v=3,platanos=24
▲ ▲
└───────┬──────┘
│
duplicate field "v"
This change implements the following logic when processing fields:
IF field is duplicated
IF all duplicate field values are of the same data type
use last occurrence ("last write wins")
ELSE
return an error
Any duplication of tags is rejected, and use of a field name as both a
field and a tag is remains forbidden (unchanged in this PR, a previously
agreed breaking change from TSM).
See https://github.com/influxdata/influxdb_iox/issues/4326 for context.
pull/24376/head
parent
c0ed688043
commit
cf200e509e
|
|
@ -3525,6 +3525,7 @@ name = "mutable_batch_lp"
|
|||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"arrow_util",
|
||||
"assert_matches",
|
||||
"hashbrown 0.12.0",
|
||||
"influxdb_line_protocol",
|
||||
"mutable_batch",
|
||||
|
|
|
|||
|
|
@ -342,6 +342,19 @@ pub enum FieldValue<'a> {
|
|||
Boolean(bool),
|
||||
}
|
||||
|
||||
impl<'a> FieldValue<'a> {
|
||||
/// Returns true if `self` and `other` are of the same data type.
|
||||
pub fn is_same_type(&self, other: &Self) -> bool {
|
||||
match self {
|
||||
FieldValue::I64(_) => matches!(other, FieldValue::I64(_)),
|
||||
FieldValue::U64(_) => matches!(other, FieldValue::U64(_)),
|
||||
FieldValue::F64(_) => matches!(other, FieldValue::F64(_)),
|
||||
FieldValue::String(_) => matches!(other, FieldValue::String(_)),
|
||||
FieldValue::Boolean(_) => matches!(other, FieldValue::Boolean(_)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Converts FieldValue back to LineProtocol
|
||||
/// See <https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/>
|
||||
/// for more detail.
|
||||
|
|
@ -369,7 +382,7 @@ impl<'a> Display for FieldValue<'a> {
|
|||
/// For example the 8 character string `Foo\\Bar` (note the double
|
||||
/// `\\`) is parsed into the logical 7 character string `Foo\Bar`
|
||||
/// (note the single `\`)
|
||||
#[derive(Debug, Clone, Eq)]
|
||||
#[derive(Debug, Clone, Eq, Hash)]
|
||||
pub enum EscapedStr<'a> {
|
||||
SingleSlice(&'a str),
|
||||
CopiedValue(String),
|
||||
|
|
@ -2217,4 +2230,28 @@ her"#,
|
|||
|
||||
assert_eq!(vals[0].tag_value("asdf"), None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_field_value_same_type() {
|
||||
// True cases
|
||||
assert!(FieldValue::I64(0).is_same_type(&FieldValue::I64(42)));
|
||||
assert!(FieldValue::U64(0).is_same_type(&FieldValue::U64(42)));
|
||||
assert!(FieldValue::F64(0.0).is_same_type(&FieldValue::F64(4.2)));
|
||||
assert!(
|
||||
FieldValue::String(EscapedStr::CopiedValue("bananas".to_string())).is_same_type(
|
||||
&FieldValue::String(EscapedStr::CopiedValue("platanos".to_string()))
|
||||
)
|
||||
);
|
||||
assert!(FieldValue::Boolean(true).is_same_type(&FieldValue::Boolean(false)));
|
||||
|
||||
// Some false cases
|
||||
assert!(!FieldValue::I64(0).is_same_type(&FieldValue::U64(42)));
|
||||
assert!(!FieldValue::U64(0).is_same_type(&FieldValue::I64(42)));
|
||||
assert!(!FieldValue::F64(0.0).is_same_type(&FieldValue::U64(42)));
|
||||
assert!(
|
||||
!FieldValue::String(EscapedStr::CopiedValue("bananas".to_string()))
|
||||
.is_same_type(&FieldValue::U64(42))
|
||||
);
|
||||
assert!(!FieldValue::Boolean(true).is_same_type(&FieldValue::U64(42)));
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -14,3 +14,4 @@ workspace-hack = { path = "../workspace-hack"}
|
|||
|
||||
[dev-dependencies]
|
||||
arrow_util = { path = "../arrow_util" }
|
||||
assert_matches = "1.5.0"
|
||||
|
|
|
|||
|
|
@ -11,7 +11,7 @@
|
|||
clippy::clone_on_ref_ptr
|
||||
)]
|
||||
|
||||
use hashbrown::HashMap;
|
||||
use hashbrown::{hash_map::Entry, HashMap, HashSet};
|
||||
use influxdb_line_protocol::{parse_lines, FieldValue, ParsedLine};
|
||||
use mutable_batch::writer::Writer;
|
||||
use mutable_batch::MutableBatch;
|
||||
|
|
@ -28,10 +28,7 @@ pub enum Error {
|
|||
},
|
||||
|
||||
#[snafu(display("error writing line {}: {}", line, source))]
|
||||
Write {
|
||||
source: mutable_batch::writer::Error,
|
||||
line: usize,
|
||||
},
|
||||
Write { source: LineWriteError, line: usize },
|
||||
|
||||
#[snafu(display("empty write payload"))]
|
||||
EmptyPayload,
|
||||
|
|
@ -81,7 +78,25 @@ impl LinesConverter {
|
|||
self.timestamp_base = timestamp_base
|
||||
}
|
||||
|
||||
/// Write some line protocol data
|
||||
/// Write some line protocol data.
|
||||
///
|
||||
/// If a field / tag name appears more than once in a single line, the
|
||||
/// following semantics apply:
|
||||
///
|
||||
/// * duplicate fields, same value: do nothing/coalesce
|
||||
/// * duplicate fields, different value: last occurrence wins
|
||||
/// * duplicate fields, different types: return
|
||||
/// [`LineWriteError::ConflictedFieldTypes`]
|
||||
/// * duplicate tags, same value: return [`LineWriteError::DuplicateTag`]
|
||||
/// * duplicate tags, different value: return
|
||||
/// [`LineWriteError::DuplicateTag`]
|
||||
/// * duplicate tags, different types: return
|
||||
/// [`LineWriteError::DuplicateTag`]
|
||||
/// * same name for tag and field: return
|
||||
/// [`mutable_batch::writer::Error::TypeMismatch`]
|
||||
/// * same name for tag and field, different type :
|
||||
/// [`mutable_batch::writer::Error::TypeMismatch`]
|
||||
///
|
||||
pub fn write_lp(&mut self, lines: &str) -> Result<()> {
|
||||
for (line_idx, maybe_line) in parse_lines(lines).enumerate() {
|
||||
let mut line = maybe_line.context(LineProtocolSnafu { line: line_idx + 1 })?;
|
||||
|
|
@ -139,38 +154,155 @@ pub fn lines_to_batches_stats(
|
|||
converter.finish()
|
||||
}
|
||||
|
||||
/// Writes the [`ParsedLine`] to the [`MutableBatch`]
|
||||
/// An error applying an already-parsed line protocol line ([`ParsedLine`]) to a
|
||||
/// [`MutableBatch`].
|
||||
#[allow(missing_copy_implementations)]
|
||||
#[derive(Debug, Snafu)]
|
||||
pub enum LineWriteError {
|
||||
/// A transparent error wrapper over the underling mutable batch error.
|
||||
#[snafu(display("{}", source))]
|
||||
MutableBatch {
|
||||
/// The underlying error
|
||||
source: mutable_batch::writer::Error,
|
||||
},
|
||||
|
||||
/// The specified tag name appears twice in one LP line, with conflicting
|
||||
/// values.
|
||||
#[snafu(display(
|
||||
"the tag '{}' is specified more than once with conflicting values",
|
||||
name
|
||||
))]
|
||||
DuplicateTag {
|
||||
/// The duplicated tag name.
|
||||
name: String,
|
||||
},
|
||||
|
||||
/// The specified field name appears twice in one LP line, with conflicting
|
||||
/// types.
|
||||
#[snafu(display(
|
||||
"the field '{}' is specified more than once with conflicting types",
|
||||
name
|
||||
))]
|
||||
ConflictedFieldTypes {
|
||||
/// The duplicated field name.
|
||||
name: String,
|
||||
},
|
||||
}
|
||||
|
||||
/// Writes the [`ParsedLine`] to the [`MutableBatch`], respecting the edge case
|
||||
/// semantics described in [`LinesConverter::write_lp()`].
|
||||
pub fn write_line(
|
||||
writer: &mut Writer<'_>,
|
||||
line: &ParsedLine<'_>,
|
||||
default_time: i64,
|
||||
) -> mutable_batch::writer::Result<()> {
|
||||
for (tag_key, tag_value) in line.series.tag_set.iter().flatten() {
|
||||
writer.write_tag(tag_key.as_str(), None, std::iter::once(tag_value.as_str()))?
|
||||
}
|
||||
|
||||
for (field_key, field_value) in &line.field_set {
|
||||
match field_value {
|
||||
FieldValue::I64(value) => {
|
||||
writer.write_i64(field_key.as_str(), None, std::iter::once(*value))?;
|
||||
}
|
||||
FieldValue::U64(value) => {
|
||||
writer.write_u64(field_key.as_str(), None, std::iter::once(*value))?;
|
||||
}
|
||||
FieldValue::F64(value) => {
|
||||
writer.write_f64(field_key.as_str(), None, std::iter::once(*value))?;
|
||||
}
|
||||
FieldValue::String(value) => {
|
||||
writer.write_string(field_key.as_str(), None, std::iter::once(value.as_str()))?;
|
||||
}
|
||||
FieldValue::Boolean(value) => {
|
||||
writer.write_bool(field_key.as_str(), None, std::iter::once(*value))?;
|
||||
) -> Result<(), LineWriteError> {
|
||||
// Only allocate the seen tags hashset if there are tags.
|
||||
if let Some(tags) = &line.series.tag_set {
|
||||
let mut seen = HashSet::with_capacity(tags.len());
|
||||
for (tag_key, tag_value) in tags.iter() {
|
||||
// Check if a field with this name has been observed previously.
|
||||
if !seen.insert(tag_key) {
|
||||
// This tag_key appears more than once, with differing values.
|
||||
//
|
||||
// This is always an error.
|
||||
return Err(LineWriteError::DuplicateTag {
|
||||
name: tag_key.to_string(),
|
||||
});
|
||||
}
|
||||
writer
|
||||
.write_tag(tag_key.as_str(), None, std::iter::once(tag_value.as_str()))
|
||||
.context(MutableBatchSnafu)?
|
||||
}
|
||||
}
|
||||
|
||||
// In order to maintain parity with TSM, if a field within a single line is
|
||||
// repeated, the last occurrence is retained - for example, in the following
|
||||
// line protocol write:
|
||||
//
|
||||
// table v=2,bananas=42,v=3,platanos=24
|
||||
// ▲ ▲
|
||||
// └───────┬──────┘
|
||||
// │
|
||||
// duplicate field "v"
|
||||
//
|
||||
// The duplicate "v" field should be collapsed into a single "v=3" field,
|
||||
// yielding an effective write of:
|
||||
//
|
||||
// table bananas=42,v=3,platanos=24
|
||||
//
|
||||
// To do this in O(n) time, the code below walks backwards (from right to
|
||||
// left) when visiting parsed fields, and tracks which fields it has
|
||||
// observed. Any time a previously observed field is visited, it is skipped:
|
||||
//
|
||||
// visit direction
|
||||
// ◀─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
|
||||
//
|
||||
// table v=2,bananas=42,v=3,platanos=24
|
||||
// ▲ ▲
|
||||
// │ │
|
||||
// skip keep
|
||||
//
|
||||
// A notable exception of this "last value wins" rule is if the types differ
|
||||
// between each occurrence of "v":
|
||||
//
|
||||
// table v=2i,v=3u
|
||||
//
|
||||
// In this instance we break from established TSM behaviour and return an
|
||||
// error. IOx features schema enforcement, and as such is expected to reject
|
||||
// conflicting types for writes. See the github issue[1] for the desired
|
||||
// semantics.
|
||||
//
|
||||
// Tests below codify each of the scenarios described in the ticket.
|
||||
//
|
||||
// [1]: https://github.com/influxdata/influxdb_iox/issues/4326
|
||||
|
||||
let mut seen = HashMap::<_, &FieldValue<'_>>::with_capacity(line.field_set.len());
|
||||
for (field_key, field_value) in line.field_set.iter().rev() {
|
||||
// Check if a field with this name has been observed previously.
|
||||
match seen.entry(field_key) {
|
||||
Entry::Occupied(e) if e.get().is_same_type(field_value) => {
|
||||
// This field_value, and the "last" occurrence of this field_key
|
||||
// (the first visited) are of the same type - this occurrence is
|
||||
// skipped.
|
||||
continue;
|
||||
}
|
||||
Entry::Occupied(_) => {
|
||||
// This occurrence of "field_key" is of a different type to that
|
||||
// of the "last" (fist visited) occurrence. This is an
|
||||
// internally type-conflicted line and should be rejected.
|
||||
return Err(LineWriteError::ConflictedFieldTypes {
|
||||
name: field_key.to_string(),
|
||||
});
|
||||
}
|
||||
Entry::Vacant(v) => {
|
||||
v.insert(field_value);
|
||||
}
|
||||
};
|
||||
|
||||
match field_value {
|
||||
FieldValue::I64(value) => {
|
||||
writer.write_i64(field_key.as_str(), None, std::iter::once(*value))
|
||||
}
|
||||
FieldValue::U64(value) => {
|
||||
writer.write_u64(field_key.as_str(), None, std::iter::once(*value))
|
||||
}
|
||||
FieldValue::F64(value) => {
|
||||
writer.write_f64(field_key.as_str(), None, std::iter::once(*value))
|
||||
}
|
||||
FieldValue::String(value) => {
|
||||
writer.write_string(field_key.as_str(), None, std::iter::once(value.as_str()))
|
||||
}
|
||||
FieldValue::Boolean(value) => {
|
||||
writer.write_bool(field_key.as_str(), None, std::iter::once(*value))
|
||||
}
|
||||
}
|
||||
.context(MutableBatchSnafu)?;
|
||||
}
|
||||
|
||||
let time = line.timestamp.unwrap_or(default_time);
|
||||
writer.write_time("time", std::iter::once(time))?;
|
||||
writer
|
||||
.write_time("time", std::iter::once(time))
|
||||
.context(MutableBatchSnafu)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
@ -192,6 +324,7 @@ pub mod test_helpers {
|
|||
mod tests {
|
||||
use super::*;
|
||||
use arrow_util::assert_batches_eq;
|
||||
use assert_matches::assert_matches;
|
||||
use schema::selection::Selection;
|
||||
|
||||
#[test]
|
||||
|
|
@ -309,4 +442,136 @@ m b=t 1639612800000000000
|
|||
assert!(u.is_valid(1));
|
||||
assert!(!u.is_valid(2));
|
||||
}
|
||||
|
||||
// https://github.com/influxdata/influxdb_iox/issues/4326
|
||||
mod issue4326 {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_duplicate_field_same_value() {
|
||||
let lp = "m1 val=2i,val=2i 0";
|
||||
|
||||
let batches = lines_to_batches(lp, 5).unwrap();
|
||||
assert_eq!(batches.len(), 1);
|
||||
|
||||
assert_batches_eq!(
|
||||
&[
|
||||
"+----------------------+-----+",
|
||||
"| time | val |",
|
||||
"+----------------------+-----+",
|
||||
"| 1970-01-01T00:00:00Z | 2 |",
|
||||
"+----------------------+-----+",
|
||||
],
|
||||
&[batches["m1"].to_arrow(Selection::All).unwrap()]
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_duplicate_field_different_values() {
|
||||
let lp = "m1 val=1i,val=2i 0";
|
||||
|
||||
let batches = lines_to_batches(lp, 5).unwrap();
|
||||
assert_eq!(batches.len(), 1);
|
||||
|
||||
// "last value wins"
|
||||
assert_batches_eq!(
|
||||
&[
|
||||
"+----------------------+-----+",
|
||||
"| time | val |",
|
||||
"+----------------------+-----+",
|
||||
"| 1970-01-01T00:00:00Z | 2 |",
|
||||
"+----------------------+-----+",
|
||||
],
|
||||
&[batches["m1"].to_arrow(Selection::All).unwrap()]
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_duplicate_fields_different_type() {
|
||||
let lp = "m1 val=1i,val=2.0 0";
|
||||
|
||||
let err = lines_to_batches(lp, 5).expect_err("type conflicted write should fail");
|
||||
assert_matches!( err,
|
||||
Error::Write {
|
||||
source: LineWriteError::ConflictedFieldTypes { name },
|
||||
line: 1
|
||||
}
|
||||
=> {
|
||||
assert_eq!(name, "val");
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_duplicate_tags_same_value() {
|
||||
let lp = "m1,tag=1,tag=1 val=1i 0";
|
||||
|
||||
let err = lines_to_batches(lp, 5).expect_err("type conflicted write should fail");
|
||||
assert_matches!( err,
|
||||
Error::Write {
|
||||
source: LineWriteError::DuplicateTag { name },
|
||||
line: 1
|
||||
}
|
||||
=> {
|
||||
assert_eq!(name, "tag");
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_duplicate_tags_different_value() {
|
||||
let lp = "m1,tag=1,tag=2 val=1i 0";
|
||||
|
||||
let err = lines_to_batches(lp, 5).expect_err("type conflicted write should fail");
|
||||
assert_matches!( err,
|
||||
Error::Write {
|
||||
source: LineWriteError::DuplicateTag { name },
|
||||
line: 1
|
||||
}
|
||||
=> {
|
||||
assert_eq!(name, "tag");
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_duplicate_tags_different_type() {
|
||||
let lp = "m1,tag=1,tag=2.0 val=1i 0";
|
||||
|
||||
let err = lines_to_batches(lp, 5).expect_err("type conflicted write should fail");
|
||||
assert_matches!( err,
|
||||
Error::Write {
|
||||
source: LineWriteError::DuplicateTag { name },
|
||||
line: 1
|
||||
}
|
||||
=> {
|
||||
assert_eq!(name, "tag");
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_duplicate_is_tag_and_field() {
|
||||
let lp = "m1,v=1i v=1i 0";
|
||||
|
||||
let err = lines_to_batches(lp, 5).expect_err("type conflicted write should fail");
|
||||
assert_matches!(
|
||||
err,
|
||||
Error::Write {
|
||||
source: LineWriteError::MutableBatch { .. },
|
||||
line: 1
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_duplicate_is_tag_and_field_different_types() {
|
||||
let lp = "m1,v=1i v=1.0 0";
|
||||
|
||||
let err = lines_to_batches(lp, 5).expect_err("type conflicted write should fail");
|
||||
assert_matches!(
|
||||
err,
|
||||
Error::Write {
|
||||
source: LineWriteError::MutableBatch { .. },
|
||||
line: 1
|
||||
}
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -458,6 +458,8 @@ mod tests {
|
|||
use flate2::{write::GzEncoder, Compression};
|
||||
use hyper::header::HeaderValue;
|
||||
use metric::{Attributes, Metric};
|
||||
use mutable_batch::column::ColumnData;
|
||||
use mutable_batch_lp::LineWriteError;
|
||||
|
||||
use crate::dml_handlers::mock::{MockDmlHandler, MockDmlHandlerCall};
|
||||
|
||||
|
|
@ -829,6 +831,32 @@ mod tests {
|
|||
}
|
||||
);
|
||||
|
||||
test_write_handler!(
|
||||
field_upsert_within_batch,
|
||||
query_string = "?org=bananas&bucket=test",
|
||||
body = "test field=1u 100\ntest field=2u 100".as_bytes(),
|
||||
dml_handler = [Ok(summary())],
|
||||
want_result = Ok(_),
|
||||
want_dml_calls = [MockDmlHandlerCall::Write{namespace, write_input}] => {
|
||||
assert_eq!(namespace, "bananas_test");
|
||||
let table = write_input.get("test").expect("table not in write");
|
||||
let col = table.column("field").expect("column missing");
|
||||
assert_matches!(col.data(), ColumnData::U64(data, _) => {
|
||||
// Ensure both values are recorded, in the correct order.
|
||||
assert_eq!(data.as_slice(), [1, 2]);
|
||||
});
|
||||
}
|
||||
);
|
||||
|
||||
test_write_handler!(
|
||||
column_named_time,
|
||||
query_string = "?org=bananas&bucket=test",
|
||||
body = "test field=1u,time=42u 100".as_bytes(),
|
||||
dml_handler = [],
|
||||
want_result = Err(_),
|
||||
want_dml_calls = []
|
||||
);
|
||||
|
||||
test_delete_handler!(
|
||||
ok,
|
||||
query_string = "?org=bananas&bucket=test",
|
||||
|
|
@ -931,4 +959,119 @@ mod tests {
|
|||
want_result = Err(Error::NoHandler),
|
||||
want_dml_calls = []
|
||||
);
|
||||
|
||||
// https://github.com/influxdata/influxdb_iox/issues/4326
|
||||
mod issue4326 {
|
||||
use super::*;
|
||||
|
||||
test_write_handler!(
|
||||
duplicate_fields_same_value,
|
||||
query_string = "?org=bananas&bucket=test",
|
||||
body = "whydo InputPower=300i,InputPower=300i".as_bytes(),
|
||||
dml_handler = [Ok(summary())],
|
||||
want_result = Ok(_),
|
||||
want_dml_calls = [MockDmlHandlerCall::Write{namespace, write_input}] => {
|
||||
assert_eq!(namespace, "bananas_test");
|
||||
let table = write_input.get("whydo").expect("table not in write");
|
||||
let col = table.column("InputPower").expect("column missing");
|
||||
assert_matches!(col.data(), ColumnData::I64(data, _) => {
|
||||
// Ensure the duplicate values are coalesced.
|
||||
assert_eq!(data.as_slice(), [300]);
|
||||
});
|
||||
}
|
||||
);
|
||||
|
||||
test_write_handler!(
|
||||
duplicate_fields_different_value,
|
||||
query_string = "?org=bananas&bucket=test",
|
||||
body = "whydo InputPower=300i,InputPower=42i".as_bytes(),
|
||||
dml_handler = [Ok(summary())],
|
||||
want_result = Ok(_),
|
||||
want_dml_calls = [MockDmlHandlerCall::Write{namespace, write_input}] => {
|
||||
assert_eq!(namespace, "bananas_test");
|
||||
let table = write_input.get("whydo").expect("table not in write");
|
||||
let col = table.column("InputPower").expect("column missing");
|
||||
assert_matches!(col.data(), ColumnData::I64(data, _) => {
|
||||
// Last value wins
|
||||
assert_eq!(data.as_slice(), [42]);
|
||||
});
|
||||
}
|
||||
);
|
||||
|
||||
test_write_handler!(
|
||||
duplicate_fields_different_type,
|
||||
query_string = "?org=bananas&bucket=test",
|
||||
body = "whydo InputPower=300i,InputPower=4.2".as_bytes(),
|
||||
dml_handler = [],
|
||||
want_result = Err(Error::ParseLineProtocol(mutable_batch_lp::Error::Write {
|
||||
source: LineWriteError::ConflictedFieldTypes { .. },
|
||||
..
|
||||
})),
|
||||
want_dml_calls = []
|
||||
);
|
||||
|
||||
test_write_handler!(
|
||||
duplicate_tags_same_value,
|
||||
query_string = "?org=bananas&bucket=test",
|
||||
body = "whydo,InputPower=300i,InputPower=300i field=42i".as_bytes(),
|
||||
dml_handler = [],
|
||||
want_result = Err(Error::ParseLineProtocol(mutable_batch_lp::Error::Write {
|
||||
source: LineWriteError::DuplicateTag { .. },
|
||||
..
|
||||
})),
|
||||
want_dml_calls = []
|
||||
);
|
||||
|
||||
test_write_handler!(
|
||||
duplicate_tags_different_value,
|
||||
query_string = "?org=bananas&bucket=test",
|
||||
body = "whydo,InputPower=300i,InputPower=42i field=42i".as_bytes(),
|
||||
dml_handler = [],
|
||||
want_result = Err(Error::ParseLineProtocol(mutable_batch_lp::Error::Write {
|
||||
source: LineWriteError::DuplicateTag { .. },
|
||||
..
|
||||
})),
|
||||
want_dml_calls = []
|
||||
);
|
||||
|
||||
test_write_handler!(
|
||||
duplicate_tags_different_type,
|
||||
query_string = "?org=bananas&bucket=test",
|
||||
body = "whydo,InputPower=300i,InputPower=4.2 field=42i".as_bytes(),
|
||||
dml_handler = [],
|
||||
want_result = Err(Error::ParseLineProtocol(mutable_batch_lp::Error::Write {
|
||||
source: LineWriteError::DuplicateTag { .. },
|
||||
..
|
||||
})),
|
||||
want_dml_calls = []
|
||||
);
|
||||
|
||||
test_write_handler!(
|
||||
duplicate_is_tag_and_field,
|
||||
query_string = "?org=bananas&bucket=test",
|
||||
body = "whydo,InputPower=300i InputPower=300i".as_bytes(),
|
||||
dml_handler = [],
|
||||
want_result = Err(Error::ParseLineProtocol(mutable_batch_lp::Error::Write {
|
||||
source: LineWriteError::MutableBatch {
|
||||
source: mutable_batch::writer::Error::TypeMismatch { .. }
|
||||
},
|
||||
..
|
||||
})),
|
||||
want_dml_calls = []
|
||||
);
|
||||
|
||||
test_write_handler!(
|
||||
duplicate_is_tag_and_field_different_types,
|
||||
query_string = "?org=bananas&bucket=test",
|
||||
body = "whydo,InputPower=300i InputPower=30.0".as_bytes(),
|
||||
dml_handler = [],
|
||||
want_result = Err(Error::ParseLineProtocol(mutable_batch_lp::Error::Write {
|
||||
source: LineWriteError::MutableBatch {
|
||||
source: mutable_batch::writer::Error::TypeMismatch { .. }
|
||||
},
|
||||
..
|
||||
})),
|
||||
want_dml_calls = []
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue