fix(line-protocol): add unsigned integer field type

Fixes #904

The line protocol parser lacked support for the unsigned integer field
type, whose values carry a `u` suffix (for example `42u`). This adds
unsigned integer support to the line protocol parser and fills a few
corresponding gaps in the mutable buffer.
pull/24376/head
Jacob Marble 2021-03-02 14:08:22 -08:00
parent 500c237f62
commit ac1b0c04ae
6 changed files with 95 additions and 11 deletions

View File

@ -317,6 +317,7 @@ fn add_line<'a>(
for (column, value) in &line.field_set {
let val = match value {
FieldValue::I64(v) => add_i64_value(fbb, column.as_str(), *v),
FieldValue::U64(v) => add_u64_value(fbb, column.as_str(), *v),
FieldValue::F64(v) => add_f64_value(fbb, column.as_str(), *v),
FieldValue::Boolean(v) => add_bool_value(fbb, column.as_str(), *v),
FieldValue::String(v) => add_string_value(fbb, column.as_str(), v.as_str()),
@ -393,6 +394,16 @@ fn add_i64_value<'a>(
add_value(fbb, column, wb::ColumnValue::I64Value, iv.as_union_value())
}
/// Serializes an unsigned 64-bit field value for `column`.
///
/// Creates a `wb::U64Value` table containing `value` in `fbb`, then hands its
/// union offset to `add_value` together with the column name and the
/// `wb::ColumnValue::U64Value` tag. Returns whatever offset `add_value`
/// produces.
fn add_u64_value<'a>(
    fbb: &mut FlatBufferBuilder<'a>,
    column: &str,
    value: u64,
) -> flatbuffers::WIPOffset<wb::Value<'a>> {
    let args = wb::U64ValueArgs { value };
    let u64_offset = wb::U64Value::create(fbb, &args);
    let union_offset = u64_offset.as_union_value();
    add_value(fbb, column, wb::ColumnValue::U64Value, union_offset)
}
fn add_bool_value<'a>(
fbb: &mut FlatBufferBuilder<'a>,
column: &str,

View File

@ -50,6 +50,12 @@ pub enum Error {
value: String,
},
#[snafu(display(r#"Unable to parse unsigned integer value '{}'"#, value))]
UIntegerValueInvalid {
source: std::num::ParseIntError,
value: String,
},
#[snafu(display(r#"Unable to parse floating-point value '{}'"#, value))]
FloatValueInvalid {
source: std::num::ParseFloatError,
@ -333,10 +339,11 @@ pub type FieldSet<'a> = SmallVec<[(EscapedStr<'a>, FieldValue<'a>); 4]>;
pub type TagSet<'a> = SmallVec<[(EscapedStr<'a>, EscapedStr<'a>); 8]>;
/// Allowed types of Fields in a `ParsedLine`. One of the types described in
/// https://docs.influxdata.com/influxdb/v1.8/write_protocols/line_protocol_tutorial/#data-types
/// https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/#data-types-and-format
#[derive(Debug, Clone, PartialEq)]
pub enum FieldValue<'a> {
I64(i64),
U64(u64),
F64(f64),
String(EscapedStr<'a>),
Boolean(bool),
@ -349,6 +356,7 @@ impl<'a> Display for FieldValue<'a> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::I64(v) => write!(f, "{}i", v),
Self::U64(v) => write!(f, "{}u", v),
Self::F64(v) => write!(f, "{}", v),
Self::String(v) => escape_and_write_value(f, v, FIELD_VALUE_STRING_DELIMITERS),
Self::Boolean(v) => write!(f, "{}", v),
@ -644,20 +652,28 @@ fn field_key(i: &str) -> IResult<&str, EscapedStr<'_>> {
fn field_value(i: &str) -> IResult<&str, FieldValue<'_>> {
let int = map(field_integer_value, FieldValue::I64);
let uint = map(field_uinteger_value, FieldValue::U64);
let float = map(field_float_value, FieldValue::F64);
let string = map(field_string_value, FieldValue::String);
let boolv = map(field_bool_value, FieldValue::Boolean);
alt((int, float, string, boolv))(i)
alt((int, uint, float, string, boolv))(i)
}
fn field_integer_value(i: &str) -> IResult<&str, i64> {
let tagged_value = terminated(integral_value_common, tag("i"));
let tagged_value = terminated(integral_value_signed, tag("i"));
map_fail(tagged_value, |value| {
value.parse().context(IntegerValueInvalid { value })
})(i)
}
/// Parses an unsigned integer field value such as `42u`.
///
/// The input must start with one or more digits followed directly by the
/// literal `u` suffix; a leading `-` is not accepted here. The digit text is
/// converted with `str::parse` into a `u64`, and a conversion failure is
/// wrapped in `UIntegerValueInvalid` (carrying the offending text) before
/// being passed through `map_fail`.
fn field_uinteger_value(i: &str) -> IResult<&str, u64> {
    // Recognize the digits and consume (but discard) the trailing `u`.
    let digits_then_suffix = terminated(digit1, tag("u"));

    map_fail(digits_then_suffix, |digits| {
        digits
            .parse()
            .context(UIntegerValueInvalid { value: digits })
    })(i)
}
fn field_float_value(i: &str) -> IResult<&str, f64> {
let value = alt((field_float_value_with_decimal, field_float_value_no_decimal));
map_fail(value, |value| {
@ -666,25 +682,25 @@ fn field_float_value(i: &str) -> IResult<&str, f64> {
}
fn field_float_value_with_decimal(i: &str) -> IResult<&str, &str> {
recognize(separated_pair(integral_value_common, tag("."), digit1))(i)
recognize(separated_pair(integral_value_signed, tag("."), digit1))(i)
}
fn field_float_value_no_decimal(i: &str) -> IResult<&str, &str> {
integral_value_common(i)
integral_value_signed(i)
}
fn integral_value_common(i: &str) -> IResult<&str, &str> {
fn integral_value_signed(i: &str) -> IResult<&str, &str> {
recognize(preceded(opt(tag("-")), digit1))(i)
}
fn timestamp(i: &str) -> IResult<&str, i64> {
map_fail(integral_value_common, |value| {
map_fail(integral_value_signed, |value| {
value.parse().context(TimestampValueInvalid { value })
})(i)
}
fn field_string_value(i: &str) -> IResult<&str, EscapedStr<'_>> {
// https://docs.influxdata.com/influxdb/v1.8/write_protocols/line_protocol_tutorial/#data-types
// https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/#data-types-and-format
// For string field values, backslash is only used to escape itself(\) or double
// quotes.
let string_data = alt((
@ -707,7 +723,7 @@ fn field_string_value(i: &str) -> IResult<&str, EscapedStr<'_>> {
}
fn field_bool_value(i: &str) -> IResult<&str, bool> {
// https://docs.influxdata.com/influxdb/v1.8/write_protocols/line_protocol_tutorial/#data-types
// https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/#data-types-and-format
// "specify TRUE with t, T, true, True, or TRUE. Specify FALSE with f, F, false,
// False, or FALSE"
alt((
@ -1913,7 +1929,8 @@ her"#,
#[test]
fn field_value_display() -> Result {
assert_eq!(FieldValue::I64(42).to_string(), "42i");
assert_eq!(FieldValue::I64(-42).to_string(), "-42i");
assert_eq!(FieldValue::U64(42).to_string(), "42u");
assert_eq!(FieldValue::F64(42.11).to_string(), "42.11");
assert_eq!(
FieldValue::String(EscapedStr::from("foo")).to_string(),

View File

@ -310,6 +310,7 @@ impl<'a> MeasurementSampler<'a> {
let field_type = match field_value {
FieldValue::F64(_) => InfluxFieldType::Float,
FieldValue::I64(_) => InfluxFieldType::Integer,
FieldValue::U64(_) => InfluxFieldType::UInteger,
FieldValue::String(_) => InfluxFieldType::String,
FieldValue::Boolean(_) => InfluxFieldType::Boolean,
};
@ -474,6 +475,9 @@ fn pack_lines<'a>(schema: &Schema, lines: &[ParsedLine<'a>]) -> Vec<Packers> {
FieldValue::I64(i) => {
packer.i64_packer_mut().push(i);
}
FieldValue::U64(i) => {
packer.u64_packer_mut().push(i);
}
FieldValue::String(ref s) => {
packer.bytes_packer_mut().push(ByteArray::from(s.as_str()));
}

View File

@ -34,6 +34,7 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
pub enum Column {
F64(Vec<Option<f64>>, StatValues<f64>),
I64(Vec<Option<i64>>, StatValues<i64>),
U64(Vec<Option<u64>>, StatValues<u64>),
String(Vec<Option<String>>, StatValues<String>),
Bool(Vec<Option<bool>>, StatValues<bool>),
Tag(Vec<Option<u32>>, StatValues<String>),
@ -66,6 +67,15 @@ impl Column {
vals.push(Some(val));
Self::I64(vals, StatValues::new(val))
}
U64Value => {
let val = value
.value_as_u64value()
.expect("u64 value should be present")
.value();
let mut vals = vec![None; capacity];
vals.push(Some(val));
Self::U64(vals, StatValues::new(val))
}
StringValue => {
let val = value
.value_as_string_value()
@ -109,6 +119,7 @@ impl Column {
match self {
Self::F64(v, _) => v.len(),
Self::I64(v, _) => v.len(),
Self::U64(v, _) => v.len(),
Self::String(v, _) => v.len(),
Self::Bool(v, _) => v.len(),
Self::Tag(v, _) => v.len(),
@ -123,6 +134,7 @@ impl Column {
match self {
Self::F64(_, _) => "f64",
Self::I64(_, _) => "i64",
Self::U64(_, _) => "u64",
Self::String(_, _) => "String",
Self::Bool(_, _) => "bool",
Self::Tag(_, _) => "tag",
@ -134,6 +146,7 @@ impl Column {
match self {
Self::F64(..) => ArrowDataType::Float64,
Self::I64(..) => ArrowDataType::Int64,
Self::U64(..) => ArrowDataType::UInt64,
Self::String(..) => ArrowDataType::Utf8,
Self::Bool(..) => ArrowDataType::Boolean,
Self::Tag(..) => ArrowDataType::Utf8,
@ -179,6 +192,15 @@ impl Column {
}
None => false,
},
Self::U64(vals, stats) => match value.value_as_u64value() {
Some(u64_val) => {
let u64_val = u64_val.value();
vals.push(Some(u64_val));
stats.update(u64_val);
true
}
None => false,
},
Self::F64(vals, stats) => match value.value_as_f64value() {
Some(f64_val) => {
let f64_val = f64_val.value();
@ -216,6 +238,11 @@ impl Column {
v.push(None);
}
}
Self::U64(v, _) => {
if v.len() == len {
v.push(None);
}
}
Self::String(v, _) => {
if v.len() == len {
v.push(None);
@ -290,6 +317,9 @@ impl Column {
Self::I64(v, stats) => {
mem::size_of::<Option<i64>>() * v.len() + mem::size_of_val(&stats)
}
Self::U64(v, stats) => {
mem::size_of::<Option<u64>>() * v.len() + mem::size_of_val(&stats)
}
Self::Bool(v, stats) => {
mem::size_of::<Option<bool>>() * v.len() + mem::size_of_val(&stats)
}

View File

@ -23,7 +23,7 @@ use snafu::{OptionExt, ResultExt, Snafu};
use arrow_deps::{
arrow,
arrow::{
array::{ArrayRef, BooleanBuilder, Float64Builder, Int64Builder, StringBuilder},
array::{ArrayRef, BooleanBuilder, Float64Builder, Int64Builder, UInt64Builder, StringBuilder},
datatypes::DataType as ArrowDataType,
record_batch::RecordBatch,
},
@ -326,6 +326,7 @@ impl Table {
schema_builder.field(column_name, ArrowDataType::Int64)
}
}
Column::U64(_, _) => schema_builder.field(column_name, ArrowDataType::UInt64),
Column::Bool(_, _) => schema_builder.field(column_name, ArrowDataType::Boolean),
};
}
@ -399,6 +400,15 @@ impl Table {
Arc::new(builder.finish()) as ArrayRef
}
Column::U64(vals, _) => {
let mut builder = UInt64Builder::new(vals.len());
for v in vals {
builder.append_option(*v).context(ArrowError {})?;
}
Arc::new(builder.finish()) as ArrayRef
}
Column::Bool(vals, _) => {
let mut builder = BooleanBuilder::new(vals.len());
@ -504,6 +514,7 @@ impl Table {
match column {
Column::F64(v, _) => self.column_value_matches_predicate(v, chunk_predicate),
Column::I64(v, _) => self.column_value_matches_predicate(v, chunk_predicate),
Column::U64(v, _) => self.column_value_matches_predicate(v, chunk_predicate),
Column::String(v, _) => self.column_value_matches_predicate(v, chunk_predicate),
Column::Bool(v, _) => self.column_value_matches_predicate(v, chunk_predicate),
Column::Tag(v, _) => self.column_value_matches_predicate(v, chunk_predicate),
@ -545,6 +556,7 @@ impl Table {
let stats = match c {
Column::F64(_, stats) => Statistics::F64(stats.clone()),
Column::I64(_, stats) => Statistics::I64(stats.clone()),
Column::U64(_, stats) => Statistics::U64(stats.clone()),
Column::Bool(_, stats) => Statistics::Bool(stats.clone()),
Column::String(_, stats) | Column::Tag(_, stats) => {
Statistics::String(stats.clone())

View File

@ -20,6 +20,7 @@ use std::default::Default;
pub enum Packers {
Float(Packer<f64>),
Integer(Packer<i64>),
UInteger(Packer<u64>),
Bytes(Packer<ByteArray>),
String(Packer<String>),
Boolean(Packer<bool>),
@ -52,6 +53,7 @@ impl<'a> Packers {
match self {
Self::Float(p) => PackerChunker::Float(p.values.chunks(chunk_size)),
Self::Integer(p) => PackerChunker::Integer(p.values.chunks(chunk_size)),
Self::UInteger(p) => PackerChunker::UInteger(p.values.chunks(chunk_size)),
Self::Bytes(p) => PackerChunker::Bytes(p.values.chunks(chunk_size)),
Self::String(p) => PackerChunker::String(p.values.chunks(chunk_size)),
Self::Boolean(p) => PackerChunker::Boolean(p.values.chunks(chunk_size)),
@ -69,6 +71,7 @@ impl<'a> Packers {
match self {
Self::Float(p) => p.reserve_exact(additional),
Self::Integer(p) => p.reserve_exact(additional),
Self::UInteger(p) => p.reserve_exact(additional),
Self::Bytes(p) => p.reserve_exact(additional),
Self::String(p) => p.reserve_exact(additional),
Self::Boolean(p) => p.reserve_exact(additional),
@ -79,6 +82,7 @@ impl<'a> Packers {
match self {
Self::Float(p) => p.push_option(None),
Self::Integer(p) => p.push_option(None),
Self::UInteger(p) => p.push_option(None),
Self::Bytes(p) => p.push_option(None),
Self::String(p) => p.push_option(None),
Self::Boolean(p) => p.push_option(None),
@ -90,6 +94,7 @@ impl<'a> Packers {
match self {
Self::Float(p) => p.swap(a, b),
Self::Integer(p) => p.swap(a, b),
Self::UInteger(p) => p.swap(a, b),
Self::Bytes(p) => p.swap(a, b),
Self::String(p) => p.swap(a, b),
Self::Boolean(p) => p.swap(a, b),
@ -101,6 +106,7 @@ impl<'a> Packers {
match self {
Self::Float(p) => p.num_rows(),
Self::Integer(p) => p.num_rows(),
Self::UInteger(p) => p.num_rows(),
Self::Bytes(p) => p.num_rows(),
Self::String(p) => p.num_rows(),
Self::Boolean(p) => p.num_rows(),
@ -114,6 +120,7 @@ impl<'a> Packers {
match self {
Self::Float(p) => p.is_null(row),
Self::Integer(p) => p.is_null(row),
Self::UInteger(p) => p.is_null(row),
Self::Bytes(p) => p.is_null(row),
Self::String(p) => p.is_null(row),
Self::Boolean(p) => p.is_null(row),
@ -124,6 +131,7 @@ impl<'a> Packers {
typed_packer_accessors! {
(f64_packer, f64_packer_mut, f64, Float),
(i64_packer, i64_packer_mut, i64, Integer),
(u64_packer, u64_packer_mut, u64, UInteger),
(bytes_packer, bytes_packer_mut, ByteArray, Bytes),
(str_packer, str_packer_mut, String, String),
(bool_packer, bool_packer_mut, bool, Boolean),
@ -245,6 +253,7 @@ impl std::convert::From<Vec<Option<Vec<u8>>>> for Packers {
pub enum PackerChunker<'a> {
Float(Chunks<'a, Option<f64>>),
Integer(Chunks<'a, Option<i64>>),
UInteger(Chunks<'a, Option<u64>>),
Bytes(Chunks<'a, Option<ByteArray>>),
String(Chunks<'a, Option<String>>),
Boolean(Chunks<'a, Option<bool>>),
@ -523,6 +532,7 @@ mod test {
let mut packers: Vec<Packers> = Vec::new();
packers.push(Packers::Float(Packer::new()));
packers.push(Packers::Integer(Packer::new()));
packers.push(Packers::UInteger(Packer::new()));
packers.push(Packers::Boolean(Packer::new()));
packers.get_mut(0).unwrap().f64_packer_mut().push(22.033);