feat: Support storage of points with String and Boolean fields

pull/24376/head
Carol (Nichols || Goulding) 2020-08-14 10:50:16 -04:00
parent c6dadb526f
commit 5f49543fdf
7 changed files with 224 additions and 8 deletions

View File

@ -55,6 +55,8 @@ impl<T> Point<T> {
pub enum PointType {
I64(Point<i64>),
F64(Point<f64>),
String(Point<String>),
Bool(Point<bool>),
}
impl PointType {
@ -76,10 +78,30 @@ impl PointType {
})
}
pub fn new_string(series: String, value: impl Into<String>, time: i64) -> Self {
Self::String(Point {
series,
series_id: None,
value: value.into(),
time,
})
}
pub fn new_bool(series: String, value: bool, time: i64) -> Self {
Self::Bool(Point {
series,
series_id: None,
value,
time,
})
}
pub fn series(&self) -> &String {
match self {
Self::I64(p) => &p.series,
Self::F64(p) => &p.series,
Self::String(p) => &p.series,
Self::Bool(p) => &p.series,
}
}
@ -87,6 +109,8 @@ impl PointType {
match self {
Self::I64(p) => p.time,
Self::F64(p) => p.time,
Self::String(p) => p.time,
Self::Bool(p) => p.time,
}
}
@ -94,6 +118,8 @@ impl PointType {
match self {
Self::I64(p) => p.time = t,
Self::F64(p) => p.time = t,
Self::String(p) => p.time = t,
Self::Bool(p) => p.time = t,
}
}
@ -101,6 +127,8 @@ impl PointType {
match self {
Self::I64(p) => p.series_id,
Self::F64(p) => p.series_id,
Self::String(p) => p.series_id,
Self::Bool(p) => p.series_id,
}
}
@ -108,6 +136,8 @@ impl PointType {
match self {
Self::I64(p) => p.series_id = Some(id),
Self::F64(p) => p.series_id = Some(id),
Self::String(p) => p.series_id = Some(id),
Self::Bool(p) => p.series_id = Some(id),
}
}
@ -129,6 +159,8 @@ impl PointType {
match self {
Self::I64(p) => p.index_pairs(),
Self::F64(p) => p.index_pairs(),
Self::String(p) => p.index_pairs(),
Self::Bool(p) => p.index_pairs(),
}
}
}
@ -223,8 +255,8 @@ fn line_to_points(
match field_value {
FieldValue::I64(value) => PointType::new_i64(series, value, timestamp),
FieldValue::F64(value) => PointType::new_f64(series, value, timestamp),
FieldValue::String(_) => unimplemented!("String support for points"),
FieldValue::Boolean(_) => unimplemented!("Boolean support for points"),
FieldValue::String(value) => PointType::new_string(series, value, timestamp),
FieldValue::Boolean(value) => PointType::new_bool(series, value, timestamp),
}
}))
}

View File

@ -336,6 +336,24 @@ async fn read(req: hyper::Request<Body>, app: Arc<App>) -> Result<Option<Body>,
wtr.write_record(&vals).unwrap();
}
}
ReadValues::String(values) => {
for val in values {
let t = val.time.to_string();
let v = val.value.to_string();
vals[vcol] = v;
vals[tcol] = t;
wtr.write_record(&vals).unwrap();
}
}
ReadValues::Bool(values) => {
for val in values {
let t = val.time.to_string();
let v = val.value.to_string();
vals[vcol] = v;
vals[tcol] = t;
wtr.write_record(&vals).unwrap();
}
}
}
let mut data = wtr

View File

@ -10,7 +10,8 @@ use delorean::generated_types::{
delorean_server::Delorean,
measurement_fields_response::MessageField,
read_response::{
frame::Data, DataType, FloatPointsFrame, Frame, GroupFrame, IntegerPointsFrame, SeriesFrame,
frame::Data, BooleanPointsFrame, DataType, FloatPointsFrame, Frame, GroupFrame,
IntegerPointsFrame, SeriesFrame, StringPointsFrame,
},
storage_server::Storage,
CapabilitiesResponse, CreateBucketRequest, CreateBucketResponse, DeleteBucketRequest,
@ -627,6 +628,8 @@ impl Storage for GrpcServer {
let field_type = match field_type {
SeriesDataType::F64 => DataType::Float,
SeriesDataType::I64 => DataType::Integer,
SeriesDataType::String => DataType::String,
SeriesDataType::Bool => DataType::Boolean,
} as _;
MessageField {
@ -695,6 +698,8 @@ async fn send_series_filters(
let data_type = match batch.values {
ReadValues::F64(_) => DataType::Float,
ReadValues::I64(_) => DataType::Integer,
ReadValues::String(_) => DataType::String,
ReadValues::Bool(_) => DataType::Boolean,
} as _;
let series_frame_response_header = Ok(ReadResponse {
@ -815,6 +820,29 @@ async fn send_points(
}],
});
tx.send(data_frame_response).await.unwrap();
}
ReadValues::String(values) => {
let (timestamps, values) = values.into_iter().map(|p| (p.time, p.value)).unzip();
let data_frame_response = Ok(ReadResponse {
frames: vec![Frame {
data: Some(Data::StringPoints(StringPointsFrame { timestamps, values })),
}],
});
tx.send(data_frame_response).await.unwrap();
}
ReadValues::Bool(values) => {
let (timestamps, values) = values.into_iter().map(|p| (p.time, p.value)).unzip();
let data_frame_response = Ok(ReadResponse {
frames: vec![Frame {
data: Some(Data::BooleanPoints(BooleanPointsFrame {
timestamps,
values,
})),
}],
});
tx.send(data_frame_response).await.unwrap();
}
}

View File

@ -14,12 +14,12 @@ pub struct ReadPoint<T: Clone> {
pub value: T,
}
impl<T: Copy + Clone> From<&'_ crate::line_parser::Point<T>> for ReadPoint<T> {
impl<T: Clone> From<&'_ crate::line_parser::Point<T>> for ReadPoint<T> {
fn from(other: &'_ crate::line_parser::Point<T>) -> Self {
let crate::line_parser::Point { time, value, .. } = other;
Self {
time: *time,
value: *value,
value: value.clone(),
}
}
}
@ -32,9 +32,9 @@ impl<T: Copy + Clone> From<&'_ crate::line_parser::Point<T>> for ReadPoint<T> {
pub enum SeriesDataType {
I64 = 0,
F64 = 1,
String = 2,
Bool = 3,
// U64,
// String,
// Bool,
}
impl From<SeriesDataType> for u8 {
@ -52,6 +52,8 @@ impl TryFrom<u8> for SeriesDataType {
match other {
v if v == I64 as u8 => Ok(I64),
v if v == F64 as u8 => Ok(F64),
v if v == String as u8 => Ok(String),
v if v == Bool as u8 => Ok(Bool),
_ => Err(other),
}
}

View File

@ -43,6 +43,8 @@ struct SeriesData {
current_size: usize,
i64_series: HashMap<u64, SeriesBuffer<i64>>,
f64_series: HashMap<u64, SeriesBuffer<f64>>,
string_series: HashMap<u64, SeriesBuffer<String>>,
bool_series: HashMap<u64, SeriesBuffer<bool>>,
}
#[derive(Debug, Clone)]
@ -73,6 +75,8 @@ impl StoreInSeriesData for PointType {
match self {
Self::I64(inner) => inner.write(series_data),
Self::F64(inner) => inner.write(series_data),
Self::String(inner) => inner.write(series_data),
Self::Bool(inner) => inner.write(series_data),
}
}
}
@ -111,6 +115,44 @@ impl StoreInSeriesData for Point<f64> {
}
}
impl StoreInSeriesData for Point<String> {
fn write(&self, series_data: &mut SeriesData) {
let point: ReadPoint<_> = self.into();
series_data.current_size += std::mem::size_of::<Self>();
match series_data.string_series.get_mut(&self.series_id.unwrap()) {
Some(buff) => buff.values.push(point),
None => {
let buff = SeriesBuffer {
values: vec![point],
};
series_data
.string_series
.insert(self.series_id.unwrap(), buff);
}
}
}
}
impl StoreInSeriesData for Point<bool> {
fn write(&self, series_data: &mut SeriesData) {
let point: ReadPoint<_> = self.into();
series_data.current_size += std::mem::size_of::<Self>();
match series_data.bool_series.get_mut(&self.series_id.unwrap()) {
Some(buff) => buff.values.push(point),
None => {
let buff = SeriesBuffer {
values: vec![point],
};
series_data
.bool_series
.insert(self.series_id.unwrap(), buff);
}
}
}
}
#[derive(Debug, Default, Clone)]
struct SeriesMap {
current_size: usize,
@ -150,6 +192,8 @@ impl SeriesMap {
let series_type = match point {
PointType::I64(_) => SeriesDataType::I64,
PointType::F64(_) => SeriesDataType::F64,
PointType::String(_) => SeriesDataType::String,
PointType::Bool(_) => SeriesDataType::Bool,
};
self.series_id_to_key_and_type
.insert(self.last_id, (point.series().clone(), series_type));
@ -267,6 +311,14 @@ impl MemDB {
let buff = self.series_data.f64_series.get(&id).unwrap();
ReadValues::F64(buff.read(range))
}
SeriesDataType::String => {
let buff = self.series_data.string_series.get(&id).unwrap();
ReadValues::String(buff.read(range))
}
SeriesDataType::Bool => {
let buff = self.series_data.bool_series.get(&id).unwrap();
ReadValues::Bool(buff.read(range))
}
};
// TODO: Encode in the type system that `ReadBatch`es will never be created with an
@ -393,6 +445,20 @@ impl MemDB {
.map(|point| point.time)
.unwrap_or(std::i64::MIN)
}
SeriesDataType::String => {
let buff = self.series_data.string_series.get(&series_id).unwrap();
buff.read(&range)
.last()
.map(|point| point.time)
.unwrap_or(std::i64::MIN)
}
SeriesDataType::Bool => {
let buff = self.series_data.bool_series.get(&series_id).unwrap();
buff.read(&range)
.last()
.map(|point| point.time)
.unwrap_or(std::i64::MIN)
}
};
match fields.entry(field_pair.value) {

View File

@ -428,6 +428,41 @@ fn points_to_flatbuffer(points: &[PointType]) -> flatbuffers::FlatBufferBuilder<
},
)
}
PointType::String(inner_point) => {
let string_value = builder.create_string(&inner_point.value);
let value = wal::StringValue::create(
&mut builder,
&wal::StringValueArgs {
value: Some(string_value),
},
);
wal::Point::create(
&mut builder,
&wal::PointArgs {
key: Some(key),
time: p.time(),
value_type: wal::PointValue::StringValue,
value: Some(value.as_union_value()),
},
)
}
PointType::Bool(inner_point) => {
let value = wal::BoolValue::create(
&mut builder,
&wal::BoolValueArgs {
value: inner_point.value,
},
);
wal::Point::create(
&mut builder,
&wal::PointArgs {
key: Some(key),
time: p.time(),
value_type: wal::PointValue::BoolValue,
value: Some(value.as_union_value()),
},
)
}
}
})
.collect();
@ -483,6 +518,22 @@ impl From<wal::Point<'_>> for PointType {
.value();
Self::new_f64(key, value, time)
}
wal::PointValue::StringValue => {
let value = other
.value_as_string_value()
.expect("Value should match value type")
.value()
.expect("Value should have a string value")
.to_string();
Self::new_string(key, value, time)
}
wal::PointValue::BoolValue => {
let value = other
.value_as_bool_value()
.expect("Value should match value type")
.value();
Self::new_bool(key, value, time)
}
_ => unimplemented!(),
}
}
@ -728,6 +779,8 @@ impl Stream for ReadMergeStream<'_> {
pub enum ReadValues {
I64(Vec<ReadPoint<i64>>),
F64(Vec<ReadPoint<f64>>),
String(Vec<ReadPoint<String>>),
Bool(Vec<ReadPoint<bool>>),
}
impl ReadValues {
@ -735,6 +788,8 @@ impl ReadValues {
match self {
Self::I64(vals) => vals.is_empty(),
Self::F64(vals) => vals.is_empty(),
Self::String(vals) => vals.is_empty(),
Self::Bool(vals) => vals.is_empty(),
}
}
}
@ -755,6 +810,8 @@ impl ReadBatch {
match &self.values {
ReadValues::I64(vals) => (vals.first().unwrap().time, vals.last().unwrap().time),
ReadValues::F64(vals) => (vals.first().unwrap().time, vals.last().unwrap().time),
ReadValues::String(vals) => (vals.first().unwrap().time, vals.last().unwrap().time),
ReadValues::Bool(vals) => (vals.first().unwrap().time, vals.last().unwrap().time),
}
}
@ -762,6 +819,8 @@ impl ReadBatch {
match &mut self.values {
ReadValues::I64(vals) => vals.sort_by_key(|v| v.time),
ReadValues::F64(vals) => vals.sort_by_key(|v| v.time),
ReadValues::String(vals) => vals.sort_by_key(|v| v.time),
ReadValues::Bool(vals) => vals.sort_by_key(|v| v.time),
}
}

View File

@ -178,6 +178,14 @@ async fn read_and_write_data() -> Result<()> {
.field("out", 4)
.timestamp(ns_since_epoch + 6)
.build()?,
influxdb2_client::DataPoint::builder("status")
.field("active", true)
.timestamp(ns_since_epoch + 7)
.build()?,
influxdb2_client::DataPoint::builder("attributes")
.field("color", "blue")
.timestamp(ns_since_epoch + 8)
.build()?,
];
write_data(&client2, org_id_str, bucket_id_str, points).await?;
@ -519,7 +527,10 @@ swap,server01,disk0,out,{},4
let values = &responses[0].values;
let values: Vec<_> = values.iter().map(|s| str::from_utf8(s).unwrap()).collect();
assert_eq!(values, vec!["cpu_load_short", "swap", "system"]);
assert_eq!(
values,
vec!["attributes", "cpu_load_short", "status", "swap", "system"]
);
let measurement_tag_keys_request = tonic::Request::new(MeasurementTagKeysRequest {
source: read_source.clone(),