From cf200e509e72b1bc10ebc3eceb2c5d99102b71d8 Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Fri, 22 Apr 2022 14:40:51 +0100 Subject: [PATCH] fix: handle LP lines with duplicated fields/tags MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This commit allows the LP consumer to correctly handle line protocol writes that duplicate one or more fields or tags within a single line: table v=2,bananas=42,v=3,platanos=24 ▲ ▲ └───────┬──────┘ │ duplicate field "v" This change implements the following logic when processing fields: IF field is duplicated IF all duplicate field values are of the same data type use last occurrence ("last write wins") ELSE return an error Any duplication of tags is rejected, and use of a field name as both a field and a tag remains forbidden (unchanged in this PR, a previously agreed breaking change from TSM). See https://github.com/influxdata/influxdb_iox/issues/4326 for context. --- Cargo.lock | 1 + influxdb_line_protocol/src/lib.rs | 39 +++- mutable_batch_lp/Cargo.toml | 1 + mutable_batch_lp/src/lib.rs | 323 +++++++++++++++++++++++++++--- router2/src/server/http.rs | 143 +++++++++++++ 5 files changed, 477 insertions(+), 30 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d406f121cf..b1a571c563 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3525,6 +3525,7 @@ name = "mutable_batch_lp" version = "0.1.0" dependencies = [ "arrow_util", + "assert_matches", "hashbrown 0.12.0", "influxdb_line_protocol", "mutable_batch", diff --git a/influxdb_line_protocol/src/lib.rs b/influxdb_line_protocol/src/lib.rs index 18caa0bf20..f629cf9807 100644 --- a/influxdb_line_protocol/src/lib.rs +++ b/influxdb_line_protocol/src/lib.rs @@ -342,6 +342,19 @@ pub enum FieldValue<'a> { Boolean(bool), } +impl<'a> FieldValue<'a> { + /// Returns true if `self` and `other` are of the same data type. 
+ pub fn is_same_type(&self, other: &Self) -> bool { + match self { + FieldValue::I64(_) => matches!(other, FieldValue::I64(_)), + FieldValue::U64(_) => matches!(other, FieldValue::U64(_)), + FieldValue::F64(_) => matches!(other, FieldValue::F64(_)), + FieldValue::String(_) => matches!(other, FieldValue::String(_)), + FieldValue::Boolean(_) => matches!(other, FieldValue::Boolean(_)), + } + } +} + /// Converts FieldValue back to LineProtocol /// See /// for more detail. @@ -369,7 +382,7 @@ impl<'a> Display for FieldValue<'a> { /// For example the 8 character string `Foo\\Bar` (note the double /// `\\`) is parsed into the logical 7 character string `Foo\Bar` /// (note the single `\`) -#[derive(Debug, Clone, Eq)] +#[derive(Debug, Clone, Eq, Hash)] pub enum EscapedStr<'a> { SingleSlice(&'a str), CopiedValue(String), @@ -2217,4 +2230,28 @@ her"#, assert_eq!(vals[0].tag_value("asdf"), None); } + + #[test] + fn test_field_value_same_type() { + // True cases + assert!(FieldValue::I64(0).is_same_type(&FieldValue::I64(42))); + assert!(FieldValue::U64(0).is_same_type(&FieldValue::U64(42))); + assert!(FieldValue::F64(0.0).is_same_type(&FieldValue::F64(4.2))); + assert!( + FieldValue::String(EscapedStr::CopiedValue("bananas".to_string())).is_same_type( + &FieldValue::String(EscapedStr::CopiedValue("platanos".to_string())) + ) + ); + assert!(FieldValue::Boolean(true).is_same_type(&FieldValue::Boolean(false))); + + // Some false cases + assert!(!FieldValue::I64(0).is_same_type(&FieldValue::U64(42))); + assert!(!FieldValue::U64(0).is_same_type(&FieldValue::I64(42))); + assert!(!FieldValue::F64(0.0).is_same_type(&FieldValue::U64(42))); + assert!( + !FieldValue::String(EscapedStr::CopiedValue("bananas".to_string())) + .is_same_type(&FieldValue::U64(42)) + ); + assert!(!FieldValue::Boolean(true).is_same_type(&FieldValue::U64(42))); + } } diff --git a/mutable_batch_lp/Cargo.toml b/mutable_batch_lp/Cargo.toml index 697a98a8cb..6d3f465eae 100644 --- a/mutable_batch_lp/Cargo.toml +++ 
b/mutable_batch_lp/Cargo.toml @@ -14,3 +14,4 @@ workspace-hack = { path = "../workspace-hack"} [dev-dependencies] arrow_util = { path = "../arrow_util" } +assert_matches = "1.5.0" diff --git a/mutable_batch_lp/src/lib.rs b/mutable_batch_lp/src/lib.rs index 2204999300..fc450b0928 100644 --- a/mutable_batch_lp/src/lib.rs +++ b/mutable_batch_lp/src/lib.rs @@ -11,7 +11,7 @@ clippy::clone_on_ref_ptr )] -use hashbrown::HashMap; +use hashbrown::{hash_map::Entry, HashMap, HashSet}; use influxdb_line_protocol::{parse_lines, FieldValue, ParsedLine}; use mutable_batch::writer::Writer; use mutable_batch::MutableBatch; @@ -28,10 +28,7 @@ pub enum Error { }, #[snafu(display("error writing line {}: {}", line, source))] - Write { - source: mutable_batch::writer::Error, - line: usize, - }, + Write { source: LineWriteError, line: usize }, #[snafu(display("empty write payload"))] EmptyPayload, @@ -81,7 +78,25 @@ impl LinesConverter { self.timestamp_base = timestamp_base } - /// Write some line protocol data + /// Write some line protocol data. 
+ /// + /// If a field / tag name appears more than once in a single line, the + /// following semantics apply: + /// + /// * duplicate fields, same value: do nothing/coalesce + /// * duplicate fields, different value: last occurrence wins + /// * duplicate fields, different types: return + /// [`LineWriteError::ConflictedFieldTypes`] + /// * duplicate tags, same value: return [`LineWriteError::DuplicateTag`] + /// * duplicate tags, different value: return + /// [`LineWriteError::DuplicateTag`] + /// * duplicate tags, different types: return + /// [`LineWriteError::DuplicateTag`] + /// * same name for tag and field: return + /// [`mutable_batch::writer::Error::TypeMismatch`] + /// * same name for tag and field, different type: return + /// [`mutable_batch::writer::Error::TypeMismatch`] + /// pub fn write_lp(&mut self, lines: &str) -> Result<()> { for (line_idx, maybe_line) in parse_lines(lines).enumerate() { let mut line = maybe_line.context(LineProtocolSnafu { line: line_idx + 1 })?; @@ -139,38 +154,155 @@ pub fn lines_to_batches_stats( converter.finish() } -/// Writes the [`ParsedLine`] to the [`MutableBatch`] +/// An error applying an already-parsed line protocol line ([`ParsedLine`]) to a +/// [`MutableBatch`]. +#[allow(missing_copy_implementations)] +#[derive(Debug, Snafu)] +pub enum LineWriteError { + /// A transparent error wrapper over the underlying mutable batch error. + #[snafu(display("{}", source))] + MutableBatch { + /// The underlying error + source: mutable_batch::writer::Error, + }, + + /// The specified tag name appears twice in one LP line, with conflicting + /// values. + #[snafu(display( + "the tag '{}' is specified more than once with conflicting values", + name + ))] + DuplicateTag { + /// The duplicated tag name. + name: String, + }, + + /// The specified field name appears twice in one LP line, with conflicting + /// types. 
+ #[snafu(display( + "the field '{}' is specified more than once with conflicting types", + name + ))] + ConflictedFieldTypes { + /// The duplicated field name. + name: String, + }, +} + +/// Writes the [`ParsedLine`] to the [`MutableBatch`], respecting the edge case +/// semantics described in [`LinesConverter::write_lp()`]. pub fn write_line( writer: &mut Writer<'_>, line: &ParsedLine<'_>, default_time: i64, -) -> mutable_batch::writer::Result<()> { - for (tag_key, tag_value) in line.series.tag_set.iter().flatten() { - writer.write_tag(tag_key.as_str(), None, std::iter::once(tag_value.as_str()))? - } - - for (field_key, field_value) in &line.field_set { - match field_value { - FieldValue::I64(value) => { - writer.write_i64(field_key.as_str(), None, std::iter::once(*value))?; - } - FieldValue::U64(value) => { - writer.write_u64(field_key.as_str(), None, std::iter::once(*value))?; - } - FieldValue::F64(value) => { - writer.write_f64(field_key.as_str(), None, std::iter::once(*value))?; - } - FieldValue::String(value) => { - writer.write_string(field_key.as_str(), None, std::iter::once(value.as_str()))?; - } - FieldValue::Boolean(value) => { - writer.write_bool(field_key.as_str(), None, std::iter::once(*value))?; +) -> Result<(), LineWriteError> { + // Only allocate the seen tags hashset if there are tags. + if let Some(tags) = &line.series.tag_set { + let mut seen = HashSet::with_capacity(tags.len()); + for (tag_key, tag_value) in tags.iter() { + // Check if a tag with this name has been observed previously. + if !seen.insert(tag_key) { + // This tag_key appears more than once (with the same or a differing value). + // + // This is always an error. + return Err(LineWriteError::DuplicateTag { + name: tag_key.to_string(), + }); } + writer + .write_tag(tag_key.as_str(), None, std::iter::once(tag_value.as_str())) + .context(MutableBatchSnafu)? 
+ } } + // In order to maintain parity with TSM, if a field within a single line is + // repeated, the last occurrence is retained - for example, in the following + // line protocol write: + // + // table v=2,bananas=42,v=3,platanos=24 + // ▲ ▲ + // └───────┬──────┘ + // │ + // duplicate field "v" + // + // The duplicate "v" field should be collapsed into a single "v=3" field, + // yielding an effective write of: + // + // table bananas=42,v=3,platanos=24 + // + // To do this in O(n) time, the code below walks backwards (from right to + // left) when visiting parsed fields, and tracks which fields it has + // observed. Any time a previously observed field is visited, it is skipped: + // + // visit direction + // ◀─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ + // + // table v=2,bananas=42,v=3,platanos=24 + // ▲ ▲ + // │ │ + // skip keep + // + // A notable exception to this "last value wins" rule is if the types differ + // between each occurrence of "v": + // + // table v=2i,v=3u + // + // In this instance we break from established TSM behaviour and return an + // error. IOx features schema enforcement, and as such is expected to reject + // conflicting types for writes. See the github issue[1] for the desired + // semantics. + // + // Tests below codify each of the scenarios described in the ticket. + // + // [1]: https://github.com/influxdata/influxdb_iox/issues/4326 + + let mut seen = HashMap::<_, &FieldValue<'_>>::with_capacity(line.field_set.len()); + for (field_key, field_value) in line.field_set.iter().rev() { + // Check if a field with this name has been observed previously. + match seen.entry(field_key) { + Entry::Occupied(e) if e.get().is_same_type(field_value) => { + // This field_value, and the "last" occurrence of this field_key + // (the first visited) are of the same type - this occurrence is + // skipped. + continue; + } + Entry::Occupied(_) => { + // This occurrence of "field_key" is of a different type to that + // of the "last" (first visited) occurrence. 
This is an + // internally type-conflicted line and should be rejected. + return Err(LineWriteError::ConflictedFieldTypes { + name: field_key.to_string(), + }); + } + Entry::Vacant(v) => { + v.insert(field_value); + } + }; + + match field_value { + FieldValue::I64(value) => { + writer.write_i64(field_key.as_str(), None, std::iter::once(*value)) + } + FieldValue::U64(value) => { + writer.write_u64(field_key.as_str(), None, std::iter::once(*value)) + } + FieldValue::F64(value) => { + writer.write_f64(field_key.as_str(), None, std::iter::once(*value)) + } + FieldValue::String(value) => { + writer.write_string(field_key.as_str(), None, std::iter::once(value.as_str())) + } + FieldValue::Boolean(value) => { + writer.write_bool(field_key.as_str(), None, std::iter::once(*value)) + } + } + .context(MutableBatchSnafu)?; + } + let time = line.timestamp.unwrap_or(default_time); - writer.write_time("time", std::iter::once(time))?; + writer + .write_time("time", std::iter::once(time)) + .context(MutableBatchSnafu)?; Ok(()) } @@ -192,6 +324,7 @@ pub mod test_helpers { mod tests { use super::*; use arrow_util::assert_batches_eq; + use assert_matches::assert_matches; use schema::selection::Selection; #[test] @@ -309,4 +442,136 @@ m b=t 1639612800000000000 assert!(u.is_valid(1)); assert!(!u.is_valid(2)); } + + // https://github.com/influxdata/influxdb_iox/issues/4326 + mod issue4326 { + use super::*; + + #[test] + fn test_duplicate_field_same_value() { + let lp = "m1 val=2i,val=2i 0"; + + let batches = lines_to_batches(lp, 5).unwrap(); + assert_eq!(batches.len(), 1); + + assert_batches_eq!( + &[ + "+----------------------+-----+", + "| time | val |", + "+----------------------+-----+", + "| 1970-01-01T00:00:00Z | 2 |", + "+----------------------+-----+", + ], + &[batches["m1"].to_arrow(Selection::All).unwrap()] + ); + } + + #[test] + fn test_duplicate_field_different_values() { + let lp = "m1 val=1i,val=2i 0"; + + let batches = lines_to_batches(lp, 5).unwrap(); + 
assert_eq!(batches.len(), 1); + + // "last value wins" + assert_batches_eq!( + &[ + "+----------------------+-----+", + "| time | val |", + "+----------------------+-----+", + "| 1970-01-01T00:00:00Z | 2 |", + "+----------------------+-----+", + ], + &[batches["m1"].to_arrow(Selection::All).unwrap()] + ); + } + + #[test] + fn test_duplicate_fields_different_type() { + let lp = "m1 val=1i,val=2.0 0"; + + let err = lines_to_batches(lp, 5).expect_err("type conflicted write should fail"); + assert_matches!( err, + Error::Write { + source: LineWriteError::ConflictedFieldTypes { name }, + line: 1 + } + => { + assert_eq!(name, "val"); + }); + } + + #[test] + fn test_duplicate_tags_same_value() { + let lp = "m1,tag=1,tag=1 val=1i 0"; + + let err = lines_to_batches(lp, 5).expect_err("type conflicted write should fail"); + assert_matches!( err, + Error::Write { + source: LineWriteError::DuplicateTag { name }, + line: 1 + } + => { + assert_eq!(name, "tag"); + }); + } + + #[test] + fn test_duplicate_tags_different_value() { + let lp = "m1,tag=1,tag=2 val=1i 0"; + + let err = lines_to_batches(lp, 5).expect_err("type conflicted write should fail"); + assert_matches!( err, + Error::Write { + source: LineWriteError::DuplicateTag { name }, + line: 1 + } + => { + assert_eq!(name, "tag"); + }); + } + + #[test] + fn test_duplicate_tags_different_type() { + let lp = "m1,tag=1,tag=2.0 val=1i 0"; + + let err = lines_to_batches(lp, 5).expect_err("type conflicted write should fail"); + assert_matches!( err, + Error::Write { + source: LineWriteError::DuplicateTag { name }, + line: 1 + } + => { + assert_eq!(name, "tag"); + }); + } + + #[test] + fn test_duplicate_is_tag_and_field() { + let lp = "m1,v=1i v=1i 0"; + + let err = lines_to_batches(lp, 5).expect_err("type conflicted write should fail"); + assert_matches!( + err, + Error::Write { + source: LineWriteError::MutableBatch { .. 
}, + line: 1 + } + ); + } + + #[test] + fn test_duplicate_is_tag_and_field_different_types() { + let lp = "m1,v=1i v=1.0 0"; + + let err = lines_to_batches(lp, 5).expect_err("type conflicted write should fail"); + assert_matches!( + err, + Error::Write { + source: LineWriteError::MutableBatch { .. }, + line: 1 + } + ); + } + } } diff --git a/router2/src/server/http.rs b/router2/src/server/http.rs index ace220c21f..e29f2347bf 100644 --- a/router2/src/server/http.rs +++ b/router2/src/server/http.rs @@ -458,6 +458,8 @@ mod tests { use flate2::{write::GzEncoder, Compression}; use hyper::header::HeaderValue; use metric::{Attributes, Metric}; + use mutable_batch::column::ColumnData; + use mutable_batch_lp::LineWriteError; use crate::dml_handlers::mock::{MockDmlHandler, MockDmlHandlerCall}; @@ -829,6 +831,32 @@ mod tests { } ); + test_write_handler!( + field_upsert_within_batch, + query_string = "?org=bananas&bucket=test", + body = "test field=1u 100\ntest field=2u 100".as_bytes(), + dml_handler = [Ok(summary())], + want_result = Ok(_), + want_dml_calls = [MockDmlHandlerCall::Write{namespace, write_input}] => { + assert_eq!(namespace, "bananas_test"); + let table = write_input.get("test").expect("table not in write"); + let col = table.column("field").expect("column missing"); + assert_matches!(col.data(), ColumnData::U64(data, _) => { + // Ensure both values are recorded, in the correct order. 
+ assert_eq!(data.as_slice(), [1, 2]); + }); + } + ); + + test_write_handler!( + column_named_time, + query_string = "?org=bananas&bucket=test", + body = "test field=1u,time=42u 100".as_bytes(), + dml_handler = [], + want_result = Err(_), + want_dml_calls = [] + ); + test_delete_handler!( ok, query_string = "?org=bananas&bucket=test", @@ -931,4 +959,119 @@ mod tests { want_result = Err(Error::NoHandler), want_dml_calls = [] ); + + // https://github.com/influxdata/influxdb_iox/issues/4326 + mod issue4326 { + use super::*; + + test_write_handler!( + duplicate_fields_same_value, + query_string = "?org=bananas&bucket=test", + body = "whydo InputPower=300i,InputPower=300i".as_bytes(), + dml_handler = [Ok(summary())], + want_result = Ok(_), + want_dml_calls = [MockDmlHandlerCall::Write{namespace, write_input}] => { + assert_eq!(namespace, "bananas_test"); + let table = write_input.get("whydo").expect("table not in write"); + let col = table.column("InputPower").expect("column missing"); + assert_matches!(col.data(), ColumnData::I64(data, _) => { + // Ensure the duplicate values are coalesced. 
+ assert_eq!(data.as_slice(), [300]); + }); + } + ); + + test_write_handler!( + duplicate_fields_different_value, + query_string = "?org=bananas&bucket=test", + body = "whydo InputPower=300i,InputPower=42i".as_bytes(), + dml_handler = [Ok(summary())], + want_result = Ok(_), + want_dml_calls = [MockDmlHandlerCall::Write{namespace, write_input}] => { + assert_eq!(namespace, "bananas_test"); + let table = write_input.get("whydo").expect("table not in write"); + let col = table.column("InputPower").expect("column missing"); + assert_matches!(col.data(), ColumnData::I64(data, _) => { + // Last value wins + assert_eq!(data.as_slice(), [42]); + }); + } + ); + + test_write_handler!( + duplicate_fields_different_type, + query_string = "?org=bananas&bucket=test", + body = "whydo InputPower=300i,InputPower=4.2".as_bytes(), + dml_handler = [], + want_result = Err(Error::ParseLineProtocol(mutable_batch_lp::Error::Write { + source: LineWriteError::ConflictedFieldTypes { .. }, + .. + })), + want_dml_calls = [] + ); + + test_write_handler!( + duplicate_tags_same_value, + query_string = "?org=bananas&bucket=test", + body = "whydo,InputPower=300i,InputPower=300i field=42i".as_bytes(), + dml_handler = [], + want_result = Err(Error::ParseLineProtocol(mutable_batch_lp::Error::Write { + source: LineWriteError::DuplicateTag { .. }, + .. + })), + want_dml_calls = [] + ); + + test_write_handler!( + duplicate_tags_different_value, + query_string = "?org=bananas&bucket=test", + body = "whydo,InputPower=300i,InputPower=42i field=42i".as_bytes(), + dml_handler = [], + want_result = Err(Error::ParseLineProtocol(mutable_batch_lp::Error::Write { + source: LineWriteError::DuplicateTag { .. }, + .. 
+ })), + want_dml_calls = [] + ); + + test_write_handler!( + duplicate_tags_different_type, + query_string = "?org=bananas&bucket=test", + body = "whydo,InputPower=300i,InputPower=4.2 field=42i".as_bytes(), + dml_handler = [], + want_result = Err(Error::ParseLineProtocol(mutable_batch_lp::Error::Write { + source: LineWriteError::DuplicateTag { .. }, + .. + })), + want_dml_calls = [] + ); + + test_write_handler!( + duplicate_is_tag_and_field, + query_string = "?org=bananas&bucket=test", + body = "whydo,InputPower=300i InputPower=300i".as_bytes(), + dml_handler = [], + want_result = Err(Error::ParseLineProtocol(mutable_batch_lp::Error::Write { + source: LineWriteError::MutableBatch { + source: mutable_batch::writer::Error::TypeMismatch { .. } + }, + .. + })), + want_dml_calls = [] + ); + + test_write_handler!( + duplicate_is_tag_and_field_different_types, + query_string = "?org=bananas&bucket=test", + body = "whydo,InputPower=300i InputPower=30.0".as_bytes(), + dml_handler = [], + want_result = Err(Error::ParseLineProtocol(mutable_batch_lp::Error::Write { + source: LineWriteError::MutableBatch { + source: mutable_batch::writer::Error::TypeMismatch { .. } + }, + .. + })), + want_dml_calls = [] + ); + } }