feat: nullable tags

pull/24376/head
Edd Robinson 2020-08-05 22:42:26 +01:00
parent 69bc0424bf
commit d43d7bb3d4
4 changed files with 159 additions and 96 deletions

View File

@ -45,8 +45,8 @@ fn main() {
let segments = segments
.filter_by_time(1590036110000000, 1590044410000000)
.filter_by_predicate_eq("env", &column::Scalar::String("toolsus1"));
let res = segments.first("env").unwrap();
.filter_by_predicate_eq("env", &column::Scalar::String("prod01-eu-central-1"));
let res = segments.first("env");
println!("{:?}", res);
}
@ -61,7 +61,7 @@ fn build_store(
Ok(())
}
fn convert_record_batch<'a>(rb: RecordBatch) -> Result<Segment, Error> {
fn convert_record_batch(rb: RecordBatch) -> Result<Segment, Error> {
let mut segment = Segment::default();
// println!("cols {:?} rows {:?}", rb.num_columns(), rb.num_rows());
@ -89,23 +89,31 @@ fn convert_record_batch<'a>(rb: RecordBatch) -> Result<Segment, Error> {
segment.add_column(rb.schema().field(i).name(), column);
}
datatypes::DataType::Utf8 => {
if column.null_count() > 0 {
panic!("null tag");
}
let arr = column
.as_any()
.downcast_ref::<array::StringArray>()
.unwrap();
let mut c = column::String::default();
let mut prev = arr.value(0);
let mut prev: Option<&str> = None;
if !column.is_null(0) {
prev = Some(arr.value(0));
}
let mut count = 1_u64;
for j in 1..arr.len() {
let next = arr.value(j);
let mut next = Some(arr.value(j));
if column.is_null(j) {
next = None;
}
if prev == next {
count += 1;
} else {
c.add_additional(prev, count);
match prev {
Some(x) => c.add_additional(Some(x.to_string()), count),
None => c.add_additional(None, count),
}
prev = next;
count = 1;
}

View File

@ -41,10 +41,11 @@ impl Column {
if row_id >= self.num_rows() {
return None;
}
if let Some(v) = c.value(row_id) {
return Some(Scalar::String(v));
};
None
match c.value(row_id) {
Some(v) => Some(Scalar::String(v)),
None => None,
}
}
Column::Float(c) => {
if row_id >= self.num_rows() {
@ -65,7 +66,7 @@ impl Column {
match self {
Column::String(c) => {
if let Scalar::String(v) = value {
c.meta.maybe_contains_value(v.to_string())
c.meta.maybe_contains_value(&v.to_string())
} else {
panic!("invalid value");
}
@ -87,19 +88,31 @@ impl Column {
}
}
pub fn min(&self) -> Scalar {
// FIXME(edd): Support NULL integers and floats
pub fn min(&self) -> Option<Scalar> {
match self {
Column::String(c) => Scalar::String(c.meta.range().0),
Column::Float(c) => Scalar::Float(c.meta.range().0),
Column::Integer(c) => Scalar::Integer(c.meta.range().0),
Column::String(c) => {
if let Some(min) = c.meta.range().0 {
return Some(Scalar::String(min));
}
None
}
Column::Float(c) => Some(Scalar::Float(c.meta.range().0)),
Column::Integer(c) => Some(Scalar::Integer(c.meta.range().0)),
}
}
pub fn max(&self) -> Scalar {
// FIXME(edd): Support NULL integers and floats
pub fn max(&self) -> Option<Scalar> {
match self {
Column::String(c) => Scalar::String(c.meta.range().1),
Column::Float(c) => Scalar::Float(c.meta.range().1),
Column::Integer(c) => Scalar::Integer(c.meta.range().1),
Column::String(c) => {
if let Some(max) = c.meta.range().1 {
return Some(Scalar::String(max));
}
None
}
Column::Float(c) => Some(Scalar::Float(c.meta.range().1)),
Column::Integer(c) => Some(Scalar::Integer(c.meta.range().1)),
}
}
}
@ -126,16 +139,16 @@ pub struct String {
impl String {
pub fn add(&mut self, s: &str) {
self.meta.add(s);
self.meta.add(Some(s.to_string()));
self.data.push(s);
}
pub fn add_additional(&mut self, s: &str, additional: u64) {
self.meta.add(s);
pub fn add_additional(&mut self, s: Option<std::string::String>, additional: u64) {
self.meta.add(s.clone());
self.data.push_additional(s, additional);
}
pub fn column_range(&self) -> (&str, &str) {
pub fn column_range(&self) -> (Option<&std::string::String>, Option<&std::string::String>) {
self.meta.range()
}
@ -241,21 +254,21 @@ impl From<&[i64]> for Integer {
pub mod metadata {
#[derive(Debug, Default)]
pub struct Str {
range: (String, String),
range: (Option<String>, Option<String>),
num_rows: usize,
// sparse_index: BTreeMap<String, usize>,
}
impl Str {
pub fn add(&mut self, s: &str) {
pub fn add(&mut self, s: Option<String>) {
self.num_rows += 1;
if self.range.0.as_str() > s {
self.range.0 = s.to_owned();
if self.range.0 > s {
self.range.0 = s.clone();
}
if self.range.1.as_str() < s {
self.range.1 = s.to_owned();
if self.range.1 < s {
self.range.1 = s;
}
}
@ -263,8 +276,8 @@ pub mod metadata {
self.num_rows
}
pub fn maybe_contains_value(&self, v: String) -> bool {
let res = self.range.0 <= v && v <= self.range.1;
pub fn maybe_contains_value(&self, v: &str) -> bool {
let res = self.range.0 <= Some(v.to_string()) && Some(v.to_string()) <= self.range.1;
println!(
"column with ({:?}) maybe contain {:?} -- {:?}",
self.range, v, res
@ -272,12 +285,13 @@ pub mod metadata {
res
}
pub fn range(&self) -> (&str, &str) {
(&self.range.0, &self.range.1)
pub fn range(&self) -> (Option<&String>, Option<&String>) {
(self.range.0.as_ref(), self.range.1.as_ref())
}
pub fn size(&self) -> usize {
self.range.0.len() + self.range.1.len() + std::mem::size_of::<usize>()
// TODO!!!!
0 //self.range.0.len() + self.range.1.len() + std::mem::size_of::<usize>()
}
}

View File

@ -60,10 +60,10 @@ impl From<&[f64]> for PlainFixed<f64> {
#[derive(Debug, Default)]
pub struct DictionaryRLE {
// stores the mapping between an entry and its assigned index.
entry_index: BTreeMap<String, usize>,
entry_index: BTreeMap<Option<String>, usize>,
// stores the mapping between an index and its entry.
index_entry: BTreeMap<usize, String>,
index_entry: BTreeMap<usize, Option<String>>,
map_size: usize, // TODO(edd) this isn't perfect at all
@ -88,12 +88,16 @@ impl DictionaryRLE {
}
pub fn push(&mut self, v: &str) {
self.push_additional(v, 1);
self.push_additional(Some(v.to_owned()), 1);
}
pub fn push_additional(&mut self, v: &str, additional: u64) {
/// Appends a single NULL (absent) entry to the encoding.
///
/// Delegates to `push_additional` with `None` as the value and a run
/// length of one.
pub fn push_none(&mut self) {
    const RUN_LENGTH: u64 = 1;
    self.push_additional(None, RUN_LENGTH);
}
pub fn push_additional(&mut self, v: Option<String>, additional: u64) {
self.total += additional;
let idx = self.entry_index.get(v);
let idx = self.entry_index.get(&v);
match idx {
Some(idx) => {
if let Some((last_idx, rl)) = self.run_lengths.last_mut() {
@ -112,9 +116,12 @@ impl DictionaryRLE {
if idx.is_none() {
let idx = self.entry_index.len();
self.entry_index.insert(String::from(v), idx);
self.index_entry.insert(idx, String::from(v));
self.map_size += v.len() + std::mem::size_of::<usize>();
self.entry_index.insert(v.clone(), idx);
if let Some(value) = &v {
self.map_size += value.len();
}
self.index_entry.insert(idx, v);
self.map_size += 8 + std::mem::size_of::<usize>(); // TODO(edd): clean this option size up
self.run_lengths.push((idx, additional));
self.run_length_size += std::mem::size_of::<(usize, u64)>();
@ -126,9 +133,9 @@ impl DictionaryRLE {
// row_ids returns an iterator over the set of row ids matching the provided
// value.
pub fn row_ids(&self, value: &str) -> impl iter::Iterator<Item = usize> {
pub fn row_ids(&self, value: Option<String>) -> impl iter::Iterator<Item = usize> {
let mut out: Vec<usize> = vec![];
if let Some(idx) = self.entry_index.get(value) {
if let Some(idx) = self.entry_index.get(&value) {
let mut index: usize = 0;
for (other_idx, other_rl) in &self.run_lengths {
let start = index;
@ -143,9 +150,9 @@ impl DictionaryRLE {
// row_ids returns an iterator over the set of row ids matching the provided
// value.
pub fn row_ids_roaring(&self, value: &str) -> croaring::Bitmap {
pub fn row_ids_roaring(&self, value: Option<String>) -> croaring::Bitmap {
let mut bm = croaring::Bitmap::create();
if let Some(idx) = self.entry_index.get(value) {
if let Some(idx) = self.entry_index.get(&value) {
let mut index: u64 = 0;
for (other_idx, other_rl) in &self.run_lengths {
let start = index;
@ -179,11 +186,11 @@ impl DictionaryRLE {
// unreachable!("for now");
// }
pub fn dictionary(&self) -> BTreeSet<String> {
pub fn dictionary(&self) -> BTreeSet<Option<String>> {
self.entry_index
.keys()
.cloned()
.collect::<BTreeSet<String>>()
.collect::<BTreeSet<Option<String>>>()
}
// get the logical value at the provided index, or None if there is no value
@ -193,7 +200,12 @@ impl DictionaryRLE {
let mut total = 0;
for (idx, rl) in &self.run_lengths {
if total + rl > index as u64 {
return self.index_entry.get(idx);
// TODO(edd): Can this really be idiomatic???
match self.index_entry.get(idx) {
Some(&Some(ref result)) => return Some(result),
Some(&None) => return None,
None => return None,
}
}
total += rl;
}
@ -203,19 +215,20 @@ impl DictionaryRLE {
// values materialises a vector of references to all logical values in the
// encoding.
pub fn values(&mut self) -> Vec<&str> {
let mut out = Vec::with_capacity(self.total as usize);
pub fn values(&mut self) -> Vec<Option<&String>> {
let mut out: Vec<Option<&String>> = Vec::with_capacity(self.total as usize);
// build reverse mapping.
let mut idx_value = BTreeMap::new();
for (k, v) in &self.entry_index {
idx_value.insert(v, k.as_str());
idx_value.insert(v, k);
}
assert_eq!(idx_value.len(), self.entry_index.len());
for (idx, rl) in &self.run_lengths {
let &v = idx_value.get(&idx).unwrap();
out.extend(iter::repeat(&v).take(*rl as usize));
// TODO(edd): fix unwrap - we know that the value exists in map...
let v = idx_value.get(&idx).unwrap().as_ref();
out.extend(iter::repeat(v).take(*rl as usize));
}
out
}
@ -261,17 +274,34 @@ mod test {
drle.push("world");
drle.push("hello");
drle.push("hello");
drle.push_additional("hello", 1);
drle.push_additional(Some("hello".to_string()), 1);
assert_eq!(
drle.values(),
["hello", "hello", "world", "hello", "hello", "hello",]
[
Some(&"hello".to_string()),
Some(&"hello".to_string()),
Some(&"world".to_string()),
Some(&"hello".to_string()),
Some(&"hello".to_string()),
Some(&"hello".to_string())
]
);
drle.push_additional("zoo", 3);
drle.push_additional(Some("zoo".to_string()), 3);
assert_eq!(
drle.values(),
["hello", "hello", "world", "hello", "hello", "hello", "zoo", "zoo", "zoo"]
[
Some(&"hello".to_string()),
Some(&"hello".to_string()),
Some(&"world".to_string()),
Some(&"hello".to_string()),
Some(&"hello".to_string()),
Some(&"hello".to_string()),
Some(&"zoo".to_string()),
Some(&"zoo".to_string()),
Some(&"zoo".to_string()),
]
);
assert_eq!(drle.value(0).unwrap(), "hello");
@ -288,34 +318,49 @@ mod test {
#[test]
fn row_ids() {
let mut drle = super::DictionaryRLE::new();
drle.push_additional("abc", 3);
drle.push_additional("dre", 2);
drle.push_additional(Some("abc".to_string()), 3);
drle.push_additional(Some("dre".to_string()), 2);
drle.push("abc");
let ids = drle.row_ids("abc").collect::<Vec<usize>>();
let ids = drle
.row_ids(Some("abc".to_string()))
.collect::<Vec<usize>>();
assert_eq!(ids, vec![0, 1, 2, 5]);
let ids = drle.row_ids("dre").collect::<Vec<usize>>();
let ids = drle
.row_ids(Some("dre".to_string()))
.collect::<Vec<usize>>();
assert_eq!(ids, vec![3, 4]);
let ids = drle.row_ids("foo").collect::<Vec<usize>>();
let ids = drle
.row_ids(Some("foo".to_string()))
.collect::<Vec<usize>>();
assert_eq!(ids, vec![]);
}
#[test]
fn row_ids_roaring() {
let mut drle = super::DictionaryRLE::new();
drle.push_additional("abc", 3);
drle.push_additional("dre", 2);
drle.push_additional(Some("abc".to_string()), 3);
drle.push_additional(Some("dre".to_string()), 2);
drle.push("abc");
let ids = drle.row_ids_roaring("abc").iter().collect::<Vec<u32>>();
let ids = drle
.row_ids_roaring(Some("abc".to_string()))
.iter()
.collect::<Vec<u32>>();
assert_eq!(ids, vec![0, 1, 2, 5]);
let ids = drle.row_ids_roaring("dre").iter().collect::<Vec<u32>>();
let ids = drle
.row_ids_roaring(Some("dre".to_string()))
.iter()
.collect::<Vec<u32>>();
assert_eq!(ids, vec![3, 4]);
let ids = drle.row_ids_roaring("foo").iter().collect::<Vec<u32>>();
let ids = drle
.row_ids_roaring(Some("foo".to_string()))
.iter()
.collect::<Vec<u32>>();
assert_eq!(ids, vec![]);
}
}

View File

@ -87,6 +87,24 @@ impl Segment {
}
}
/// Summary information describing a segment.
///
/// Its main purpose is to decide cheaply whether a segment could hold
/// values relevant to a query.
#[derive(Debug, Default)]
pub struct SegmentMetaData {
    size: usize, // TODO
    rows: usize,
    column_names: Vec<String>,
    // Compared by `overlaps_time_range` as (lower, upper) bounds.
    time_range: (i64, i64),
    // TODO column sort order
}

impl SegmentMetaData {
    /// Reports whether the interval `[from, to]` intersects this segment's
    /// time range.
    ///
    /// Both ends are inclusive: an interval that merely touches either bound
    /// of the segment's range counts as overlapping.
    pub fn overlaps_time_range(&self, from: i64, to: i64) -> bool {
        let (lower, upper) = self.time_range;
        // Disjoint only if the segment starts after `to` or ends before `from`.
        !(lower > to || from > upper)
    }
}
/// A collection of `Segment` references that all share the lifetime `'a`.
pub struct Segments<'a> {
// The referenced segments; this struct holds references rather than owned values.
segments: Vec<&'a Segment>,
}
@ -100,6 +118,7 @@ impl<'a> Segments<'a> {
let mut segments: Vec<&Segment> = vec![];
for segment in &self.segments {
if segment.meta.overlaps_time_range(min, max) {
println!("Segement {:?} overlaps", segment.meta);
segments.push(segment);
}
}
@ -131,7 +150,7 @@ impl<'a> Segments<'a> {
let mut min_min: Option<column::Scalar> = None;
for segment in &self.segments {
if let Some(i) = segment.column_names().iter().position(|c| c == column_name) {
let min = Some(segment.columns[i].min());
let min = segment.columns[i].min();
if min_min.is_none() {
min_min = min
} else if min_min > min {
@ -152,7 +171,7 @@ impl<'a> Segments<'a> {
let mut max_max: Option<column::Scalar> = None;
for segment in &self.segments {
if let Some(i) = segment.column_names().iter().position(|c| c == column_name) {
let max = Some(segment.columns[i].max());
let max = segment.columns[i].max();
if max_max.is_none() {
max_max = max
} else if max_max < max {
@ -231,28 +250,5 @@ impl<'a> Segments<'a> {
}
}
/// Summary information describing a segment.
///
/// Its main purpose is to decide cheaply whether a segment could hold
/// values relevant to a query.
#[derive(Debug, Default)]
pub struct SegmentMetaData {
    size: usize, // TODO
    rows: usize,
    column_names: Vec<String>,
    // Compared by `overlaps_time_range` as (lower, upper) bounds.
    time_range: (i64, i64),
    // TODO column sort order
}

impl SegmentMetaData {
    /// Reports whether the interval `[from, to]` intersects this segment's
    /// time range, treating both ends as inclusive.
    ///
    /// Each call also prints a diagnostic line to stdout showing the
    /// segment's range, the queried interval and the outcome.
    pub fn overlaps_time_range(&self, from: i64, to: i64) -> bool {
        let (lower, upper) = self.time_range;
        let result = lower <= to && from <= upper;
        println!(
            "segment with ({:?}) overlaps ({:?}, {:?}) -- {:?}",
            self.time_range, from, to, result
        );
        result
    }
}
// Placeholder test module: it is compiled only for test builds and currently
// contains no tests.
#[cfg(test)]
mod test {}