fix: use `ref mut` so min/max range updates aren't lost on a copied tuple; make `count_by_id_range` respect the row-id range; track row/null totals in DictionaryRLE and improve Display output

pull/24376/head
Edd Robinson 2020-09-17 21:52:50 +01:00
parent 4f12e151d6
commit 751fa013e7
4 changed files with 95 additions and 44 deletions

View File

@ -63,14 +63,14 @@ fn main() {
);
let store = Arc::new(store);
time_select_with_pred(&store);
time_datafusion_select_with_pred(store.clone());
time_first_host(&store);
time_sum_range(&store);
time_count_range(&store);
time_group_single_with_pred(&store);
time_group_by_multi_agg_count(&store);
time_group_by_multi_agg_sorted_count(&store);
// time_select_with_pred(&store);
// time_datafusion_select_with_pred(store.clone());
// time_first_host(&store);
// time_sum_range(&store);
// time_count_range(&store);
// time_group_single_with_pred(&store);
// time_group_by_multi_agg_count(&store);
// time_group_by_multi_agg_sorted_count(&store);
time_window_agg_count(&store);
// time_group_by_different_columns(&store);
}
@ -121,10 +121,10 @@ fn build_store(
match rb {
Err(e) => println!("WARNING: error reading batch: {:?}, SKIPPING", e),
Ok(Some(rb)) => {
// if i < 364 {
// i += 1;
// continue;
// }
if i < 364 {
i += 1;
continue;
}
let schema = Schema::with_sort_order(
rb.schema(),
sort_order.iter().map(|s| s.to_string()).collect(),
@ -134,7 +134,7 @@ fn build_store(
let mut segment = Segment::new(rb.num_rows(), schema);
convert_record_batch(rb, &mut segment)?;
// println!("{}", &segment);
log::debug!("{}", &segment);
store.add_segment(segment);
}
Ok(None) => {
@ -499,7 +499,7 @@ fn time_group_by_multi_agg_count(store: &Store) {
let now = std::time::Instant::now();
let groups = segments.read_group_eq(
(1589000000000001, 1590044410000000),
(1589000000000001, 1590044410000001),
&[],
vec!["status".to_string(), "method".to_string()],
vec![("counter".to_string(), AggregateType::Count)],
@ -575,7 +575,7 @@ fn time_window_agg_count(store: &Store) {
];
for strat in &strats {
let repeat = 1;
let repeat = 10;
let mut total_time: std::time::Duration = std::time::Duration::new(0, 0);
let mut total_max = 0;
let segments = store.segments();

View File

@ -393,20 +393,41 @@ impl<'a> Vector<'a> {
/// Returns the number of logical (non-null) values within the half-open
/// row id range `[from_row_id, to_row_id)`.
///
/// For nullable variants this scans the requested slice and counts the
/// `Some` entries; for non-nullable variants no NULLs are possible, so the
/// count is simply the size of the range.
///
/// Panics if `from_row_id..to_row_id` is out of bounds for the underlying
/// vector, or if `from_row_id > to_row_id`.
fn count_by_id_range(&self, from_row_id: usize, to_row_id: usize) -> u64 {
    match self {
        Self::NullString(vec) => vec[from_row_id..to_row_id]
            .iter()
            .filter(|v| v.is_some())
            .count() as u64,
        Self::NullFloat(vec) => vec[from_row_id..to_row_id]
            .iter()
            .filter(|v| v.is_some())
            .count() as u64,
        Self::NullInteger(vec) => vec[from_row_id..to_row_id]
            .iter()
            .filter(|v| v.is_some())
            .count() as u64,
        // These encodings cannot contain NULL values, so every row in the
        // range counts — no scan needed.
        Self::Float(_) | Self::Integer(_) | Self::Unsigned32(_) => {
            (to_row_id - from_row_id) as u64
        }
    }
}
@ -705,6 +726,13 @@ impl Column {
/// Materialise all of the decoded values matching the provided logical
/// row ids.
//
// FIXME(edd): we need to provide an API on an encoding to return raw_values
// so that we can return non-null vectors when we know the underlying encoding
// doesn't contain any null values. Right now we return nullable vectors, w
// which take up more memory and mean we can't do fast counts (since we need
// to check each value is non-null).
//
pub fn values(&self, row_ids: &[usize]) -> Vector<'_> {
match self {
Column::String(c) => {
@ -1227,13 +1255,13 @@ impl std::fmt::Display for Column {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match &self {
Column::String(c) => {
write!(f, "{}", c)?;
write!(f, "[String Column]: {}", c)?;
}
Column::Float(c) => {
write!(f, "{}", c)?;
write!(f, "[Float Column]:{}", c)?;
}
Column::Integer(c) => {
write!(f, "{}", c)?;
write!(f, "[Integer Column]: {}", c)?;
}
}
Ok(())
@ -1617,7 +1645,7 @@ impl From<arrow::array::Float64Array> for NumericColumn<f64> {
let v = arr.value(i);
match range {
Some(mut range) => {
Some(ref mut range) => {
range.0 = range.0.min(v);
range.1 = range.1.max(v);
}
@ -1648,7 +1676,7 @@ impl From<arrow::array::Int64Array> for NumericColumn<i64> {
let v = arr.value(i);
match range {
Some(mut range) => {
Some(ref mut range) => {
range.0 = range.0.min(v);
range.1 = range.1.max(v);
}
@ -1679,7 +1707,7 @@ impl From<arrow::array::TimestampMicrosecondArray> for NumericColumn<i64> {
let v = arr.value(i);
match range {
Some(mut range) => {
Some(ref mut range) => {
range.0 = range.0.min(v);
range.1 = range.1.max(v);
}
@ -1704,7 +1732,10 @@ impl From<&[f64]> for NumericColumn<f64> {
// calculate min/max for meta data
for &v in values {
match range {
Some(mut range) => {
// wow this ref totally confused me for a while. Without it
// the code will compile fine but the range option will never
// reflect changes because the tuple range will be a copy.
Some(ref mut range) => {
range.0 = range.0.min(v);
range.1 = range.1.max(v);
}
@ -1729,7 +1760,7 @@ impl From<&[i64]> for NumericColumn<i64> {
// calculate min/max for meta data
for &v in values {
match range {
Some(mut range) => {
Some(ref mut range) => {
range.0 = range.0.min(v);
range.1 = range.1.max(v);
}

View File

@ -217,7 +217,13 @@ where
+ std::ops::Add<Output = T::Native>,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "[PlainArrow<T>] size: {}", self.size())
write!(
f,
"[PlainArrow<T>] rows: {:?}, nulls: {:?}, size: {}",
self.arr.len(),
self.arr.null_count(),
self.size()
)
}
}
@ -245,7 +251,12 @@ where
+ std::ops::AddAssign,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "[PlainFixed<T>] size: {}", self.size(),)
write!(
f,
"[PlainFixed<T>] rows: {:?}, size: {}",
self.values.len(),
self.size()
)
}
}
@ -481,6 +492,7 @@ pub struct DictionaryRLE {
// of times the entry repeats.
run_lengths: Vec<(usize, u64)>,
nulls: u64,
total: u64,
}
@ -492,6 +504,7 @@ impl DictionaryRLE {
index_entry: BTreeMap::new(),
map_size: 0,
run_lengths: Vec::new(),
nulls: 0,
total: 0,
}
}
@ -503,6 +516,7 @@ impl DictionaryRLE {
index_entry: BTreeMap::new(),
map_size: 0,
run_lengths: Vec::new(),
nulls: 0,
total: 0,
};
@ -514,7 +528,7 @@ impl DictionaryRLE {
.index_row_ids
.insert(next_idx as u32, croaring::Bitmap::create());
_self.run_lengths.push((next_idx, 0)); // could this cause a bug?ta
_self.run_lengths.push((next_idx, 0)); // could this cause a bug?
}
_self
}
@ -568,6 +582,9 @@ impl DictionaryRLE {
}
}
self.total += additional;
if v.is_none() {
self.nulls += additional;
}
}
// row_ids returns an iterator over the set of row ids matching the provided
@ -817,7 +834,9 @@ impl std::fmt::Display for DictionaryRLE {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"[DictionaryRLE] size: {}, dict entries: {}, runs: {} ",
"[DictionaryRLE] rows: {:?} nulls: {:?}, size: {}, dict entries: {}, runs: {} ",
self.total,
self.nulls,
self.size(),
self.index_entry.len(),
self.run_lengths.len()

View File

@ -229,8 +229,7 @@ impl Segment {
group_columns: &[String],
aggregates: &'a [(String, AggregateType)],
window: i64,
) -> BTreeMap<Vec<i64>, Vec<(&'a String, &'a AggregateType, Option<column::Aggregate<'a>>)>>
{
) -> BTreeMap<Vec<i64>, Vec<(&'a String, &'a AggregateType, column::Aggregate<'a>)>> {
// Build a hash table - essentially, scan columns for matching row ids,
// emitting the encoded value for each column and track those value
// combinations in a hashmap with running aggregates.
@ -497,6 +496,7 @@ impl Segment {
.iter()
.map(|v| *v as usize)
.collect::<Vec<_>>();
log::debug!("filtered to {:?} rows.", filtered_row_ids_vec.len());
// materialise all encoded values for the matching rows in the columns
// we are grouping on and store each group as an iterator.
@ -557,15 +557,13 @@ impl Segment {
}
let now = std::time::Instant::now();
if self.group_key_sorted(group_columns) {
panic!("This shouldn't be called!!!");
} else {
// now sort on the first grouping columns. Right now the order doesn't matter...
let group_col_sort_order = &(0..group_columns.len()).collect::<Vec<_>>();
super::sorter::sort(&mut all_columns, group_col_sort_order).unwrap();
}
assert!(!self.group_key_sorted(group_columns)); // should always need a sort if in this method
log::debug!("time checking sort {:?}", now.elapsed());
// now sort on the first grouping columns. Right now the order doesn't matter...
let group_col_sort_order = &(0..group_columns.len()).collect::<Vec<_>>();
super::sorter::sort(&mut all_columns, group_col_sort_order).unwrap();
// let group_itrs = all_columns
// .iter()
// .take(group_columns.len()) // only use grouping columns
@ -582,7 +580,9 @@ impl Segment {
.iter()
.take(group_columns.len())
.map(|vector| match vector {
column::Vector::Unsigned32(_) => column::VectorIterator::new(vector), // encoded tag columns
column::Vector::Unsigned32(_) => {
column::VectorIterator::new(vector) // encoded tag columns
}
column::Vector::Integer(_) => column::VectorIterator::new(vector), // encoded (but actually just raw) timestamp column
_ => panic!("don't support grouping on non-encoded values or timestamps"),
})
@ -641,6 +641,7 @@ impl Segment {
.iter()
.map(|v| *v as usize)
.collect::<Vec<_>>();
log::debug!("filtered to {:?} rows.", filtered_row_ids_vec.len());
// materialise all encoded values for the matching rows in the columns
// we are grouping on and store each group as an iterator.
@ -663,7 +664,7 @@ impl Segment {
}
}
let mut group_itrs = group_column_encoded_values
let group_itrs = group_column_encoded_values
.iter()
.map(|vector| match vector {
column::Vector::Unsigned32(_) => column::VectorIterator::new(vector), // encoded tag columns