feat: add filter by tag

pull/24376/head
Edd Robinson 2020-08-05 21:17:49 +01:00
parent a5384d1771
commit 69bc0424bf
3 changed files with 75 additions and 4 deletions

View File

@ -43,7 +43,9 @@ fn main() {
let res = segments.last("host").unwrap();
println!("{:?}", res);
let segments = segments.filter_by_time(1590036110000000, 1590044410000000);
let segments = segments
.filter_by_time(1590036110000000, 1590044410000000)
.filter_by_predicate_eq("env", &column::Scalar::String("toolsus1"));
let res = segments.first("env").unwrap();
println!("{:?}", res);
}
@ -66,6 +68,9 @@ fn convert_record_batch<'a>(rb: RecordBatch) -> Result<Segment, Error> {
for (i, column) in rb.columns().iter().enumerate() {
match *column.data_type() {
datatypes::DataType::Float64 => {
if column.null_count() > 0 {
panic!("null floats");
}
let arr = column
.as_any()
.downcast_ref::<array::Float64Array>()
@ -75,18 +80,24 @@ fn convert_record_batch<'a>(rb: RecordBatch) -> Result<Segment, Error> {
segment.add_column(rb.schema().field(i).name(), column);
}
datatypes::DataType::Int64 => {
if column.null_count() > 0 {
panic!("null times");
}
let arr = column.as_any().downcast_ref::<array::Int64Array>().unwrap();
let column = Column::from(arr.value_slice(0, rb.num_rows()));
segment.add_column(rb.schema().field(i).name(), column);
}
datatypes::DataType::Utf8 => {
if column.null_count() > 0 {
panic!("null tag");
}
let arr = column
.as_any()
.downcast_ref::<array::StringArray>()
.unwrap();
let mut column = column::String::default();
let mut c = column::String::default();
let mut prev = arr.value(0);
let mut count = 1_u64;
for j in 1..arr.len() {
@ -94,13 +105,13 @@ fn convert_record_batch<'a>(rb: RecordBatch) -> Result<Segment, Error> {
if prev == next {
count += 1;
} else {
column.add_additional(prev, count);
c.add_additional(prev, count);
prev = next;
count = 1;
}
}
segment.add_column(rb.schema().field(i).name(), Column::String(column));
segment.add_column(rb.schema().field(i).name(), Column::String(c));
}
datatypes::DataType::Boolean => {
panic!("unsupported");

View File

@ -61,6 +61,32 @@ impl Column {
}
}
pub fn maybe_contains(&self, value: &Scalar) -> bool {
match self {
Column::String(c) => {
if let Scalar::String(v) = value {
c.meta.maybe_contains_value(v.to_string())
} else {
panic!("invalid value");
}
}
Column::Float(c) => {
if let Scalar::Float(v) = value {
c.meta.maybe_contains_value(v.to_owned())
} else {
panic!("invalid value");
}
}
Column::Integer(c) => {
if let Scalar::Integer(v) = value {
c.meta.maybe_contains_value(v.to_owned())
} else {
panic!("invalid value");
}
}
}
}
pub fn min(&self) -> Scalar {
match self {
Column::String(c) => Scalar::String(c.meta.range().0),
@ -237,6 +263,15 @@ pub mod metadata {
self.num_rows
}
pub fn maybe_contains_value(&self, v: String) -> bool {
let res = self.range.0 <= v && v <= self.range.1;
println!(
"column with ({:?}) maybe contain {:?} -- {:?}",
self.range, v, res
);
res
}
pub fn range(&self) -> (&str, &str) {
(&self.range.0, &self.range.1)
}
@ -260,6 +295,15 @@ pub mod metadata {
}
}
pub fn maybe_contains_value(&self, v: f64) -> bool {
let res = self.range.0 <= v && v <= self.range.1;
println!(
"column with ({:?}) maybe contain {:?} -- {:?}",
self.range, v, res
);
res
}
pub fn num_rows(&self) -> usize {
self.num_rows
}

View File

@ -106,6 +106,22 @@ impl<'a> Segments<'a> {
Self::new(segments)
}
pub fn filter_by_predicate_eq(
&self,
column_name: &str,
value: &column::Scalar,
) -> Segments<'a> {
let mut segments: Vec<&Segment> = vec![];
for segment in &self.segments {
if let Some(col) = segment.column(column_name) {
if col.maybe_contains(&value) {
segments.push(segment);
}
}
}
Self::new(segments)
}
/// Returns the minimum value for a column in a set of segments.
pub fn column_min(&self, column_name: &str) -> Option<column::Scalar> {
if self.segments.is_empty() {