diff --git a/src/line_parser.rs b/src/line_parser.rs index 2781699f3f..9c56ddfc18 100644 --- a/src/line_parser.rs +++ b/src/line_parser.rs @@ -55,6 +55,8 @@ impl Point { pub enum PointType { I64(Point), F64(Point), + String(Point), + Bool(Point), } impl PointType { @@ -76,10 +78,30 @@ impl PointType { }) } + pub fn new_string(series: String, value: impl Into, time: i64) -> Self { + Self::String(Point { + series, + series_id: None, + value: value.into(), + time, + }) + } + + pub fn new_bool(series: String, value: bool, time: i64) -> Self { + Self::Bool(Point { + series, + series_id: None, + value, + time, + }) + } + pub fn series(&self) -> &String { match self { Self::I64(p) => &p.series, Self::F64(p) => &p.series, + Self::String(p) => &p.series, + Self::Bool(p) => &p.series, } } @@ -87,6 +109,8 @@ impl PointType { match self { Self::I64(p) => p.time, Self::F64(p) => p.time, + Self::String(p) => p.time, + Self::Bool(p) => p.time, } } @@ -94,6 +118,8 @@ impl PointType { match self { Self::I64(p) => p.time = t, Self::F64(p) => p.time = t, + Self::String(p) => p.time = t, + Self::Bool(p) => p.time = t, } } @@ -101,6 +127,8 @@ impl PointType { match self { Self::I64(p) => p.series_id, Self::F64(p) => p.series_id, + Self::String(p) => p.series_id, + Self::Bool(p) => p.series_id, } } @@ -108,6 +136,8 @@ impl PointType { match self { Self::I64(p) => p.series_id = Some(id), Self::F64(p) => p.series_id = Some(id), + Self::String(p) => p.series_id = Some(id), + Self::Bool(p) => p.series_id = Some(id), } } @@ -129,6 +159,8 @@ impl PointType { match self { Self::I64(p) => p.index_pairs(), Self::F64(p) => p.index_pairs(), + Self::String(p) => p.index_pairs(), + Self::Bool(p) => p.index_pairs(), } } } @@ -223,8 +255,8 @@ fn line_to_points( match field_value { FieldValue::I64(value) => PointType::new_i64(series, value, timestamp), FieldValue::F64(value) => PointType::new_f64(series, value, timestamp), - FieldValue::String(_) => unimplemented!("String support for points"), - FieldValue::Boolean(_) => unimplemented!("Boolean support for points"), + FieldValue::String(value) => PointType::new_string(series, value, timestamp), + FieldValue::Boolean(value) => PointType::new_bool(series, value, timestamp), } })) } diff --git a/src/server/http_routes.rs b/src/server/http_routes.rs index a271dbb33f..39b9dc6689 100644 --- a/src/server/http_routes.rs +++ b/src/server/http_routes.rs @@ -336,6 +336,24 @@ async fn read(req: hyper::Request, app: Arc) -> Result, wtr.write_record(&vals).unwrap(); } } + ReadValues::String(values) => { + for val in values { + let t = val.time.to_string(); + let v = val.value.to_string(); + vals[vcol] = v; + vals[tcol] = t; + wtr.write_record(&vals).unwrap(); + } + } + ReadValues::Bool(values) => { + for val in values { + let t = val.time.to_string(); + let v = val.value.to_string(); + vals[vcol] = v; + vals[tcol] = t; + wtr.write_record(&vals).unwrap(); + } + } } let mut data = wtr diff --git a/src/server/rpc.rs b/src/server/rpc.rs index f4f3b9a225..b013af7586 100644 --- a/src/server/rpc.rs +++ b/src/server/rpc.rs @@ -10,7 +10,8 @@ use delorean::generated_types::{ delorean_server::Delorean, measurement_fields_response::MessageField, read_response::{ - frame::Data, DataType, FloatPointsFrame, Frame, GroupFrame, IntegerPointsFrame, SeriesFrame, + frame::Data, BooleanPointsFrame, DataType, FloatPointsFrame, Frame, GroupFrame, + IntegerPointsFrame, SeriesFrame, StringPointsFrame, }, storage_server::Storage, CapabilitiesResponse, CreateBucketRequest, CreateBucketResponse, DeleteBucketRequest, @@ -627,6 +628,8 @@ impl Storage for GrpcServer { let field_type = match field_type { SeriesDataType::F64 => DataType::Float, SeriesDataType::I64 => DataType::Integer, + SeriesDataType::String => DataType::String, + SeriesDataType::Bool => DataType::Boolean, } as _; MessageField { @@ -695,6 +698,8 @@ async fn send_series_filters( let data_type = match batch.values { ReadValues::F64(_) => DataType::Float, ReadValues::I64(_) => DataType::Integer, + ReadValues::String(_) => DataType::String, + ReadValues::Bool(_) => DataType::Boolean, } as _; let series_frame_response_header = Ok(ReadResponse { @@ -815,6 +820,29 @@ async fn send_points( }], }); + tx.send(data_frame_response).await.unwrap(); + } + ReadValues::String(values) => { + let (timestamps, values) = values.into_iter().map(|p| (p.time, p.value)).unzip(); + let data_frame_response = Ok(ReadResponse { + frames: vec![Frame { + data: Some(Data::StringPoints(StringPointsFrame { timestamps, values })), + }], + }); + + tx.send(data_frame_response).await.unwrap(); + } + ReadValues::Bool(values) => { + let (timestamps, values) = values.into_iter().map(|p| (p.time, p.value)).unzip(); + let data_frame_response = Ok(ReadResponse { + frames: vec![Frame { + data: Some(Data::BooleanPoints(BooleanPointsFrame { + timestamps, + values, + })), + }], + }); + tx.send(data_frame_response).await.unwrap(); } } diff --git a/src/storage.rs b/src/storage.rs index bc20827378..416cef58bc 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -14,12 +14,12 @@ pub struct ReadPoint { pub value: T, } -impl From<&'_ crate::line_parser::Point> for ReadPoint { +impl From<&'_ crate::line_parser::Point> for ReadPoint { fn from(other: &'_ crate::line_parser::Point) -> Self { let crate::line_parser::Point { time, value, .. } = other; Self { time: *time, - value: *value, + value: value.clone(), } } } @@ -32,9 +32,9 @@ impl From<&'_ crate::line_parser::Point> for ReadPoint { pub enum SeriesDataType { I64 = 0, F64 = 1, + String = 2, + Bool = 3, // U64, - // String, - // Bool, } impl From for u8 { @@ -52,6 +52,8 @@ impl TryFrom for SeriesDataType { match other { v if v == I64 as u8 => Ok(I64), v if v == F64 as u8 => Ok(F64), + v if v == String as u8 => Ok(String), + v if v == Bool as u8 => Ok(Bool), _ => Err(other), } } diff --git a/src/storage/memdb.rs b/src/storage/memdb.rs index 28516a4d67..aa2bd9ebdc 100644 --- a/src/storage/memdb.rs +++ b/src/storage/memdb.rs @@ -43,6 +43,8 @@ struct SeriesData { current_size: usize, i64_series: HashMap>, f64_series: HashMap>, + string_series: HashMap>, + bool_series: HashMap>, } #[derive(Debug, Clone)] @@ -73,6 +75,8 @@ impl StoreInSeriesData for PointType { match self { Self::I64(inner) => inner.write(series_data), Self::F64(inner) => inner.write(series_data), + Self::String(inner) => inner.write(series_data), + Self::Bool(inner) => inner.write(series_data), } } } @@ -111,6 +115,44 @@ impl StoreInSeriesData for Point { } } +impl StoreInSeriesData for Point { + fn write(&self, series_data: &mut SeriesData) { + let point: ReadPoint<_> = self.into(); + series_data.current_size += std::mem::size_of::(); + + match series_data.string_series.get_mut(&self.series_id.unwrap()) { + Some(buff) => buff.values.push(point), + None => { + let buff = SeriesBuffer { + values: vec![point], + }; + series_data + .string_series + .insert(self.series_id.unwrap(), buff); + } + } + } +} + +impl StoreInSeriesData for Point { + fn write(&self, series_data: &mut SeriesData) { + let point: ReadPoint<_> = self.into(); + series_data.current_size += std::mem::size_of::(); + + match series_data.bool_series.get_mut(&self.series_id.unwrap()) { + Some(buff) => buff.values.push(point), + None => { + let buff = SeriesBuffer { + values: vec![point], + }; + series_data + .bool_series + .insert(self.series_id.unwrap(), buff); + } + } + } +} + #[derive(Debug, Default, Clone)] struct SeriesMap { current_size: usize, @@ -150,6 +192,8 @@ impl SeriesMap { let series_type = match point { PointType::I64(_) => SeriesDataType::I64, PointType::F64(_) => SeriesDataType::F64, + PointType::String(_) => SeriesDataType::String, + PointType::Bool(_) => SeriesDataType::Bool, }; self.series_id_to_key_and_type .insert(self.last_id, (point.series().clone(), series_type)); @@ -267,6 +311,14 @@ impl MemDB { let buff = self.series_data.f64_series.get(&id).unwrap(); ReadValues::F64(buff.read(range)) } + SeriesDataType::String => { + let buff = self.series_data.string_series.get(&id).unwrap(); + ReadValues::String(buff.read(range)) + } + SeriesDataType::Bool => { + let buff = self.series_data.bool_series.get(&id).unwrap(); + ReadValues::Bool(buff.read(range)) + } }; // TODO: Encode in the type system that `ReadBatch`es will never be created with an @@ -393,6 +445,20 @@ impl MemDB { .map(|point| point.time) .unwrap_or(std::i64::MIN) } + SeriesDataType::String => { + let buff = self.series_data.string_series.get(&series_id).unwrap(); + buff.read(&range) + .last() + .map(|point| point.time) + .unwrap_or(std::i64::MIN) + } + SeriesDataType::Bool => { + let buff = self.series_data.bool_series.get(&series_id).unwrap(); + buff.read(&range) + .last() + .map(|point| point.time) + .unwrap_or(std::i64::MIN) + } }; match fields.entry(field_pair.value) { diff --git a/src/storage/partitioned_store.rs b/src/storage/partitioned_store.rs index 62aeffec89..240f4dc88a 100644 --- a/src/storage/partitioned_store.rs +++ b/src/storage/partitioned_store.rs @@ -428,6 +428,41 @@ fn points_to_flatbuffer(points: &[PointType]) -> flatbuffers::FlatBufferBuilder< }, ) } + PointType::String(inner_point) => { + let string_value = builder.create_string(&inner_point.value); + let value = wal::StringValue::create( + &mut builder, + &wal::StringValueArgs { + value: Some(string_value), + }, + ); + wal::Point::create( + &mut builder, + &wal::PointArgs { + key: Some(key), + time: p.time(), + value_type: wal::PointValue::StringValue, + value: Some(value.as_union_value()), + }, + ) + } + PointType::Bool(inner_point) => { + let value = wal::BoolValue::create( + &mut builder, + &wal::BoolValueArgs { + value: inner_point.value, + }, + ); + wal::Point::create( + &mut builder, + &wal::PointArgs { + key: Some(key), + time: p.time(), + value_type: wal::PointValue::BoolValue, + value: Some(value.as_union_value()), + }, + ) + } } }) .collect(); @@ -483,6 +518,22 @@ impl From> for PointType { .value(); Self::new_f64(key, value, time) } + wal::PointValue::StringValue => { + let value = other + .value_as_string_value() + .expect("Value should match value type") + .value() + .expect("Value should have a string value") + .to_string(); + Self::new_string(key, value, time) + } + wal::PointValue::BoolValue => { + let value = other + .value_as_bool_value() + .expect("Value should match value type") + .value(); + Self::new_bool(key, value, time) + } _ => unimplemented!(), } } @@ -728,6 +779,8 @@ impl Stream for ReadMergeStream<'_> { pub enum ReadValues { I64(Vec>), F64(Vec>), + String(Vec>), + Bool(Vec>), } impl ReadValues { @@ -735,6 +788,8 @@ impl ReadValues { match self { Self::I64(vals) => vals.is_empty(), Self::F64(vals) => vals.is_empty(), + Self::String(vals) => vals.is_empty(), + Self::Bool(vals) => vals.is_empty(), } } } @@ -755,6 +810,8 @@ impl ReadBatch { match &self.values { ReadValues::I64(vals) => (vals.first().unwrap().time, vals.last().unwrap().time), ReadValues::F64(vals) => (vals.first().unwrap().time, vals.last().unwrap().time), + ReadValues::String(vals) => (vals.first().unwrap().time, vals.last().unwrap().time), + ReadValues::Bool(vals) => (vals.first().unwrap().time, vals.last().unwrap().time), } } @@ -762,6 +819,8 @@ impl ReadBatch { match &mut self.values { ReadValues::I64(vals) => vals.sort_by_key(|v| v.time), ReadValues::F64(vals) => vals.sort_by_key(|v| v.time), + ReadValues::String(vals) => vals.sort_by_key(|v| v.time), + ReadValues::Bool(vals) => vals.sort_by_key(|v| v.time), } } diff --git a/tests/end-to-end.rs b/tests/end-to-end.rs index 4afe0fdbb7..74f7cabd02 100644 --- a/tests/end-to-end.rs +++ b/tests/end-to-end.rs @@ -178,6 +178,14 @@ async fn read_and_write_data() -> Result<()> { .field("out", 4) .timestamp(ns_since_epoch + 6) .build()?, + influxdb2_client::DataPoint::builder("status") + .field("active", true) + .timestamp(ns_since_epoch + 7) + .build()?, + influxdb2_client::DataPoint::builder("attributes") + .field("color", "blue") + .timestamp(ns_since_epoch + 8) + .build()?, ]; write_data(&client2, org_id_str, bucket_id_str, points).await?; @@ -519,7 +527,10 @@ swap,server01,disk0,out,{},4 let values = &responses[0].values; let values: Vec<_> = values.iter().map(|s| str::from_utf8(s).unwrap()).collect(); - assert_eq!(values, vec!["cpu_load_short", "swap", "system"]); + assert_eq!( + values, + vec!["attributes", "cpu_load_short", "status", "swap", "system"] + ); let measurement_tag_keys_request = tonic::Request::new(MeasurementTagKeysRequest { source: read_source.clone(),