feat: add tag values schema API
parent
3963cf6cce
commit
d0f3cae9b3
|
@ -87,11 +87,7 @@ checksum = "cff77d8686867eceff3105329d4698d96c2391c176d5d03adc90c7389162b5b8"
|
|||
[[package]]
|
||||
name = "arrow"
|
||||
version = "2.0.0-SNAPSHOT"
|
||||
<<<<<<< HEAD
|
||||
source = "git+https://github.com/apache/arrow.git?rev=171e8bfe5fe13467a1763227e495fae6bc5d011d#171e8bfe5fe13467a1763227e495fae6bc5d011d"
|
||||
=======
|
||||
source = "git+https://github.com/alamb/arrow.git?rev=46f18c2602072e083809e0846b810e0cc3c59fdd#46f18c2602072e083809e0846b810e0cc3c59fdd"
|
||||
>>>>>>> 27b73c4... refactor: add encoding trait
|
||||
dependencies = [
|
||||
"chrono",
|
||||
"csv",
|
||||
|
@ -111,7 +107,7 @@ dependencies = [
|
|||
[[package]]
|
||||
name = "arrow"
|
||||
version = "2.0.0-SNAPSHOT"
|
||||
source = "git+https://github.com/apache/arrow.git?rev=62dfa114d6683172927fab40fa6c4ddabae8fef4#62dfa114d6683172927fab40fa6c4ddabae8fef4"
|
||||
source = "git+https://github.com/alamb/arrow.git?rev=46f18c2602072e083809e0846b810e0cc3c59fdd#46f18c2602072e083809e0846b810e0cc3c59fdd"
|
||||
dependencies = [
|
||||
"chrono",
|
||||
"csv",
|
||||
|
@ -669,27 +665,7 @@ dependencies = [
|
|||
[[package]]
|
||||
name = "datafusion"
|
||||
version = "2.0.0-SNAPSHOT"
|
||||
<<<<<<< HEAD
|
||||
source = "git+https://github.com/apache/arrow.git?rev=171e8bfe5fe13467a1763227e495fae6bc5d011d#171e8bfe5fe13467a1763227e495fae6bc5d011d"
|
||||
=======
|
||||
source = "git+https://github.com/alamb/arrow.git?rev=46f18c2602072e083809e0846b810e0cc3c59fdd#46f18c2602072e083809e0846b810e0cc3c59fdd"
|
||||
dependencies = [
|
||||
"arrow 2.0.0-SNAPSHOT (git+https://github.com/alamb/arrow.git?rev=46f18c2602072e083809e0846b810e0cc3c59fdd)",
|
||||
"clap",
|
||||
"crossbeam",
|
||||
"fnv",
|
||||
"num_cpus",
|
||||
"parquet 2.0.0-SNAPSHOT (git+https://github.com/alamb/arrow.git?rev=46f18c2602072e083809e0846b810e0cc3c59fdd)",
|
||||
"paste",
|
||||
"rustyline",
|
||||
"sqlparser",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "datafusion"
|
||||
version = "2.0.0-SNAPSHOT"
|
||||
source = "git+https://github.com/apache/arrow.git?rev=62dfa114d6683172927fab40fa6c4ddabae8fef4#62dfa114d6683172927fab40fa6c4ddabae8fef4"
|
||||
>>>>>>> 27b73c4... refactor: add encoding trait
|
||||
dependencies = [
|
||||
"arrow 2.0.0-SNAPSHOT (git+https://github.com/apache/arrow.git?rev=171e8bfe5fe13467a1763227e495fae6bc5d011d)",
|
||||
"chrono",
|
||||
|
@ -703,6 +679,22 @@ dependencies = [
|
|||
"sqlparser",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "datafusion"
|
||||
version = "2.0.0-SNAPSHOT"
|
||||
source = "git+https://github.com/alamb/arrow.git?rev=46f18c2602072e083809e0846b810e0cc3c59fdd#46f18c2602072e083809e0846b810e0cc3c59fdd"
|
||||
dependencies = [
|
||||
"arrow 2.0.0-SNAPSHOT (git+https://github.com/alamb/arrow.git?rev=46f18c2602072e083809e0846b810e0cc3c59fdd)",
|
||||
"clap",
|
||||
"crossbeam",
|
||||
"fnv",
|
||||
"num_cpus",
|
||||
"parquet 2.0.0-SNAPSHOT (git+https://github.com/alamb/arrow.git?rev=46f18c2602072e083809e0846b810e0cc3c59fdd)",
|
||||
"paste",
|
||||
"rustyline",
|
||||
"sqlparser",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "delorean"
|
||||
version = "0.1.0"
|
||||
|
@ -713,11 +705,7 @@ dependencies = [
|
|||
"clap",
|
||||
"criterion",
|
||||
"csv",
|
||||
<<<<<<< HEAD
|
||||
"delorean_arrow",
|
||||
=======
|
||||
"datafusion 2.0.0-SNAPSHOT (git+https://github.com/apache/arrow.git?rev=62dfa114d6683172927fab40fa6c4ddabae8fef4)",
|
||||
>>>>>>> 27b73c4... refactor: add encoding trait
|
||||
"delorean_generated_types",
|
||||
"delorean_ingest",
|
||||
"delorean_line_parser",
|
||||
|
@ -763,7 +751,7 @@ name = "delorean_arrow"
|
|||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"arrow 2.0.0-SNAPSHOT (git+https://github.com/apache/arrow.git?rev=171e8bfe5fe13467a1763227e495fae6bc5d011d)",
|
||||
"datafusion",
|
||||
"datafusion 2.0.0-SNAPSHOT (git+https://github.com/apache/arrow.git?rev=171e8bfe5fe13467a1763227e495fae6bc5d011d)",
|
||||
"parquet 2.0.0-SNAPSHOT (git+https://github.com/apache/arrow.git?rev=171e8bfe5fe13467a1763227e495fae6bc5d011d)",
|
||||
]
|
||||
|
||||
|
@ -2111,9 +2099,9 @@ dependencies = [
|
|||
[[package]]
|
||||
name = "parquet"
|
||||
version = "2.0.0-SNAPSHOT"
|
||||
source = "git+https://github.com/apache/arrow.git?rev=62dfa114d6683172927fab40fa6c4ddabae8fef4#62dfa114d6683172927fab40fa6c4ddabae8fef4"
|
||||
source = "git+https://github.com/alamb/arrow.git?rev=46f18c2602072e083809e0846b810e0cc3c59fdd#46f18c2602072e083809e0846b810e0cc3c59fdd"
|
||||
dependencies = [
|
||||
"arrow 2.0.0-SNAPSHOT (git+https://github.com/apache/arrow.git?rev=62dfa114d6683172927fab40fa6c4ddabae8fef4)",
|
||||
"arrow 2.0.0-SNAPSHOT (git+https://github.com/alamb/arrow.git?rev=46f18c2602072e083809e0846b810e0cc3c59fdd)",
|
||||
"brotli",
|
||||
"byteorder",
|
||||
"chrono",
|
||||
|
|
|
@ -72,7 +72,8 @@ fn main() {
|
|||
// time_group_by_multi_agg_count(&store);
|
||||
// time_group_by_multi_agg_sorted_count(&store);
|
||||
// time_window_agg_count(&store);
|
||||
time_tag_keys_with_pred(&store);
|
||||
// time_tag_keys_with_pred(&store);
|
||||
time_tag_values_with_pred(&store);
|
||||
// time_group_by_different_columns(&store);
|
||||
}
|
||||
|
||||
|
@ -621,7 +622,7 @@ fn time_tag_keys_with_pred(store: &Store) {
|
|||
|
||||
let columns = segments.tag_keys(
|
||||
(1588834080000000, 1590044410000000),
|
||||
&[("env", "prod01-eu-central-1"), ("method", "GET")],
|
||||
&[("env", "prod01-eu-central-1")],
|
||||
);
|
||||
|
||||
total_time += now.elapsed();
|
||||
|
@ -637,6 +638,34 @@ fn time_tag_keys_with_pred(store: &Store) {
|
|||
);
|
||||
}
|
||||
|
||||
//
|
||||
// SHOW TAG VALUES ON "host", "method" WHERE time >= x and time < y AND "env" = 'prod01-us-west-1'
|
||||
fn time_tag_values_with_pred(store: &Store) {
|
||||
let repeat = 10;
|
||||
let mut total_time: std::time::Duration = std::time::Duration::new(0, 0);
|
||||
let mut track = 0;
|
||||
let segments = store.segments();
|
||||
for _ in 0..repeat {
|
||||
let now = std::time::Instant::now();
|
||||
|
||||
let tag_values = segments.tag_values(
|
||||
(1588834080000000, 1590044410000000),
|
||||
&[("env", "prod01-us-west-2")],
|
||||
&["host".to_string(), "method".to_string()],
|
||||
);
|
||||
|
||||
total_time += now.elapsed();
|
||||
track += tag_values.len();
|
||||
}
|
||||
println!(
|
||||
"time_tag_values_with_pred ran {:?} in {:?} {:?} / call {:?}",
|
||||
repeat,
|
||||
total_time,
|
||||
total_time / repeat,
|
||||
track
|
||||
);
|
||||
}
|
||||
|
||||
// This is for a performance experiment where I wanted to show the performance
|
||||
// change as more columns are grouped on.
|
||||
//
|
||||
|
|
|
@ -1,7 +1,14 @@
|
|||
use std::collections::BTreeSet;
|
||||
use std::convert::From;
|
||||
|
||||
use super::encoding;
|
||||
|
||||
/// A distinct set of values drawn from a column, tagged by the column's
/// logical type.
#[derive(Debug)]
pub enum Set<'a> {
    // String values are borrowed (from the column's dictionary) rather than
    // owned, hence the lifetime parameter.
    String(BTreeSet<&'a std::string::String>),
    // Distinct 64-bit integer values.
    Integer(BTreeSet<i64>),
}
|
||||
|
||||
#[derive(Debug, PartialEq, PartialOrd, Clone)]
|
||||
pub enum Value<'a> {
|
||||
Null,
|
||||
|
@ -1261,6 +1268,39 @@ impl Column {
|
|||
_ => unreachable!("not supported at the moment"),
|
||||
}
|
||||
}
|
||||
|
||||
/// This returns the distinct set of values in the column from the set of
|
||||
/// rows provided.
|
||||
///
|
||||
/// NULL values are not included in the returned set even if present in the
|
||||
/// column at provided rows.
|
||||
///
|
||||
/// row_ids *must* be in ascending order.
|
||||
pub fn distinct_values(&self, row_ids: &[usize]) -> Set<'_> {
|
||||
match self {
|
||||
Column::String(c) => Set::String(c.data.distinct_values(row_ids)),
|
||||
_ => unreachable!("not supported at the moment"),
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns true if the column contains any values other than those in
|
||||
/// `values`.
|
||||
pub fn contains_other_values(&self, values: &BTreeSet<&std::string::String>) -> bool {
|
||||
match self {
|
||||
Column::String(c) => {
|
||||
// TODO(edd):
|
||||
// had problems with ref inside of enum Set variant.
|
||||
|
||||
// if let Set::String(v) = values {
|
||||
c.data.contains_other_values(values)
|
||||
// } else {
|
||||
// panic!("incompatible set with column type");
|
||||
// }
|
||||
// Set::String(c.data.distinct_values(row_ids))
|
||||
}
|
||||
_ => unreachable!("not supported at the moment"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Display for Column {
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
use std::collections::{BTreeMap, BTreeSet};
|
||||
use std::collections::{BTreeMap, BTreeSet, HashSet};
|
||||
use std::iter;
|
||||
use std::mem::size_of;
|
||||
|
||||
|
@ -774,6 +774,82 @@ impl DictionaryRLE {
|
|||
out
|
||||
}
|
||||
|
||||
/// Returns the unique set of values encoded at each of the provided ids.
/// NULL values are not returned.
///
/// `row_ids` *must* be in ascending order: the implementation makes a
/// single forward pass over the run-length encoded entries.
pub fn distinct_values(&self, row_ids: &[usize]) -> BTreeSet<&String> {
    // TODO(edd): can improve on this if we know encoded data is totally
    // ordered.
    // Collect encoded ids first; values are decoded only once at the end.
    let mut encoded_values = HashSet::new();

    // Cursor state: the logical (decoded) row where the current run
    // begins, plus the current run's encoded id and remaining length.
    // Panics if there are no runs at all — assumes the caller never asks
    // for rows from an empty encoding.
    let mut curr_logical_row_id = 0;
    let mut run_lengths_iter = self.run_lengths.iter();
    let (mut curr_entry_id, mut curr_entry_rl) = run_lengths_iter.next().unwrap();

    'by_row: for row_id in row_ids {
        // Advance through runs until the current run's exclusive end
        // (curr_logical_row_id + curr_entry_rl) covers `row_id`.
        while curr_logical_row_id + curr_entry_rl <= *row_id as u64 {
            // this encoded entry does not cover the row we need.
            // move on to next entry
            curr_logical_row_id += curr_entry_rl;
            match run_lengths_iter.next() {
                Some(res) => {
                    curr_entry_id = res.0;
                    curr_entry_rl = res.1;
                }
                // Running out of runs means `row_id` is beyond the encoded
                // data — a violation of the caller's contract.
                None => panic!("shouldn't get here"),
            }
        }

        // track encoded value
        encoded_values.insert(curr_entry_id);
        if encoded_values.len() == self.index_entry.len() {
            // all distinct values have been read
            break 'by_row;
        }

        // Shrink the current run by one row; its exclusive end stays the
        // same, so the while-condition above remains valid for later rows.
        curr_logical_row_id += 1;
        curr_entry_rl -= 1;
    }

    assert!(encoded_values.len() <= self.index_entry.len());

    // Finally, materialise the decoded values for the encoded set.
    let mut results = BTreeSet::new();
    for id in encoded_values.iter() {
        let decoded_value = self.index_entry.get(id).unwrap();
        // NULL entries decode to `None` and are filtered out here.
        if let Some(value) = decoded_value {
            results.insert(value);
        }
    }
    results
}
|
||||
|
||||
/// Returns true if the encoding contains values other than those provided in
|
||||
/// `values`.
|
||||
pub fn contains_other_values(&self, values: &BTreeSet<&String>) -> bool {
|
||||
let mut encoded_values = self.entry_index.len();
|
||||
if self.entry_index.contains_key(&None) {
|
||||
encoded_values -= 1;
|
||||
}
|
||||
|
||||
if encoded_values > values.len() {
|
||||
return true;
|
||||
}
|
||||
|
||||
for key in self.entry_index.keys() {
|
||||
match key {
|
||||
Some(key) => {
|
||||
if !values.contains(key) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
None => continue, // skip NULL
|
||||
}
|
||||
}
|
||||
false
|
||||
}
|
||||
|
||||
/// Determines if the encoded data contains at least one non-null value at
|
||||
/// any of the provided row ids.
|
||||
pub fn has_non_null_value_in_row_ids(&self, row_ids: &[usize]) -> bool {
|
||||
let null_encoded_value = self.entry_index.get(&None);
|
||||
if null_encoded_value.is_none() {
|
||||
|
|
|
@ -130,6 +130,11 @@ impl Segment {
|
|||
&self.meta.column_names
|
||||
}
|
||||
|
||||
/// Determines if the segment contains a column with the provided name.
|
||||
pub fn has_column(&self, name: &String) -> bool {
|
||||
self.meta.column_names.contains(name)
|
||||
}
|
||||
|
||||
/// column returns the column with name
|
||||
pub fn column(&self, name: &str) -> Option<&Column> {
|
||||
if let Some(id) = &self.meta.column_names.iter().position(|c| c == name) {
|
||||
|
@ -1126,6 +1131,89 @@ impl Segment {
|
|||
|
||||
Some(results)
|
||||
}
|
||||
|
||||
/// Returns the distinct set of tag values for each requested tag key,
/// considering only rows that match `predicates` and fall inside
/// `time_range`.
///
/// `excluded_tag_values` carries values already collected from other
/// segments: a column whose values are all contained in that set is
/// skipped, since it can contribute nothing new.
///
/// Returns `None` when the segment holds none of the requested tag
/// columns, or when no rows satisfy the predicates and time range.
pub fn tag_values(
    &self,
    time_range: (i64, i64),
    predicates: &[(&str, &str)],
    tag_keys: &[String],
    excluded_tag_values: &BTreeMap<String, BTreeSet<&String>>,
) -> Option<BTreeMap<&String, BTreeSet<&String>>> {
    // first check if we have any columns that should be processed.
    let mut have_some_cols = false;
    for &i in &self.tag_column_idxs {
        let col_name = self.column_names().get(i).unwrap();
        if tag_keys.contains(col_name) {
            have_some_cols = true;
            break;
        }
    }

    if !have_some_cols {
        log::debug!("skipping segment because no columns for tag keys present");
        return None; // we don't have any tag columns to offer.
    }

    let (seg_min, seg_max) = self.meta.time_range;
    if predicates.is_empty() && time_range.0 <= seg_min && time_range.1 > seg_max {
        // the segment is completely overlapped by the time range of query,
        // and there are no predicates
        // NOTE: this fast path (answering from segment metadata without
        // scanning rows) is not implemented yet and will panic.
        todo!("fast path")
    }

    // Lift the (&str, &str) predicate pairs into the column scalar form
    // expected by filter_by_predicates_eq.
    let pred_vec = predicates
        .iter()
        .map(|p| (p.0, Some(column::Scalar::String(p.1))))
        .collect::<Vec<_>>();

    let filtered_row_ids: croaring::Bitmap;
    if let Some(row_ids) = self.filter_by_predicates_eq(time_range, pred_vec.as_slice()) {
        filtered_row_ids = row_ids;
    } else {
        return None; // no matching rows for predicate + time range
    }

    let mut results = BTreeMap::new();

    // Materialise the bitmap into a sorted vec of usize row ids, as
    // required by Column::distinct_values.
    let filtered_row_ids_vec = filtered_row_ids
        .to_vec()
        .iter()
        .map(|v| *v as usize)
        .collect::<Vec<_>>();
    log::debug!("filtered to {:?} rows.", filtered_row_ids_vec.len());

    for &i in &self.tag_column_idxs {
        let col = &self.columns[i];
        let col_name = self.column_names().get(i).unwrap();

        if !tag_keys.contains(col_name) {
            continue;
        }

        // Skip the column entirely when every value it holds has already
        // been seen by the caller (in a previously processed segment).
        if let Some(exclude_tag_values) = excluded_tag_values.get(col_name) {
            if !col.contains_other_values(exclude_tag_values) {
                log::debug!("skipping!!");
                continue;
            }
        }

        if let column::Set::String(values) = col.distinct_values(&filtered_row_ids_vec) {
            log::debug!("distinct values: {:?}", values);
            results.insert(col_name, values);
        } else {
            unreachable!("only works on tag columns");
        }
    }

    Some(results)
}
|
||||
}
|
||||
|
||||
impl std::fmt::Display for Segment {
|
||||
|
@ -1630,6 +1718,8 @@ impl<'a> Segments<'a> {
|
|||
}
|
||||
}
|
||||
|
||||
/// Returns the distinct set of tag keys (column names) matching the provided
|
||||
/// predicates and time range.
|
||||
pub fn tag_keys(
|
||||
&self,
|
||||
time_range: (i64, i64),
|
||||
|
@ -1654,6 +1744,55 @@ impl<'a> Segments<'a> {
|
|||
|
||||
columns
|
||||
}
|
||||
|
||||
/// Returns the distinct set of tag values (column values) for each provided
|
||||
/// tag key, where each returned value lives in a row matching the provided
|
||||
/// predicates and time range.
|
||||
///
|
||||
/// As a special case, if no values are provided for `tag_keys` then all
|
||||
/// tag key-values are returned for the segments.
|
||||
pub fn tag_values(
|
||||
&self,
|
||||
time_range: (i64, i64),
|
||||
predicates: &[(&str, &str)],
|
||||
tag_keys: &[String],
|
||||
) -> BTreeMap<String, BTreeSet<&String>> {
|
||||
let (min, max) = time_range;
|
||||
if max <= min {
|
||||
panic!("max <= min");
|
||||
}
|
||||
|
||||
let mut results: BTreeMap<String, BTreeSet<&String>> = BTreeMap::new();
|
||||
|
||||
for segment in &self.segments {
|
||||
if !segment.meta.overlaps_time_range(min, max) {
|
||||
continue; // segment doesn't have time range
|
||||
}
|
||||
|
||||
let col_names = if tag_keys.is_empty() {
|
||||
segment.column_names()
|
||||
} else {
|
||||
tag_keys
|
||||
};
|
||||
|
||||
let segment_values = segment.tag_values(time_range, predicates, col_names, &results);
|
||||
match segment_values {
|
||||
Some(values) => {
|
||||
for (tag_key, mut tag_values) in values {
|
||||
if !results.contains_key(tag_key) {
|
||||
results.insert(tag_key.clone(), tag_values);
|
||||
} else {
|
||||
let all_values = results.get_mut(tag_key).unwrap();
|
||||
all_values.append(&mut tag_values);
|
||||
}
|
||||
}
|
||||
}
|
||||
None => continue,
|
||||
}
|
||||
}
|
||||
|
||||
results
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
|
|
Loading…
Reference in New Issue