feat: column last and filter by time

pull/24376/head
Edd Robinson 2020-08-05 20:15:59 +01:00
parent 527083f7a0
commit a5384d1771
3 changed files with 69 additions and 8 deletions

View File

@ -38,7 +38,14 @@ fn main() {
// time_column_min_time(&store);
// time_column_max_time(&store);
time_column_first(&store);
// time_column_first(&store);
let segments = store.segments();
let res = segments.last("host").unwrap();
println!("{:?}", res);
let segments = segments.filter_by_time(1590036110000000, 1590044410000000);
let res = segments.first("env").unwrap();
println!("{:?}", res);
}
fn build_store(
@ -52,7 +59,7 @@ fn build_store(
Ok(())
}
fn convert_record_batch(rb: RecordBatch) -> Result<Segment, Error> {
fn convert_record_batch<'a>(rb: RecordBatch) -> Result<Segment, Error> {
let mut segment = Segment::default();
// println!("cols {:?} rows {:?}", rb.num_columns(), rb.num_rows());

View File

@ -27,6 +27,6 @@ impl Store {
}
pub fn segments(&self) -> Segments {
Segments::new(&self.segments)
Segments::new(self.segments.iter().collect::<Vec<&Segment>>())
}
}

View File

@ -88,14 +88,24 @@ impl Segment {
}
pub struct Segments<'a> {
segments: &'a [Segment],
segments: Vec<&'a Segment>,
}
impl<'a> Segments<'a> {
pub fn new(segments: &'a [Segment]) -> Self {
pub fn new(segments: Vec<&'a Segment>) -> Self {
Self { segments }
}
pub fn filter_by_time(&self, min: i64, max: i64) -> Segments<'a> {
let mut segments: Vec<&Segment> = vec![];
for segment in &self.segments {
if segment.meta.overlaps_time_range(min, max) {
segments.push(segment);
}
}
Self::new(segments)
}
/// Returns the minimum value for a column in a set of segments.
pub fn column_min(&self, column_name: &str) -> Option<column::Scalar> {
if self.segments.is_empty() {
@ -103,7 +113,7 @@ impl<'a> Segments<'a> {
}
let mut min_min: Option<column::Scalar> = None;
for segment in self.segments {
for segment in &self.segments {
if let Some(i) = segment.column_names().iter().position(|c| c == column_name) {
let min = Some(segment.columns[i].min());
if min_min.is_none() {
@ -124,7 +134,7 @@ impl<'a> Segments<'a> {
}
let mut max_max: Option<column::Scalar> = None;
for segment in self.segments {
for segment in &self.segments {
if let Some(i) = segment.column_names().iter().position(|c| c == column_name) {
let max = Some(segment.columns[i].max());
if max_max.is_none() {
@ -147,7 +157,7 @@ impl<'a> Segments<'a> {
}
let mut first_first: Option<(i64, Option<column::Scalar>)> = None;
for segment in self.segments {
for segment in &self.segments {
// first find the logical row id of the minimum timestamp value
if let Column::Integer(ts_col) = &segment.columns[segment.time_column_idx] {
// TODO(edd): clean up unwr
@ -170,6 +180,39 @@ impl<'a> Segments<'a> {
first_first
}
/// Returns the last value for a column in a set of segments.
///
/// TODO(edd): could return NULL value..
pub fn last(&self, column_name: &str) -> Option<(i64, Option<column::Scalar>)> {
if self.segments.is_empty() {
return None;
}
let mut last_last: Option<(i64, Option<column::Scalar>)> = None;
for segment in &self.segments {
// first find the logical row id of the minimum timestamp value
if let Column::Integer(ts_col) = &segment.columns[segment.time_column_idx] {
// TODO(edd): clean up unwr
let max_ts = ts_col.column_range().1;
let max_ts_id = ts_col.row_id_for_value(max_ts).unwrap();
// now we have row id we can get value for that row id
let value = segment.column(column_name).unwrap().value(max_ts_id);
match &last_last {
Some(prev) => {
if prev.0 < max_ts {
last_last = Some((max_ts, value));
}
}
None => last_last = Some((max_ts, value)),
}
}
}
last_last
}
}
/// Meta data for a segment. This data is mainly used to determine if a segment
@ -184,5 +227,16 @@ pub struct SegmentMetaData {
// TODO column sort order
}
impl SegmentMetaData {
pub fn overlaps_time_range(&self, from: i64, to: i64) -> bool {
let result = self.time_range.0 <= to && from <= self.time_range.1;
println!(
"segment with ({:?}) overlaps ({:?}, {:?}) -- {:?}",
self.time_range, from, to, result
);
result
}
}
#[cfg(test)]
mod test {}