feat: group by sorting

parent 231f429a56
commit a5a8667a42

@@ -769,6 +769,8 @@ dependencies = [
 "croaring",
+"crossbeam",
 "delorean_table",
+"env_logger",
 "log",
 "snafu",
]

@@ -1596,9 +1598,9 @@ dependencies = [

[[package]]
name = "log"
-version = "0.4.8"
+version = "0.4.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "14b6052be84e6b71ab17edffc2eeabf5c2c3ae1fdb464aae35ac50c67a44e1f7"
+checksum = "4fabed175da42fed1fa0746b0ea71f412aa9d35e76e95e59b192c64b9dc2bf8b"
dependencies = [
 "cfg-if",
]

@@ -7,9 +7,6 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

-[profile.release]
-debug = true
-
[dependencies]
delorean_table = { path = "../delorean_table" }
arrow = { git = "https://github.com/apache/arrow.git", rev="aa6889a74c57d6faea0d27ea8013d9b0c7ef809a", version = "2.0.0-SNAPSHOT" }

@@ -17,6 +14,8 @@ snafu = "0.6.8"
croaring = "0.4.5"
+crossbeam = "0.7.3"
chrono = "0.4"
log = "0.4.11"
+env_logger = "0.7.1"

[dev-dependencies]

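This change swaps the ad-hoc `println!` timing output for the `log` facade with `env_logger` as the backend, so verbosity becomes a runtime decision instead of a recompile. A minimal sketch of the wiring (the `RUST_LOG` value is illustrative; `env_logger::init()` is the call added to main.rs below):

    // Run with e.g. `RUST_LOG=debug` to see the timing output.
    fn main() {
        env_logger::init(); // reads RUST_LOG from the environment
        log::debug!("emitted only when the debug level is enabled");
        log::info!("per-segment timings in this change are logged at info");
    }
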
@@ -24,6 +24,8 @@ pub enum Error {
}

fn main() {
+    env_logger::init();
+
    let r = File::open(Path::new("/Users/edd/work/InfluxData/delorean_misc/in-memory-sort/env_role_path_time/http_api_requests_total.arrow")).unwrap();
    let reader = ipc::reader::StreamReader::try_new(r).unwrap();

@@ -36,7 +38,7 @@ fn main() {
        store.size(),
    );

-    // time_group_by_agg(&store);
+    time_group_by_agg(&store);

    // time_column_min_time(&store);
    // time_column_max_time(&store);

@@ -107,20 +109,20 @@ fn main() {
    //     println!("ROWS ({}) {:?}", v, v.len());
    // }

-    loop {
-        let now = std::time::Instant::now();
-        let segments = store.segments();
-        let groups = segments.read_group_eq(
-            (0, 1890040790000000),
-            &[],
-            vec!["env".to_string(), "role".to_string()],
-            vec![
-                ("counter".to_string(), Aggregate::Sum),
-                // ("counter".to_string(), Aggregate::Count),
-            ],
-        );
-        println!("{:?} {:?}", groups, now.elapsed());
-    }
+    // loop {
+    //     let now = std::time::Instant::now();
+    //     let segments = store.segments();
+    //     let groups = segments.read_group_eq(
+    //         (0, 1590044410000000),
+    //         &[],
+    //         vec!["env".to_string(), "role".to_string()],
+    //         vec![
+    //             ("counter".to_string(), Aggregate::Sum),
+    //             // ("counter".to_string(), Aggregate::Count),
+    //         ],
+    //     );
+    //     println!("{:?} {:?}", groups, now.elapsed());
+    // }

    // loop {
    //     let mut total_count = 0.0;

@@ -159,12 +161,12 @@ fn build_store(
    mut reader: arrow::ipc::reader::StreamReader<File>,
    store: &mut Store,
) -> Result<(), Error> {
-    let mut i = 0;
+    // let mut i = 0;
    while let Some(rb) = reader.next_batch().unwrap() {
-        if i < 363 {
-            i += 1;
-            continue;
-        }
+        // if i < 363 {
+        //     i += 1;
+        //     continue;
+        // }
        let segment = convert_record_batch(rb)?;
        store.add_segment(segment);
    }

@@ -402,7 +404,7 @@ fn time_row_by_preds(store: &Store) {
}

fn time_group_by_agg(store: &Store) {
-    let repeat = 100;
+    let repeat = 10;
    let mut total_time: std::time::Duration = std::time::Duration::new(0, 0);
    let mut total_max = 0;
    let segments = store.segments();

@@ -411,8 +413,8 @@ fn time_group_by_agg(store: &Store) {

    let groups = segments.read_group_eq(
        (0, 1590044410000000),
-        &[("method", Some(&column::Scalar::String("GET")))],
-        vec!["env".to_string(), "status".to_string()],
+        &[],
+        vec!["status".to_string(), "method".to_string()],
        vec![
            ("counter".to_string(), Aggregate::Sum),
            // ("counter".to_string(), Aggregate::Count),

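For orientation, the `read_group_eq` calls above all take the same argument shape. A hedged annotation, with the roles inferred from how segment.rs consumes them:

    let groups = segments.read_group_eq(
        (0, 1590044410000000),      // time range as (min, max) timestamps
        &[],                        // equality predicates, e.g. ("method", Some(&column::Scalar::String("GET")))
        vec!["status".to_string(), "method".to_string()], // columns to group by
        vec![("counter".to_string(), Aggregate::Sum)],    // (column, aggregate) pairs to compute
    );
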
@@ -9,6 +9,44 @@ pub enum Scalar<'a> {
    Integer(i64),
}

+impl<'a> Scalar<'a> {
+    pub fn reset(&mut self) {
+        match self {
+            Scalar::String(s) => {
+                panic!("not supported");
+            }
+            Scalar::Float(v) => {
+                *v = 0.0;
+            }
+            Scalar::Integer(v) => {
+                *v = 0;
+            }
+        }
+    }
+
+    pub fn add(&mut self, other: Scalar<'a>) {
+        match self {
+            Self::Float(v) => {
+                if let Self::Float(other) = other {
+                    *v += other;
+                } else {
+                    panic!("invalid");
+                };
+            }
+            Self::Integer(v) => {
+                if let Self::Integer(other) = other {
+                    *v += other;
+                } else {
+                    panic!("invalid");
+                };
+            }
+            Self::String(_) => {
+                unreachable!("not possible to add strings");
+            }
+        }
+    }
+}
+
impl<'a> std::ops::Add<&Scalar<'a>> for Scalar<'a> {
    type Output = Scalar<'a>;

@@ -65,6 +103,30 @@ pub enum Aggregate<'a> {
    Sum(Scalar<'a>),
}

+impl<'a> Aggregate<'a> {
+    pub fn update_with(&mut self, other: Scalar<'a>) {
+        match self {
+            Self::Count(v) => {
+                *v = *v + 1;
+            }
+            Self::Sum(v) => {
+                v.add(other);
+            }
+        }
+    }
+}
+
+impl<'a> std::ops::Add<Scalar<'a>> for Aggregate<'a> {
+    type Output = Aggregate<'a>;
+
+    fn add(self, _rhs: Scalar<'a>) -> Self::Output {
+        match self {
+            Self::Count(c) => Self::Count(c + 1),
+            Self::Sum(s) => Self::Sum(s + &_rhs),
+        }
+    }
+}
+
impl<'a> std::ops::Add<&Aggregate<'a>> for Aggregate<'a> {
    type Output = Aggregate<'a>;

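`Scalar::add` and `Aggregate::update_with` give the group-by loop a way to fold one value at a time into a running aggregate. A minimal sketch of the intended use, assuming only the types shown above:

    // Hypothetical: accumulate a sum across three integer values.
    let mut agg = Aggregate::Sum(Scalar::Integer(0));
    for v in &[1_i64, 2, 3] {
        agg.update_with(Scalar::Integer(*v)); // fold in one row's value
    }
    // agg is now Aggregate::Sum(Scalar::Integer(6))

Note that `update_with` panics (via `Scalar::add`) if the scalar variants are mismatched, so callers must feed values from a single column type.
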
@@ -170,17 +232,31 @@ impl<'a> Vector<'a> {
            }
        }
    }

+    pub fn swap(&mut self, a: usize, b: usize) {
+        match self {
+            Self::String(v) => {
+                v.swap(a, b);
+            }
+            Self::Float(v) => {
+                v.swap(a, b);
+            }
+            Self::Integer(v) => {
+                v.swap(a, b);
+            }
+        }
+    }
}

/// VectorIterator allows a `Vector` to be iterated. Until vectors are drained
/// Scalar values are emitted.
pub struct VectorIterator<'a> {
-    v: Vector<'a>,
+    v: &'a Vector<'a>,
    next_i: usize,
}

impl<'a> VectorIterator<'a> {
-    pub fn new(v: Vector<'a>) -> Self {
+    pub fn new(v: &'a Vector<'a>) -> Self {
        Self { v, next_i: 0 }
    }
}

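`VectorIterator` now borrows its `Vector` instead of taking ownership, so the same materialised column can back several readers. A hedged sketch of the difference, assuming the iterator yields `Scalar` values as the `itr.next().unwrap()` calls in segment.rs suggest:

    let values = Vector::Integer(vec![10, 20, 30]);
    let mut itr = VectorIterator::new(&values); // borrows, does not consume
    let first = itr.next();                     // yields Scalar values until drained
    let still_usable = values.len();            // the vector remains available
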
@@ -290,7 +366,7 @@ impl Column {

                let now = std::time::Instant::now();
                let v = c.values(row_ids);
-                println!("time getting decoded values for float {:?}", now.elapsed());
+                log::debug!("time getting decoded values for float {:?}", now.elapsed());

                Vector::Float(v)
            }

@@ -301,7 +377,7 @@ impl Column {

                let now = std::time::Instant::now();
                let v = c.values(row_ids);
-                println!("time getting decoded values for int {:?}", now.elapsed());
+                log::debug!("time getting decoded values for int {:?}", now.elapsed());
                Vector::Integer(v)
            }
        }

@@ -359,7 +435,7 @@ impl Column {
            .iter()
            .map(|v| *v as usize)
            .collect::<Vec<_>>();
-        println!("time unpacking bitmap {:?}", now.elapsed());
+        log::debug!("time unpacking bitmap {:?}", now.elapsed());

        match self {
            Column::String(c) => {

@@ -369,7 +445,7 @@ impl Column {

                let now = std::time::Instant::now();
                let v = c.encoded_values(&row_ids_vec);
-                println!("time getting encoded values {:?}", now.elapsed());
+                log::debug!("time getting encoded values {:?}", now.elapsed());
                Vector::Integer(v)
            }
            Column::Float(c) => {

@@ -400,7 +476,9 @@ impl Column {

                let now = std::time::Instant::now();
                let v = c.encoded_values(&row_ids);
-                println!("time getting encoded values {:?}", now.elapsed());
+                log::debug!("time getting encoded values {:?}", now.elapsed());
+
+                log::debug!("dictionary {:?}", c.data.dictionary());
                Vector::Integer(v)
            }
            Column::Float(c) => {

@@ -448,7 +526,7 @@ impl Column {
            .iter()
            .map(|v| *v as usize)
            .collect::<Vec<_>>();
-        println!("time unpacking bitmap {:?}", now.elapsed());
+        log::debug!("time unpacking bitmap {:?}", now.elapsed());

        assert!(
            row_ids_vec.len() == 1 || row_ids_vec[row_ids_vec.len() - 1] > row_ids_vec[0],

@@ -1060,9 +1138,11 @@ pub mod metadata {

        pub fn maybe_contains_value(&self, v: f64) -> bool {
            let res = self.range.0 <= v && v <= self.range.1;
-            println!(
+            log::debug!(
                "column with ({:?}) maybe contain {:?} -- {:?}",
-                self.range, v, res
+                self.range,
+                v,
+                res
            );
            res
        }

@@ -297,7 +297,6 @@ impl DictionaryRLE {

-            _self.run_lengths.push((next_idx, 0)); // could this cause a bug?ta
        }

        _self
    }

@@ -1,6 +1,7 @@
pub mod column;
pub mod encoding;
pub mod segment;
+pub mod sorter;

use segment::{Segment, Segments};

@@ -238,7 +238,7 @@ impl Segment {
        // are aggregating on. For columns that have no matching rows from the
        // filtering stage we will just emit None.
        let mut aggregate_itrs = aggregate_column_decoded_values
-            .into_iter()
+            .iter()
            .map(|(col_name, values)| match values {
                Some(values) => (col_name.as_str(), Some(column::VectorIterator::new(values))),
                None => (col_name.as_str(), None),

@@ -329,7 +329,7 @@ impl Segment {
            }
            processed_rows += 1;
        }
-        // println!("{:?}", hash_table.len());
+        log::debug!("{:?}", hash_table);
        BTreeMap::new()
    }

@@ -339,7 +339,7 @@ impl Segment {
        predicates: &[(&str, Option<&column::Scalar>)],
        group_columns: &[String],
        aggregates: &[(String, Aggregate)],
-    ) -> BTreeMap<Vec<String>, Vec<(String, Option<column::Aggregate>)>> {
+    ) -> BTreeMap<Vec<&i64>, Vec<(String, column::Aggregate)>> {
        // filter on predicates and time
        let filtered_row_ids: croaring::Bitmap;
        if let Some(row_ids) = self.filter_by_predicates_eq(time_range, predicates) {

@@ -361,24 +361,17 @@ impl Segment {
        let mut group_column_encoded_values = Vec::with_capacity(group_columns.len());
        for group_column in group_columns {
            if let Some(column) = self.column(&group_column) {
-                let encoded_values: delorean_table::Packer<i64>;
-                if let column::Vector::Integer(vector) =
-                    column.encoded_values(&filtered_row_ids_vec)
-                {
-                    encoded_values = delorean_table::Packer::from(vector);
-                } else {
-                    unimplemented!("currently you can only group on encoded string columns");
-                }
-
+                let encoded_values = column.encoded_values(&filtered_row_ids_vec);
                assert_eq!(
                    filtered_row_ids.cardinality() as usize,
-                    encoded_values.num_rows()
+                    encoded_values.len()
                );
                group_column_encoded_values.push(Some(encoded_values));
            } else {
                group_column_encoded_values.push(None);
            }
        }
+        let group_col_sort_order = &(0..group_columns.len()).collect::<Vec<_>>();
        // println!("grouped columns {:?}", group_column_encoded_values);

        // TODO(edd): we could do this with an iterator I expect.

@@ -388,15 +381,10 @@ impl Segment {
        let mut aggregate_column_decoded_values = Vec::with_capacity(aggregates.len());
        for (column_name, _) in aggregates {
            if let Some(column) = self.column(&column_name) {
-                let decoded_values = match column.values(&filtered_row_ids_vec) {
-                    column::Vector::String(_) => unreachable!("not supported"),
-                    column::Vector::Float(v) => delorean_table::Packers::from(v),
-                    column::Vector::Integer(v) => delorean_table::Packers::from(v),
-                };
-
+                let decoded_values = column.values(&filtered_row_ids_vec);
                assert_eq!(
                    filtered_row_ids.cardinality() as usize,
-                    decoded_values.num_rows()
+                    decoded_values.len()
                );
                aggregate_column_decoded_values.push((column_name, Some(decoded_values)));
            } else {

@@ -410,7 +398,7 @@ impl Segment {

        for gc in group_column_encoded_values {
            if let Some(p) = gc {
-                all_columns.push(delorean_table::Packers::Integer(p));
+                all_columns.push(p);
            } else {
                panic!("need to handle no results for filtering/grouping...");
            }

@@ -426,142 +414,106 @@ impl Segment {

        // now sort on the first grouping columns. Right now the order doesn't matter...
        let now = std::time::Instant::now();
        delorean_table::sorter::sort(&mut all_columns, &[0, 1]).unwrap();
        println!("time checking sort {:?}", now.elapsed());
        super::sorter::sort(&mut all_columns, group_col_sort_order).unwrap();
        log::debug!("time checking sort {:?}", now.elapsed());

        let mut group_itrs = all_columns
            .iter()
            .take(group_columns.len()) // only use grouping columns
            .map(|x| match x {
                delorean_table::Packers::Integer(p) => p.iter(),
                _ => {
                    panic!("not here {:?} ", x);
            .map(|vector| {
                if let column::Vector::Integer(v) = vector {
                    v.iter()
                } else {
                    panic!("don't support grouping on non-encoded values");
                }
            })
            .collect::<Vec<_>>();

        let mut aggregate_itrs = all_columns
            .iter()
            .skip(group_columns.len()) // only use grouping columns
            .map(|v| column::VectorIterator::new(v))
            .collect::<Vec<_>>();

        // this tracks the last seen group key row. When it changes we can emit
        // the grouped aggregates.
        let mut last_group_row = group_itrs
            .iter_mut()
            .map(|itr| itr.next().unwrap())
            .collect::<Vec<_>>();

        // now we have all the matching rows for each grouping column and each aggregation
        // column. Materialised values for grouping are in encoded form.
        //
        // Next we sort all columns according to the group key.
        // let mut all_columns = vec![];
        // for
        // delorean_table::packers::Packers
        // // First we will build a collection of iterators over the columns we
        // // are grouping on. For columns that have no matching rows from the
        // // filtering stage we will just emit None.
        let mut curr_group_row = last_group_row.clone();

        // // Next we will build a collection of iterators over the columns we
        // // are aggregating on. For columns that have no matching rows from the
        // // filtering stage we will just emit None.
        // let mut aggregate_itrs = aggregate_column_decoded_values
        //     .into_iter()
        //     .map(|(col_name, values)| match values {
        //         Some(values) => (col_name.as_str(), Some(column::VectorIterator::new(values))),
        //         None => (col_name.as_str(), None),
        //     })
        //     .collect::<Vec<_>>();

        // let mut hash_table: HashMap<
        //     Vec<Option<&i64>>,
        //     Vec<(&String, &Aggregate, Option<column::Aggregate>)>,
        // > = HashMap::with_capacity(30000);

        let mut aggregate_row: Vec<(&str, Option<column::Scalar>)> = agg

        let mut processed_rows = 0;
        let last_group_row: Vec<Option<&i64>> = group_itrs
        // this tracks the last row for each column we are aggregating.
        let last_agg_row: Vec<column::Scalar> = aggregate_itrs
            .iter_mut()
            .map(|itr| itr.next().unwrap())
            .collect();

        while processed_rows < *total_rows {
        //     let group_row: Vec<Option<&i64>> = group_itrs
        //         .iter_mut()
        //         .map(|x| match x {
        //             Some(itr) => itr.next().unwrap(),
        //             None => None,
        //         })
        //         .collect();
        // this keeps the current cumulative aggregates for the columns we
        // are aggregating.
        let mut cum_aggregates: Vec<(String, column::Aggregate)> = aggregates
            .iter()
            .zip(last_agg_row.iter())
            .map(|((col_name, agg_type), curr_agg)| {
                let agg = match agg_type {
                    Aggregate::Count => column::Aggregate::Count(1),
                    Aggregate::Sum => column::Aggregate::Sum(curr_agg.clone()),
                };
                (col_name.clone(), agg)
            })
            .collect();

        // check if group key has changed
        for (&curr_v, itr) in last_group_row.iter().zip(&mut group_itrs) {
            if curr_v != itr.next().unwrap() {
                // group key changed
        let mut results = BTreeMap::new();
        let mut processed_rows = 1;
        while processed_rows < *total_rows {
            // update next group key.
            let mut group_key_changed = false;
            for (curr_v, itr) in curr_group_row.iter_mut().zip(group_itrs.iter_mut()) {
                let next_v = itr.next().unwrap();
                if curr_v != &next_v {
                    group_key_changed = true;
                }
                *curr_v = next_v;
            }

            // group key changed - emit group row and aggregates.
            if group_key_changed {
                let key = last_group_row.clone();
                results.insert(key, cum_aggregates.clone());

                // update group key
                last_group_row = curr_group_row.clone();

                // reset cumulative aggregates
                for (_, agg) in cum_aggregates.iter_mut() {
                    match agg {
                        column::Aggregate::Count(c) => {
                            *c = 0;
                        }
                        column::Aggregate::Sum(s) => s.reset(),
                    }
                }
            }

            // group key is the same - update aggregates
            // update aggregates
            for bind in cum_aggregates.iter_mut().zip(&mut aggregate_itrs) {
                let (_, curr_agg) = bind.0;
                let next_value = bind.1.next().unwrap();
                curr_agg.update_with(next_value);
            }

            // // let aggregate_row: Vec<(&str, Option<column::Scalar>)> = aggregate_itrs
            // //     .iter_mut()
            // //     .map(|&mut (col_name, ref mut itr)| match itr {
            // //         Some(itr) => (col_name, itr.next()),
            // //         None => (col_name, None),
            // //     })
            // //     .collect();

            // // re-use aggregate_row vector.
            // for (i, &mut (col_name, ref mut itr)) in aggregate_itrs.iter_mut().enumerate() {
            //     match itr {
            //         Some(itr) => aggregate_row[i] = (col_name, itr.next()),
            //         None => aggregate_row[i] = (col_name, None),
            //     }
            // }

            // // Lookup the group key in the hash map - if it's empty then insert
            // // a place-holder for each aggregate being executed.
            // let group_key_entry = hash_table.entry(group_row).or_insert_with(|| {
            //     // TODO COULD BE MAP/COLLECT
            //     let mut agg_results: Vec<(&String, &Aggregate, Option<column::Aggregate>)> =
            //         Vec::with_capacity(aggregates.len());
            //     for (col_name, agg_type) in aggregates {
            //         agg_results.push((col_name, agg_type, None)); // switch out Aggregate for Option<column::Aggregate>
            //     }
            //     agg_results
            // });

            // // Update aggregates - we process each row value and for each one
            // // check which aggregates apply to it.
            // //
            // // TODO(edd): this is probably a bit of a perf suck.
            // for (col_name, row_value) in &aggregate_row {
            //     for &mut (cum_col_name, agg_type, ref mut cum_agg_value) in
            //         group_key_entry.iter_mut()
            //     {
            //         if col_name != cum_col_name {
            //             continue;
            //         }

            //         // TODO(edd): remove unwrap - it should work because we are
            //         // tracking iteration count in loop.
            //         let row_value = row_value.as_ref().unwrap();

            //         match cum_agg_value {
            //             Some(agg) => match agg {
            //                 column::Aggregate::Count(cum_count) => {
            //                     *cum_count += 1;
            //                 }
            //                 column::Aggregate::Sum(cum_sum) => {
            //                     *cum_sum += row_value;
            //                 }
            //             },
            //             None => {
            //                 *cum_agg_value = match agg_type {
            //                     Aggregate::Count => Some(column::Aggregate::Count(0)),
            //                     Aggregate::Sum => Some(column::Aggregate::Sum(row_value.clone())),
            //                 }
            //             }
            //         }
            //     }
            // }
            processed_rows += 1;
        }
        // println!("{:?}", hash_table.len());

        // Emit final row
        results.insert(last_group_row, cum_aggregates);

        log::debug!("{:?}", results);
        // results
        BTreeMap::new()
    }

    pub fn sum_column(&self, name: &str, row_ids: &mut croaring::Bitmap) -> Option<column::Scalar> {
        if let Some(c) = self.column(name) {
            return c.sum_by_ids(row_ids);

@@ -734,7 +686,7 @@ impl Segment {
                } else {
                    // In this case there are grouped values in the column with no
                    // rows falling into time-range/predicate set.
-                    println!(
+                    log::error!(
                        "grouped value {:?} has no rows in time-range/predicate set",
                        group_key_value
                    );

@@ -742,7 +694,7 @@ impl Segment {
            }
        } else {
            // segment doesn't have the column so can't group on it.
-            println!("don't have column - can't group");
+            log::error!("don't have column - can't group");
        }
        grouped_results
    }

@@ -868,26 +820,36 @@ impl<'a> Segments<'a> {
            panic!("max <= min");
        }

        //
        // TODO - just need to sum up the aggregates within each segment here to get
        // the final result.
        //
        for segment in &self.segments {
            // // segment.aggregate_by_group_with_hash(
            // //     time_range,
            // //     predicates,
            // //     &group_columns,
            // //     &aggregates,
            // // );

            segment.aggregate_by_group_with_sort(
            let now = std::time::Instant::now();
            segment.aggregate_by_group_with_hash(
                time_range,
                predicates,
                &group_columns,
                &aggregates,
            );

            // segment.aggregate_by_group_with_sort(
            //     time_range,
            //     predicates,
            //     &group_columns,
            //     &aggregates,
            // );
            log::info!(
                "processed segment {:?} in {:?}",
                segment.time_range(),
                now.elapsed()
            )
        }

        // let group_columns_arc = std::sync::Arc::new(group_columns);
        // let aggregates_arc = std::sync::Arc::new(aggregates);

        // for chunked_segments in self.segments.chunks(12) {
        // for chunked_segments in self.segments.chunks(16) {
        //     crossbeam::scope(|scope| {
        //         for segment in chunked_segments {
        //             let group_columns = group_columns_arc.clone();

@@ -906,38 +868,20 @@ impl<'a> Segments<'a> {
        //             .unwrap();
        //     }

        //     let rem = self.segments.len() % 16;
        //     for segment in &self.segments[self.segments.len() - rem..] {
        //         segment.aggregate_by_group_with_sort(
        //             time_range,
        //             predicates,
        //             &group_columns_arc.clone(),
        //             &aggregates_arc.clone(),
        //         );
        //     }

        // TODO(edd): merge results - not expensive really...
        let mut cum_results: BTreeMap<Vec<String>, Vec<((String, Aggregate), column::Aggregate)>> =
            BTreeMap::new();

        // for segment in &self.segments {
        //     let segment_results = segment.group_agg_by_predicate_eq(
        //         time_range,
        //         predicates,
        //         &group_columns,
        //         &aggregates,
        //     );

        //     for (k, segment_aggs) in segment_results {
        //         // assert_eq!(v.len(), aggregates.len());
        //         let cum_result = cum_results.get_mut(&k);
        //         match cum_result {
        //             Some(cum) => {
        //                 assert_eq!(cum.len(), segment_aggs.len());
        //                 // In this case we need to aggregate the aggregates from
        //                 // each segment.
        //                 for i in 0..cum.len() {
        //                     // TODO(edd): this is more expensive than necessary
        //                     cum[i] = (cum[i].0.clone(), cum[i].1.clone() + &segment_aggs[i].1);
        //                 }
        //             }
        //             None => {
        //                 cum_results.insert(k, segment_aggs);
        //             }
        //         }
        //     }
        // }

        // // columns
        cum_results
    }

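The heart of this commit is the sort-based group-by in `aggregate_by_group_with_sort`: materialise the group and aggregate columns for the matching rows, sort them all together by the group columns, then make a single pass that emits a result row each time the group key changes. A self-contained sketch of that pattern, simplified to one i64 group key and one f64 sum (all names hypothetical):

    use std::collections::BTreeMap;

    fn group_sum(mut rows: Vec<(i64, f64)>) -> BTreeMap<i64, f64> {
        let mut results = BTreeMap::new();
        rows.sort_by_key(|(k, _)| *k); // sort by the group key first

        let mut iter = rows.into_iter();
        let (mut last_key, mut sum) = match iter.next() {
            Some(row) => row,
            None => return results,
        };
        for (key, v) in iter {
            if key != last_key {
                results.insert(last_key, sum); // group key changed: emit a row
                last_key = key;
                sum = 0.0; // reset the cumulative aggregate
            }
            sum += v; // same group: keep accumulating
        }
        results.insert(last_key, sum); // emit the final group
        results
    }

Sorting first costs O(n log n), but it lets the aggregation happen in one streaming pass without a hash table, which is the trade-off this commit benchmarks against the hash-based variant.
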
@@ -0,0 +1,197 @@
//! The sorter module provides a sort function which will sort a collection of
//! `Vector` columns by arbitrary columns. All sorting is done in ascending
//! order.
//!
//! `sorter::sort` implements Quicksort using Hoare's partitioning scheme (how
//! you choose the pivot). This partitioning scheme typically significantly
//! reduces the number of swaps necessary, but it does have some drawbacks.
//!
//! Firstly, the worst-case runtime of this implementation is `O(n^2)` when the
//! input set of columns is already sorted according to the desired sort order.
//! To avoid that behaviour, a heuristic is used for inputs over a certain size;
//! large inputs are first linearly scanned to determine if the input is already
//! sorted.
//!
//! Secondly, the sort produced using this partitioning scheme is not stable.
//!
use std::cmp::Ordering;
use std::collections::BTreeSet;
use std::ops::Range;

use snafu::ensure;
use snafu::Snafu;

use super::column;

#[derive(Snafu, Debug, Clone, Copy, PartialEq)]
pub enum Error {
    #[snafu(display(r#"Too many sort columns specified"#))]
    TooManyColumns,

    #[snafu(display(r#"Same column specified as sort column multiple times"#))]
    RepeatedColumns { index: usize },

    #[snafu(display(r#"Specified column index is out of bounds"#))]
    OutOfBoundsColumn { index: usize },
}

/// Any input with more than this many rows will have a linear comparison scan
/// performed on it to ensure it's not already sorted.
const SORTED_CHECK_SIZE: usize = 1000;

/// Sort a slice of `column::Vector`s based on the provided column indexes.
///
/// All chosen columns will be sorted in ascending order; the sort is *not*
/// stable.
pub fn sort(vectors: &mut [column::Vector], sort_by: &[usize]) -> Result<(), Error> {
    if vectors.is_empty() || sort_by.is_empty() {
        return Ok(());
    }

    ensure!(sort_by.len() <= vectors.len(), TooManyColumns);

    let mut col_set = BTreeSet::new();
    for &index in sort_by {
        ensure!(col_set.insert(index), RepeatedColumns { index });
    }

    // TODO(edd): map first/last still unstable https://github.com/rust-lang/rust/issues/62924
    if let Some(index) = col_set.range(vectors.len()..).next() {
        return OutOfBoundsColumn { index: *index }.fail();
    }

    // Hoare's partitioning scheme can have quadratic runtime behaviour in
    // the worst case when the inputs are already sorted. To avoid this, a
    // check is added for large inputs.
    let n = vectors[0].len();
    if n > SORTED_CHECK_SIZE {
        let mut sorted = true;
        for i in 1..n {
            if cmp(vectors, i - 1, i, sort_by) == Ordering::Greater {
                sorted = false;
                break;
            }
        }

        if sorted {
            log::debug!("columns already sorted");
            return Ok(());
        }
        // if vectors_sorted_asc(vectors, n, sort_by) {
        //     return Ok(());
        // }
    }
    let now = std::time::Instant::now();
    quicksort_by(vectors, 0..n - 1, sort_by);
    log::debug!("sorted in {:?}", now.elapsed());
    Ok(())
}

fn quicksort_by(vectors: &mut [column::Vector], range: Range<usize>, sort_by: &[usize]) {
    if range.start >= range.end {
        return;
    }

    let pivot = partition(vectors, &range, sort_by);
    quicksort_by(vectors, range.start..pivot, sort_by);
    quicksort_by(vectors, pivot + 1..range.end, sort_by);
}

fn partition(vectors: &mut [column::Vector], range: &Range<usize>, sort_by: &[usize]) -> usize {
    let pivot = (range.start + range.end) / 2;
    let (lo, hi) = (range.start, range.end);
    if cmp(vectors, pivot as usize, lo as usize, sort_by) == Ordering::Less {
        swap(vectors, lo as usize, pivot as usize);
    }
    if cmp(vectors, hi as usize, lo as usize, sort_by) == Ordering::Less {
        swap(vectors, lo as usize, hi as usize);
    }
    if cmp(vectors, pivot as usize, hi as usize, sort_by) == Ordering::Less {
        swap(vectors, hi as usize, pivot as usize);
    }

    let pivot = hi;
    let mut i = range.start;
    let mut j = range.end;

    loop {
        while cmp(vectors, i as usize, pivot as usize, sort_by) == Ordering::Less {
            i += 1;
        }

        while cmp(vectors, j as usize, pivot as usize, sort_by) == Ordering::Greater {
            j -= 1;
        }

        if i >= j {
            return j;
        }

        swap(vectors, i as usize, j as usize);
        i += 1;
        j -= 1;
    }
}

fn cmp(vectors: &[column::Vector], a: usize, b: usize, sort_by: &[usize]) -> Ordering {
    for &idx in sort_by {
        match &vectors[idx] {
            column::Vector::String(p) => {
                let cmp = p.get(a).cmp(&p.get(b));
                if cmp != Ordering::Equal {
                    return cmp;
                }
                // if cmp equal then try next vector.
            }
            column::Vector::Integer(p) => {
                let cmp = p.get(a).cmp(&p.get(b));
                if cmp != Ordering::Equal {
                    return cmp;
                }
                // if cmp equal then try next vector.
            }
            _ => continue, // don't compare on non-string / timestamp cols
        }
    }
    Ordering::Equal
}

fn vectors_sorted_asc(vectors: &[column::Vector], len: usize, sort_by: &[usize]) -> bool {
    'row_wise: for i in 1..len {
        for &idx in sort_by {
            match &vectors[idx] {
                column::Vector::String(vec) => {
                    if vec[i - 1] < vec[i] {
                        continue 'row_wise;
                    } else if vec[i - 1] == vec[i] {
                        // try next column
                        continue;
                    } else {
                        // value is > so the input is not sorted
                        return false;
                    }
                }
                column::Vector::Integer(vec) => {
                    if vec[i - 1] < vec[i] {
                        continue 'row_wise;
                    } else if vec[i - 1] == vec[i] {
                        // try next column
                        continue;
                    } else {
                        // value is > so the input is not sorted
                        return false;
                    }
                }
                _ => continue, // don't compare on non-string / timestamp cols
            }
        }
    }
    true
}

// Swap the same pair of elements in each packer column
fn swap(vectors: &mut [column::Vector], a: usize, b: usize) {
    for p in vectors {
        p.swap(a, b);
    }
}

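To make the new module concrete, a hypothetical caller, mirroring how segment.rs invokes it and assuming the `column::Vector` variants shown earlier in this diff:

    // Sort two parallel columns by column 0, ascending; rows move together.
    let mut cols = vec![
        column::Vector::Integer(vec![3, 1, 2]),
        column::Vector::Integer(vec![30, 10, 20]),
    ];
    sorter::sort(&mut cols, &[0]).unwrap();
    // cols[0] is now [1, 2, 3] and cols[1] followed it to [10, 20, 30]

The three pre-partition swaps in `partition` act as a median-of-three style pivot guard, which is what keeps typical inputs away from the quadratic worst case the module docs warn about.
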