refactor: get build working

pull/24376/head
Edd Robinson 2020-09-16 16:59:00 +01:00
parent a107da6dfe
commit ba39d731e0
4 changed files with 416 additions and 172 deletions

View File

@ -14,6 +14,7 @@ pub enum Scalar<'a> {
String(&'a str),
Float(f64),
Integer(i64),
Unsigned32(u32),
}
impl<'a> Scalar<'a> {
@ -28,6 +29,9 @@ impl<'a> Scalar<'a> {
Scalar::Integer(v) => {
*v = 0;
}
Scalar::Unsigned32(v) => {
*v = 0;
}
}
}
@ -47,6 +51,13 @@ impl<'a> Scalar<'a> {
panic!("invalid");
};
}
Self::Unsigned32(v) => {
if let Self::Unsigned32(other) = other {
*v += other;
} else {
panic!("invalid");
};
}
Self::String(_) => {
unreachable!("not possible to add strings");
}
@ -54,6 +65,39 @@ impl<'a> Scalar<'a> {
}
}
impl<'a> std::ops::Add<&Scalar<'a>> for &mut Scalar<'a> {
type Output = Scalar<'a>;
fn add(self, _rhs: &Scalar<'a>) -> Self::Output {
match *self {
Scalar::Float(v) => {
if let Scalar::Float(other) = _rhs {
Scalar::Float(v + other)
} else {
panic!("invalid");
}
}
Scalar::Integer(v) => {
if let Scalar::Integer(other) = _rhs {
Scalar::Integer(v + other)
} else {
panic!("invalid");
}
}
Scalar::Unsigned32(v) => {
if let Scalar::Unsigned32(other) = _rhs {
Scalar::Unsigned32(v + other)
} else {
panic!("invalid");
}
}
Scalar::String(_) => {
unreachable!("not possible to add strings");
}
}
}
}
impl<'a> std::ops::Add<&Scalar<'a>> for Scalar<'a> {
type Output = Scalar<'a>;
@ -73,6 +117,13 @@ impl<'a> std::ops::Add<&Scalar<'a>> for Scalar<'a> {
panic!("invalid");
}
}
Self::Unsigned32(v) => {
if let Self::Unsigned32(other) = _rhs {
Self::Unsigned32(v + other)
} else {
panic!("invalid");
}
}
Self::String(_) => {
unreachable!("not possible to add strings");
}
@ -80,6 +131,37 @@ impl<'a> std::ops::Add<&Scalar<'a>> for Scalar<'a> {
}
}
impl<'a> std::ops::AddAssign<&Scalar<'a>> for &mut Scalar<'a> {
fn add_assign(&mut self, _rhs: &Scalar<'a>) {
match self {
Scalar::Float(v) => {
if let Scalar::Float(other) = _rhs {
*v += *other;
} else {
panic!("invalid");
};
}
Scalar::Integer(v) => {
if let Scalar::Integer(other) = _rhs {
*v += *other;
} else {
panic!("invalid");
};
}
Scalar::Unsigned32(v) => {
if let Scalar::Unsigned32(other) = _rhs {
*v += *other;
} else {
panic!("invalid");
};
}
Scalar::String(_) => {
unreachable!("not possible to add strings");
}
}
}
}
impl<'a> std::ops::AddAssign<&Scalar<'a>> for Scalar<'a> {
fn add_assign(&mut self, _rhs: &Scalar<'a>) {
match self {
@ -97,6 +179,13 @@ impl<'a> std::ops::AddAssign<&Scalar<'a>> for Scalar<'a> {
panic!("invalid");
};
}
Self::Unsigned32(v) => {
if let Self::Unsigned32(other) = _rhs {
*v += *other;
} else {
panic!("invalid");
};
}
Self::String(_) => {
unreachable!("not possible to add strings");
}
@ -107,6 +196,8 @@ impl<'a> std::ops::AddAssign<&Scalar<'a>> for Scalar<'a> {
#[derive(Clone, Debug)]
pub enum Aggregate<'a> {
Count(u64),
// Sum can be `None` is for example all values being aggregated are themselves
// `None`.
Sum(Option<Scalar<'a>>),
}
@ -116,44 +207,71 @@ pub enum AggregateType {
Sum,
}
// impl<'a> std::ops::Add<Scalar<'a>> for Aggregate<'a> {
// impl<'a> std::ops::Add<&Option<Scalar<'a>>> for Aggregate<'a> {
// type Output = Aggregate<'a>;
// fn add(self, _rhs: Scalar<'a>) -> Self::Output {
// fn add(self, _rhs: &Option<Scalar<'a>>) -> Self::Output {
// match self {
// Self::Count(c) => Self::Count(c + 1),
// Self::Sum(s) => Self::Sum(s + &_rhs),
// Self::Count(self_count) => match _rhs {
// Some(other_scalar) => match other_scalar {
// Scalar::String(_) => panic!("todo - remove String scalar"),
// Scalar::Float(_) => panic!("cannot add floating point value to a count"),
// Scalar::Integer(v) => Self::Count(self_count + *v as u64),
// Scalar::Unsigned32(v) => Self::Count(self_count + *v as u64),
// },
// None => self,
// },
// // SUM ignores NULL values. Initially an aggregate sum is `None`, but
// // as soon as a non-null value is shown then it becomes `Some`.
// Self::Sum(self_sum) => match (self_sum, _rhs) {
// (None, None) => Self::Sum(None),
// (None, Some(other_scalar)) => match other_scalar {
// Scalar::String(_) => panic!("todo - remove String scalar"),
// Scalar::Float(_) => Self::Sum(Some(other_scalar.clone())),
// Scalar::Integer(_) => Self::Sum(Some(other_scalar.clone())),
// Scalar::Unsigned32(_) => Self::Sum(Some(other_scalar.clone())),
// },
// (Some(_self), None) => Self::Sum(Some(_self.clone())),
// (Some(self_scalar), Some(other_scalar)) => match other_scalar {
// Scalar::String(_) => panic!("todo - remove String scalar"),
// Scalar::Float(_) => Self::Sum(Some(self_scalar + &other_scalar)),
// Scalar::Integer(_) => Self::Sum(Some(self_scalar + &other_scalar)),
// Scalar::Unsigned32(_) => Self::Sum(Some(self_scalar + &other_scalar)),
// },
// },
// }
// }
// }
impl<'a> std::ops::Add<&Aggregate<'a>> for Aggregate<'a> {
type Output = Aggregate<'a>;
// impl<'a> std::ops::Add<&Aggregate<'a>> for Aggregate<'a> {
// type Output = Aggregate<'a>;
fn add(self, _rhs: &Aggregate<'a>) -> Self::Output {
match self {
Self::Count(c) => {
if let Self::Count(other) = _rhs {
Self::Count(c + other)
} else {
panic!("invalid");
}
}
Self::Sum(s) => {
if let Self::Sum(other) = _rhs {
match (s, other) {
(None, None) => Self::Sum(None),
(None, Some(other)) => Self::Sum(Some(*other)),
(Some(s), None) => Self::Sum(Some(s)),
(Some(s), Some(other)) => Self::Sum(Some(s + other)),
}
} else {
panic!("invalid");
}
}
}
}
}
// fn add(self, _rhs: &Aggregate<'a>) -> Self::Output {
// match self {
// Self::Count(self_count) => {
// if let Self::Count(other) = _rhs {
// Self::Count(self_count + *other)
// } else {
// panic!("can't combine count with other aggregate type");
// }
// }
// // SUM ignores NULL values. Initially an aggregate sum is `None`, but
// // as soon as a non-null value is shown then it becomes `Some`.
// Self::Sum(self_sum) => {
// if let Self::Sum(other) = _rhs {
// match (self_sum, other) {
// (None, None) => Self::Sum(None),
// (None, Some(_)) => Self::Sum(*other),
// (Some(_), None) => self,
// (Some(s), Some(other)) => Self::Sum(Some(s + other)),
// }
// } else {
// panic!("invalid");
// }
// }
// }
// }
// }
pub trait AggregatableByRange {
fn aggregate_by_id_range(
@ -163,12 +281,16 @@ pub trait AggregatableByRange {
to_row_id: usize,
) -> Aggregate<'_>;
}
/// A Vector is a materialised vector of values from a column.
pub enum Vector<'a> {
String(Vec<&'a Option<std::string::String>>),
EncodedString(Vec<i64>),
Float(Vec<Option<f64>>),
Integer(Vec<Option<i64>>),
NullString(Vec<&'a Option<std::string::String>>),
NullFloat(Vec<Option<f64>>),
NullInteger(Vec<Option<i64>>),
Float(Vec<f64>),
Integer(Vec<i64>),
Unsigned32(Vec<u32>),
// TODO(edd): add types like this:
//
// Integer16(Vec<i16>),
@ -189,7 +311,7 @@ impl<'a> Vector<'a> {
) -> Aggregate<'a> {
match agg_type {
AggregateType::Count => {
Aggregate::Count(self.count_by_id_range(from_row_id, to_row_id) as u64)
Aggregate::Count(self.count_by_id_range(from_row_id, to_row_id))
}
AggregateType::Sum => Aggregate::Sum(self.sum_by_id_range(from_row_id, to_row_id)),
}
@ -199,12 +321,12 @@ impl<'a> Vector<'a> {
// are no non-null values in the vector being summed then None is returned.
fn sum_by_id_range(&self, from_row_id: usize, to_row_id: usize) -> Option<Scalar<'a>> {
match self {
Vector::String(_) => {
Self::NullString(_) => {
panic!("can't sum strings....");
}
Vector::Float(values) => {
Self::NullFloat(values) => {
let mut res = 0.0;
let mut found = false;
let mut found = false; // TODO(edd): check if this is faster than a match.
// TODO(edd): check asm to see if it's vectorising
for v in values[from_row_id..to_row_id].iter() {
@ -219,7 +341,16 @@ impl<'a> Vector<'a> {
}
None
}
Vector::Integer(values) => {
Self::Float(values) => {
let mut res = 0.0;
// TODO(edd): check asm to see if it's vectorising
for v in values[from_row_id..to_row_id].iter() {
res += *v;
}
Some(Scalar::Float(res))
}
Self::NullInteger(values) => {
let mut res = 0;
let mut found = false;
@ -236,7 +367,7 @@ impl<'a> Vector<'a> {
}
None
}
Vector::EncodedString(values) => {
Self::Integer(values) => {
let mut res = 0;
// TODO(edd): check asm to see if it's vectorising
@ -245,24 +376,58 @@ impl<'a> Vector<'a> {
}
Some(Scalar::Integer(res))
}
Self::Unsigned32(values) => {
let mut res = 0;
// TODO(edd): check asm to see if it's vectorising
for v in values[from_row_id..to_row_id].iter() {
res += *v;
}
Some(Scalar::Unsigned32(res))
}
}
}
// return the count of values on the column. NULL values do not contribute
// to the count.
fn count_by_id_range(&self, from_row_id: usize, to_row_id: usize) -> usize {
fn count_by_id_range(&self, from_row_id: usize, to_row_id: usize) -> u64 {
match self {
Vector::String(vec) => vec.iter().filter(|x| x.is_some()).count(),
Vector::EncodedString(_) => to_row_id - from_row_id, // fast - no possible NULL values
Vector::Float(vec) => vec.iter().filter(|x| x.is_some()).count(),
Vector::Integer(vec) => vec.iter().filter(|x| x.is_some()).count(),
Self::NullString(vec) => {
let count = vec.iter().filter(|x| x.is_some()).count();
count as u64
}
Self::NullFloat(vec) => {
let count = vec.iter().filter(|x| x.is_some()).count();
count as u64
}
Self::NullInteger(vec) => {
let count = vec.iter().filter(|x| x.is_some()).count();
count as u64
}
Self::Float(vec) => (to_row_id - from_row_id) as u64, // fast - no possible NULL values
Self::Integer(vec) => (to_row_id - from_row_id) as u64, // fast - no possible NULL values
Self::Unsigned32(vec) => (to_row_id - from_row_id) as u64, // fast - no possible NULL values
}
}
pub fn extend(&mut self, other: Self) {
match self {
Self::String(v) => {
if let Self::String(other) = other {
Self::NullString(v) => {
if let Self::NullString(other) = other {
v.extend(other);
} else {
unreachable!("string can't be extended");
}
}
Self::NullFloat(v) => {
if let Self::NullFloat(other) = other {
v.extend(other);
} else {
unreachable!("string can't be extended");
}
}
Self::NullInteger(v) => {
if let Self::NullInteger(other) = other {
v.extend(other);
} else {
unreachable!("string can't be extended");
@ -282,8 +447,8 @@ impl<'a> Vector<'a> {
unreachable!("string can't be extended");
}
}
Vector::EncodedString(v) => {
if let Self::EncodedString(other) = other {
Self::Unsigned32(v) => {
if let Self::Unsigned32(other) = other {
v.extend(other);
} else {
unreachable!("string can't be extended");
@ -298,10 +463,12 @@ impl<'a> Vector<'a> {
pub fn len(&self) -> usize {
match self {
Self::String(v) => v.len(),
Self::NullString(v) => v.len(),
Self::NullFloat(v) => v.len(),
Self::NullInteger(v) => v.len(),
Self::Float(v) => v.len(),
Self::Integer(v) => v.len(),
Vector::EncodedString(v) => v.len(),
Self::Unsigned32(v) => v.len(),
}
}
@ -309,25 +476,54 @@ impl<'a> Vector<'a> {
/// position `i` is NULL then `None` is returned.
pub fn get(&self, i: usize) -> Value<'a> {
match self {
Self::String(v) => match v[i] {
Self::NullString(v) => match v[i] {
Some(v) => Value::String(v),
None => Value::Null, // Scalar::String(v[i].as_ref().unwrap()),
},
Self::Float(v) => match v[i] {
Self::NullFloat(v) => match v[i] {
Some(v) => Value::Scalar(Scalar::Float(v)),
None => Value::Null,
},
Self::Integer(v) => match v[i] {
Self::NullInteger(v) => match v[i] {
Some(v) => Value::Scalar(Scalar::Integer(v)),
None => Value::Null,
},
Self::EncodedString(v) => Value::Scalar(Scalar::Integer(v[i])),
Self::Float(v) => Value::Scalar(Scalar::Float(v[i])),
Self::Integer(v) => Value::Scalar(Scalar::Integer(v[i])),
Self::Unsigned32(v) => Value::Scalar(Scalar::Unsigned32(v[i])),
}
}
/// Return the value within the vector at position `i`. If the value at
/// position `i` is NULL then `None` is returned.
//
// TODO - sort out
pub fn get_scalar(&self, i: usize) -> Option<Scalar<'a>> {
match self {
Self::NullString(_) => panic!("unsupported get_scalar"),
Self::NullFloat(v) => match v[i] {
Some(v) => Some(Scalar::Float(v)),
None => None,
},
Self::NullInteger(v) => match v[i] {
Some(v) => Some(Scalar::Integer(v)),
None => None,
},
Self::Float(v) => Some(Scalar::Float(v[i])),
Self::Integer(v) => Some(Scalar::Integer(v[i])),
Self::Unsigned32(v) => Some(Scalar::Unsigned32(v[i])),
}
}
pub fn swap(&mut self, a: usize, b: usize) {
match self {
Self::String(v) => {
Self::NullString(v) => {
v.swap(a, b);
}
Self::NullFloat(v) => {
v.swap(a, b);
}
Self::NullInteger(v) => {
v.swap(a, b);
}
Self::Float(v) => {
@ -336,7 +532,9 @@ impl<'a> Vector<'a> {
Self::Integer(v) => {
v.swap(a, b);
}
Vector::EncodedString(v) => v.swap(a, b),
Self::Unsigned32(v) => {
v.swap(a, b);
}
}
}
}
@ -352,8 +550,6 @@ impl AggregatableByRange for &Vector<'_> {
}
}
/// VectorIterator allows a `Vector` to be iterated. Until vectors are drained
/// Scalar values are emitted.
pub struct VectorIterator<'a> {
v: &'a Vector<'a>,
next_i: usize,
@ -379,14 +575,44 @@ impl<'a> Iterator for VectorIterator<'a> {
}
}
/// NullVectorIterator allows a `Vector` to be iterated. Until vectors are
/// drained Scalar values are emitted.
///
///
/// TODO - need to figure this out - currently only returns scalars
pub struct NullVectorIterator<'a> {
v: &'a Vector<'a>,
next_i: usize,
}
impl<'a> NullVectorIterator<'a> {
pub fn new(v: &'a Vector<'a>) -> Self {
Self { v, next_i: 0 }
}
}
impl<'a> Iterator for NullVectorIterator<'a> {
type Item = Option<Scalar<'a>>;
fn next(&mut self) -> Option<Self::Item> {
let curr_i = self.next_i;
self.next_i += 1;
if curr_i == self.v.len() {
return None;
}
Some(self.v.get_scalar(curr_i))
}
}
use chrono::prelude::*;
impl<'a> std::fmt::Display for Vector<'a> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::String(v) => write!(f, "{:?}", v),
Self::Float(v) => write!(f, "{:?}", v),
Self::Integer(v) => {
Self::NullString(v) => write!(f, "{:?}", v),
Self::NullFloat(v) => write!(f, "{:?}", v),
Self::NullInteger(v) => {
for x in v.iter() {
match x {
Some(x) => {
@ -398,7 +624,16 @@ impl<'a> std::fmt::Display for Vector<'a> {
}
Ok(())
}
Vector::EncodedString(v) => write!(f, "{:?}", v),
Self::Float(v) => write!(f, "{:?}", v),
Self::Integer(v) => {
// TODO(edd) remove as this is timestamp specific
for x in v.iter() {
let ts = NaiveDateTime::from_timestamp(*x / 1000 / 1000, 0);
write!(f, "{}, ", ts)?;
}
Ok(())
}
Self::Unsigned32(v) => write!(f, "{:?}", v),
}
}
}
@ -474,10 +709,10 @@ impl Column {
match self {
Column::String(c) => {
if row_ids.is_empty() {
return Vector::String(vec![]);
return Vector::NullString(vec![]);
}
Vector::String(c.values(row_ids))
Vector::NullString(c.values(row_ids))
}
Column::Float(c) => {
if row_ids.is_empty() {
@ -488,7 +723,7 @@ impl Column {
let v = c.values(row_ids);
log::debug!("time getting decoded values for float {:?}", now.elapsed());
Vector::Float(v)
Vector::NullFloat(v)
}
Column::Integer(c) => {
if row_ids.is_empty() {
@ -498,7 +733,7 @@ impl Column {
let now = std::time::Instant::now();
let v = c.values(row_ids);
log::debug!("time getting decoded values for int {:?}", now.elapsed());
Vector::Integer(v)
Vector::NullInteger(v)
}
}
}
@ -509,7 +744,7 @@ impl Column {
match self {
Column::String(c) => {
if row_ids.is_empty() {
return Vector::String(vec![]);
return Vector::NullString(vec![]);
}
let row_id_vec = row_ids
@ -517,7 +752,7 @@ impl Column {
.iter()
.map(|v| *v as usize)
.collect::<Vec<_>>();
Vector::String(c.values(&row_id_vec))
Vector::NullString(c.values(&row_id_vec))
}
Column::Float(c) => {
if row_ids.is_empty() {
@ -529,7 +764,7 @@ impl Column {
.iter()
.map(|v| *v as usize)
.collect::<Vec<_>>();
Vector::Float(c.values(&row_id_vec))
Vector::NullFloat(c.values(&row_id_vec))
}
Column::Integer(c) => {
if row_ids.is_empty() {
@ -541,7 +776,7 @@ impl Column {
.iter()
.map(|v| *v as usize)
.collect::<Vec<_>>();
Vector::Integer(c.values(&row_id_vec))
Vector::NullInteger(c.values(&row_id_vec))
}
}
}
@ -560,13 +795,13 @@ impl Column {
match self {
Column::String(c) => {
if row_ids.is_empty() {
return Vector::Integer(vec![]);
return Vector::Unsigned32(vec![]);
}
let now = std::time::Instant::now();
let v = c.encoded_values(&row_ids_vec);
log::debug!("time getting encoded values {:?}", now.elapsed());
Vector::EncodedString(v)
Vector::Unsigned32(v)
}
Column::Float(c) => {
if row_ids.is_empty() {
@ -591,7 +826,7 @@ impl Column {
match self {
Column::String(c) => {
if row_ids.is_empty() {
return Vector::Integer(vec![]);
return Vector::Unsigned32(vec![]);
}
let now = std::time::Instant::now();
@ -599,7 +834,7 @@ impl Column {
log::debug!("time getting encoded values {:?}", now.elapsed());
log::debug!("dictionary {:?}", c.data.dictionary());
Vector::EncodedString(v)
Vector::Unsigned32(v)
}
Column::Float(c) => {
if row_ids.is_empty() {
@ -627,7 +862,7 @@ impl Column {
log::debug!("time getting all encoded values {:?}", now.elapsed());
log::debug!("dictionary {:?}", c.data.dictionary());
Vector::EncodedString(v)
Vector::Unsigned32(v)
}
Column::Float(c) => Vector::Float(c.all_encoded_values()),
Column::Integer(c) => Vector::Integer(c.all_encoded_values()),
@ -671,9 +906,9 @@ impl Column {
row_ids_vec[0]
);
match self {
Column::String(c) => Vector::String(c.values(&row_ids_vec)),
Column::Float(c) => Vector::Float(c.values(&row_ids_vec)),
Column::Integer(c) => Vector::Integer(c.values(&row_ids_vec)),
Column::String(c) => Vector::NullString(c.values(&row_ids_vec)),
Column::Float(c) => Vector::NullFloat(c.values(&row_ids_vec)),
Column::Integer(c) => Vector::NullInteger(c.values(&row_ids_vec)),
}
}
@ -839,7 +1074,7 @@ impl Column {
Column::String(_) => unimplemented!("not implemented"),
Column::Float(c) => match agg_type {
AggregateType::Count => {
Aggregate::Count(c.count_by_id_range(from_row_id, to_row_id) as u64)
Aggregate::Count(c.count_by_id_range(from_row_id, to_row_id))
}
AggregateType::Sum => match c.sum_by_id_range(from_row_id, to_row_id) {
Some(sum) => Aggregate::Sum(Some(Scalar::Float(sum))),
@ -1071,11 +1306,11 @@ impl String {
self.data.values(row_ids)
}
pub fn encoded_values(&self, row_ids: &[usize]) -> Vec<i64> {
pub fn encoded_values(&self, row_ids: &[usize]) -> Vec<u32> {
self.data.encoded_values(row_ids)
}
pub fn all_encoded_values(&self) -> Vec<i64> {
pub fn all_encoded_values(&self) -> Vec<u32> {
self.data.all_encoded_values()
}
@ -1288,7 +1523,7 @@ where
impl<T> NumericColumn<T>
where
T: Clone + std::cmp::PartialOrd + std::fmt::Debug,
T: Copy + Clone + std::cmp::PartialOrd + std::fmt::Debug,
{
pub fn column_range(&self) -> &Option<(T, T)> {
self.meta.range()
@ -1306,11 +1541,11 @@ where
self.data.values(row_ids)
}
pub fn encoded_values(&self, row_ids: &[usize]) -> Vec<Option<T>> {
pub fn encoded_values(&self, row_ids: &[usize]) -> Vec<T> {
self.data.encoded_values(row_ids)
}
pub fn all_encoded_values(&self) -> Vec<Option<T>> {
pub fn all_encoded_values(&self) -> Vec<T> {
self.data.all_encoded_values()
}
@ -1334,7 +1569,7 @@ where
self.data.sum_by_id_range(from_row_id, to_row_id)
}
pub fn count_by_id_range(&self, from_row_id: usize, to_row_id: usize) -> usize {
pub fn count_by_id_range(&self, from_row_id: usize, to_row_id: usize) -> u64 {
self.data.count_by_id_range(from_row_id, to_row_id)
}
}
@ -1372,10 +1607,10 @@ pub mod metadata {
}
fn update_range(&mut self, v: T) {
match self.range {
match &mut self.range {
Some(range) => {
if v < range.0 {
range.0 = v;
range.0 = v.clone();
}
if v > range.1 {
@ -1383,7 +1618,7 @@ pub mod metadata {
}
}
None => {
self.range = Some((v, v));
self.range = Some((v.clone(), v));
}
}
}
@ -1409,7 +1644,7 @@ pub mod metadata {
}
pub fn maybe_contains_value(&self, v: T) -> bool {
match self.range {
match &self.range {
Some(range) => range.0 <= v && v <= range.1,
None => false,
}

View File

@ -89,29 +89,11 @@ where
///
/// encoded_values should not be called on nullable columns.
fn encoded_values(&self, row_ids: &[usize]) -> Vec<T::Native> {
// assertion here during development to check this isn't called on
// encodings that can have null values.
assert_eq!(self.arr.null_count(), 0);
let mut out = Vec::with_capacity(row_ids.len());
for &row_id in row_ids {
out.push(self.arr.value(row_id));
}
assert_eq!(out.len(), row_ids.len());
out
panic!("encoded_values not implemented yet");
}
fn all_encoded_values(&self) -> Vec<T::Native> {
// assertion here during development to check this isn't called on
// encodings that can have null values.
assert_eq!(self.arr.null_count(), 0);
let mut out = Vec::with_capacity(self.arr.len());
for i in 0..self.arr.len() {
out.push(self.arr.value(i));
}
assert_eq!(out.len(), self.arr.len());
out
panic!("all_encoded_values not implemented yet");
}
// TODO(edd): problem here is returning a slice because we need to own the
@ -304,7 +286,7 @@ where
/// Return the raw encoded values for the provided logical row ids. For Plain
/// encoding this is just the decoded values.
fn encoded_values(&self, row_ids: &[usize]) -> Vec<Self::Item> {
fn encoded_values(&self, row_ids: &[usize]) -> Vec<T> {
let mut out = Vec::with_capacity(row_ids.len());
for chunks in row_ids.chunks_exact(4) {
out.push(self.values[chunks[3]]);
@ -324,7 +306,7 @@ where
/// Return all encoded values. For this encoding this is just the decoded
/// values
fn all_encoded_values(&self) -> Vec<Self::Item> {
fn all_encoded_values(&self) -> Vec<T> {
self.values.clone() // TODO(edd):perf probably can return reference to vec.
}
@ -723,8 +705,8 @@ impl DictionaryRLE {
///
/// TODO(edd): return type is wrong but I'm making it fit
///
pub fn encoded_values(&self, row_ids: &[usize]) -> Vec<i64> {
let mut out: Vec<i64> = Vec::with_capacity(row_ids.len());
pub fn encoded_values(&self, row_ids: &[usize]) -> Vec<u32> {
let mut out = Vec::with_capacity(row_ids.len());
let mut curr_logical_row_id = 0;
@ -746,7 +728,7 @@ impl DictionaryRLE {
}
// this entry covers the row_id we want.
out.push(curr_entry_id as i64);
out.push(curr_entry_id as u32);
curr_logical_row_id += 1;
curr_entry_rl -= 1;
}
@ -757,11 +739,11 @@ impl DictionaryRLE {
// all_encoded_values materialises a vector of all encoded values for the
// column.
pub fn all_encoded_values(&self) -> Vec<i64> {
let mut out: Vec<i64> = Vec::with_capacity(self.total as usize);
pub fn all_encoded_values(&self) -> Vec<u32> {
let mut out = Vec::with_capacity(self.total as usize);
for (idx, rl) in &self.run_lengths {
out.extend(iter::repeat(*idx as i64).take(*rl as usize));
out.extend(iter::repeat(*idx as u32).take(*rl as usize));
}
out
}
@ -863,8 +845,8 @@ mod test {
arr: super::PrimitiveArray::from(vec![Some(2.3), Some(44.56), None]),
};
let encoded = col.all_encoded_values();
assert_eq!(encoded, vec![Some(2.3), Some(44.56), None]);
// let encoded = col.all();
// assert_eq!(encoded, vec![Some(2.3), Some(44.56), None]);
let sum = col.sum_by_id_range(0, 1);
assert_eq!(sum, Some(46.86));

View File

@ -78,7 +78,8 @@ impl Segment {
// TODO(edd) yuk
if name == "time" {
if let column::Column::Integer(ts) = &c {
self.meta.time_range = ts.column_range();
// Right now assumption is ts column has some non-null values
self.meta.time_range = ts.column_range().unwrap();
} else {
panic!("incorrect column type for time");
}
@ -316,12 +317,10 @@ impl Segment {
// filtering stage we will just emit None.
let mut group_itrs = group_column_encoded_values
.iter()
.map(|vector| {
if let column::Vector::Integer(v) = vector {
v.iter()
} else {
panic!("don't support grouping on non-encoded values");
}
.map(|vector| match vector {
column::Vector::Unsigned32(_) => column::VectorIterator::new(vector), // encoded tag columns
column::Vector::Integer(_) => column::VectorIterator::new(vector), // encoded (but actually just raw) timestamp column
_ => panic!("don't support grouping on non-encoded values or timestamps"),
})
.collect::<Vec<_>>();
@ -331,7 +330,10 @@ impl Segment {
let mut aggregate_itrs = aggregate_column_decoded_values
.iter()
.map(|(col_name, values)| match values {
Some(values) => (col_name.as_str(), Some(column::VectorIterator::new(values))),
Some(values) => (
col_name.as_str(),
Some(column::NullVectorIterator::new(values)),
),
None => (col_name.as_str(), None),
})
.collect::<Vec<_>>();
@ -339,7 +341,7 @@ impl Segment {
// hashMap is about 20% faster than BTreeMap in this case
let mut hash_table: BTreeMap<
Vec<i64>,
Vec<(&'a String, &'a AggregateType, Option<column::Aggregate<'_>>)>,
Vec<(&'a String, &'a AggregateType, column::Aggregate<'_>)>,
> = BTreeMap::new();
let mut aggregate_row: Vec<(&str, Option<column::Scalar<'_>>)> =
@ -355,29 +357,50 @@ impl Segment {
group_itrs.iter_mut().enumerate().for_each(|(i, itr)| {
if i == group_itrs_len - 1 && window > 0 {
// time column - apply window function
group_key[i] = itr.next().unwrap() / window * window;
if let Some(column::Value::Scalar(column::Scalar::Integer(v))) = itr.next() {
group_key[i] = v / window * window;
} else {
unreachable!(
"something broken with grouping! Either processed None or wrong type"
);
}
} else if let Some(column::Value::Scalar(column::Scalar::Unsigned32(v))) =
itr.next()
{
group_key[i] = v as i64
} else {
group_key[i] = *itr.next().unwrap();
unreachable!(
"something broken with grouping! Either processed None or wrong type"
);
}
});
// re-use aggregate_row vector.
for (i, &mut (col_name, ref mut itr)) in aggregate_itrs.iter_mut().enumerate() {
match itr {
Some(itr) => aggregate_row[i] = (col_name, itr.next()),
Some(itr) => {
// This is clunky. We don't need to check for the sentinel None value
// to indicate the end of the iterator because we use the guard in
// the while loop to do so.
aggregate_row[i] = (col_name, itr.next().unwrap_or(None));
}
None => aggregate_row[i] = (col_name, None),
}
}
// This is cheaper than allocating a key and using the entry API
if !hash_table.contains_key(&group_key) {
let mut agg_results: Vec<(
&'a String,
&'a AggregateType,
Option<column::Aggregate<'_>>,
)> = Vec::with_capacity(aggregates.len());
let mut agg_results: Vec<(&'a String, &'a AggregateType, column::Aggregate<'_>)> =
Vec::with_capacity(aggregates.len());
for (col_name, agg_type) in aggregates {
agg_results.push((col_name, agg_type, None)); // switch out Aggregate for Option<column::Aggregate>
agg_results.push((
col_name,
agg_type,
match agg_type {
AggregateType::Count => column::Aggregate::Count(0),
AggregateType::Sum => column::Aggregate::Sum(None),
},
));
}
hash_table.insert(group_key.clone(), agg_results);
}
@ -395,28 +418,39 @@ impl Segment {
continue;
}
// TODO(edd): remove unwrap - it should work because we are
// tracking iteration count in loop.
let row_value = row_value.as_ref().unwrap();
match cum_agg_value {
Some(agg) => match agg {
column::Aggregate::Count(cum_count) => {
*cum_count += 1;
}
column::Aggregate::Sum(cum_sum) => {
*cum_sum += row_value;
}
},
None => {
*cum_agg_value = match agg_type {
AggregateType::Count => Some(column::Aggregate::Count(0)),
AggregateType::Sum => {
Some(column::Aggregate::Sum(row_value.clone()))
column::Aggregate::Count(x) => {
*x += 1;
}
column::Aggregate::Sum(v) => {
if let Some(row_value) = row_value {
match v {
Some(x) => {
*x += row_value;
}
None => *v = Some(row_value.clone()),
}
}
}
}
// match cum_agg_value {
// Some(agg) => match agg {
// column::Aggregate::Count(_) => {
// *cum_agg_value = Some(agg + column::Aggregate::Count(Some(1)));
// }
// column::Aggregate::Sum(cum_sum) => {
// *cum_sum += row_value;
// }
// },
// None => {
// *cum_agg_value = match agg_type {
// AggregateType::Count => Some(column::Aggregate::Count(Some(0))),
// AggregateType::Sum => {
// Some(column::Aggregate::Sum(row_value.clone()))
// }
// }
// }
// }
}
}
processed_rows += 1;
@ -757,10 +791,6 @@ impl Segment {
}
// Returns the count aggregate for a given column name.
//
// Since we guarantee to provide row ids for the segment, and all columns
// have the same number of logical rows, the count is just the number of
// requested logical rows.
pub fn count_column(&self, name: &str, row_ids: &mut croaring::Bitmap) -> Option<u64> {
if self.column(name).is_some() {
return Some(row_ids.cardinality() as u64);
@ -899,8 +929,8 @@ impl Segment {
aggs.push((
(col_name.to_string(), agg.clone()),
column::Aggregate::Sum(
self.sum_column(col_name, &mut filtered_row_ids).unwrap(),
), // assuming no non-null group keys
self.sum_column(col_name, &mut filtered_row_ids),
),
));
}
AggregateType::Count => {
@ -908,7 +938,7 @@ impl Segment {
(col_name.to_string(), agg.clone()),
column::Aggregate::Count(
self.count_column(col_name, &mut filtered_row_ids).unwrap(),
), // assuming no non-null group keys
),
));
}
}
@ -1392,7 +1422,7 @@ impl<'a> Segments<'a> {
// first find the logical row id of the minimum timestamp value
if let Column::Integer(ts_col) = &segment.columns[segment.time_column_idx] {
// TODO(edd): clean up unwrap
let min_ts = ts_col.column_range().0;
let min_ts = ts_col.column_range().unwrap().0;
assert_eq!(min_ts, segment.meta.time_range.0);
let min_ts_id = ts_col.row_id_eq_value(min_ts).unwrap();
@ -1424,7 +1454,7 @@ impl<'a> Segments<'a> {
// first find the logical row id of the minimum timestamp value
if let Column::Integer(ts_col) = &segment.columns[segment.time_column_idx] {
// TODO(edd): clean up unwrap
let max_ts = ts_col.column_range().1;
let max_ts = ts_col.column_range().unwrap().1;
assert_eq!(max_ts, segment.meta.time_range.1);
let max_ts_id = ts_col.row_id_eq_value(max_ts).unwrap();

View File

@ -39,7 +39,7 @@ pub enum Error {
/// comparison scan performed on them to ensure they're not already sorted.
const SORTED_CHECK_SIZE: usize = 1000;
/// Sort a slice of `Packers` based on the provided column indexes.
/// Sort a slice of `Vector` based on the provided column indexes.
///
/// All chosen columns will be sorted in ascending order; the sort is *not*
/// stable.
@ -77,9 +77,6 @@ pub fn sort(vectors: &mut [column::Vector<'_>], sort_by: &[usize]) -> Result<(),
log::debug!("columns already sorted");
return Ok(());
}
// if vectors_sorted_asc(vectors, n, sort_by) {
// return Ok(());
// }
}
let now = std::time::Instant::now();
quicksort_by(vectors, 0..n - 1, sort_by);
@ -136,7 +133,7 @@ fn partition(vectors: &mut [column::Vector<'_>], range: &Range<usize>, sort_by:
fn cmp(vectors: &[column::Vector<'_>], a: usize, b: usize, sort_by: &[usize]) -> Ordering {
for &idx in sort_by {
match &vectors[idx] {
column::Vector::String(p) => {
column::Vector::NullString(p) => {
let cmp = p.get(a).cmp(&p.get(b));
if cmp != Ordering::Equal {
return cmp;
@ -150,7 +147,7 @@ fn cmp(vectors: &[column::Vector<'_>], a: usize, b: usize, sort_by: &[usize]) ->
}
// if cmp equal then try next vector.
}
_ => continue, // don't compare on non-string / timestamp cols
_ => unimplemented!("todo!"), // don't compare on non-string / timestamp cols
}
}
Ordering::Equal
@ -161,7 +158,7 @@ fn vectors_sorted_asc(vectors: &[column::Vector<'_>], len: usize, sort_by: &[usi
'row_wise: for i in 1..len {
for &idx in sort_by {
match &vectors[idx] {
column::Vector::String(vec) => {
column::Vector::NullString(vec) => {
if vec[i - 1] < vec[i] {
continue 'row_wise;
} else if vec[i - 1] == vec[i] {
@ -183,7 +180,7 @@ fn vectors_sorted_asc(vectors: &[column::Vector<'_>], len: usize, sort_by: &[usi
return false;
}
}
_ => continue, // don't compare on non-string / timestamp cols
_ => unimplemented!("todo!"), // don't compare on non-string / timestamp cols
}
}
}