Merge branch 'main' into dom/wal-ref-metrics

pull/24376/head
Fraser Savage 2023-03-15 17:54:01 +00:00 committed by GitHub
commit 8529e9551f
3 changed files with 440 additions and 29 deletions

View File

@@ -187,13 +187,13 @@ where
.await
{
Ok(v) => v,
Err(e) => {
error!(
error=%e,
%namespace_id,
%table_id,
"query error"
);
Err(e @ (QueryError::TableNotFound(_, _) | QueryError::NamespaceNotFound(_))) => {
debug!(
error=%e,
%namespace_id,
%table_id,
"query error, no buffered data found");
return Err(e)?;
}
};
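The hunk above narrows the error arm with an `e @ (pattern | pattern)` binding, so that "not found" query errors are logged at debug level and returned rather than logged as errors. A minimal sketch of that binding-with-or-pattern idiom (the enum and the println-based logging are invented for illustration; the real code uses the ingester's QueryError and its logging macros):

#[derive(Debug)]
enum QueryError {
    TableNotFound(String),
    NamespaceNotFound(String),
    Internal(String),
}

fn handle(res: Result<u32, QueryError>) -> Result<u32, QueryError> {
    match res {
        Ok(v) => Ok(v),
        // Bind the matched error as `e` while restricting this arm to the
        // "not found" variants; any other variant falls through to the next arm.
        Err(e @ (QueryError::TableNotFound(_) | QueryError::NamespaceNotFound(_))) => {
            println!("debug: query error, no buffered data found: {e:?}");
            Err(e)
        }
        Err(e) => {
            println!("error: query error: {e:?}");
            Err(e)
        }
    }
}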

View File

@@ -10,6 +10,7 @@ use arrow::{
record_batch::RecordBatch,
};
use datafusion::error::{DataFusionError, Result};
use hashbrown::HashMap;
use super::{params::GapFillParams, FillStrategy};
@@ -64,7 +65,7 @@ use super::{params::GapFillParams, FillStrategy};
/// - Having one additional _trailing row_ at the end ensures that `GapFiller` can
/// infer whether there are trailing gaps to produce at the beginning of the
/// next batch, since it can discover if the last row starts a new series.
#[derive(Clone, Debug, PartialEq)]
#[derive(Debug, PartialEq)]
pub(super) struct GapFiller {
/// The static parameters of gap-filling: time range start, end and the stride.
params: GapFillParams,
@@ -117,9 +118,9 @@ impl GapFiller {
}
let offset = self.cursor.next_input_offset - 1;
let len = batch.num_rows() - offset;
self.cursor.next_input_offset = 1;
self.cursor.slice(offset);
let len = batch.num_rows() - offset;
Ok(batch.slice(offset, len))
}
@@ -152,7 +153,7 @@ impl GapFiller {
.map_err(DataFusionError::ArrowError)?;
let mut series_ends = vec![];
let mut cursor = self.cursor.clone();
let mut cursor = self.cursor.clone_for_aggr_col(None)?;
let mut output_row_count = 0;
let start_offset = cursor.next_input_offset;
@@ -196,18 +197,18 @@ impl GapFiller {
Vec::with_capacity(group_arr.len() + aggr_arr.len() + 1); // plus one for time column
// build the time column
let mut cursor = self.cursor.clone();
let mut cursor = self.cursor.clone_for_aggr_col(None)?;
let (time_idx, input_time_array) = input_time_array;
let time_vec = cursor.build_time_vec(&self.params, series_ends, input_time_array)?;
let output_time_len = time_vec.len();
output_arrays.push((time_idx, Arc::new(TimestampNanosecondArray::from(time_vec))));
// There may not be any aggregate or group columns, so use this cursor state as the new
// GapFiller cursor once this output batch is complete.
let final_cursor = cursor;
let mut final_cursor = cursor;
// build the other group columns
for (idx, ga) in group_arr.iter() {
let mut cursor = self.cursor.clone();
let mut cursor = self.cursor.clone_for_aggr_col(None)?;
let take_vec =
cursor.build_group_take_vec(&self.params, series_ends, input_time_array)?;
if take_vec.len() != output_time_len {
@@ -223,13 +224,18 @@ impl GapFiller {
// Build the aggregate columns
for (idx, aa) in aggr_arr.iter() {
let mut cursor = self.cursor.clone();
let mut cursor = self.cursor.clone_for_aggr_col(Some(*idx))?;
let take_vec = match self.params.fill_strategy.get(idx) {
Some(FillStrategy::Null) => cursor.build_aggr_take_vec_fill_null(
&self.params,
series_ends,
input_time_array,
),
Some(FillStrategy::Prev) => cursor.build_aggr_take_vec_fill_prev(
&self.params,
series_ends,
input_time_array,
),
Some(fs) => Err(DataFusionError::NotImplemented(format!(
"unsupported gap fill strategy {fs:?}"
))),
@@ -246,6 +252,7 @@ impl GapFiller {
}
let take_arr = UInt64Array::from(take_vec);
output_arrays.push((*idx, take::take(aa, &take_arr, None)?));
final_cursor.merge_aggr_col_cursor(cursor);
}
output_arrays.sort_by(|(a, _), (b, _)| a.cmp(b));
@@ -260,7 +267,7 @@ impl GapFiller {
/// Maintains the state needed to fill gaps in output columns. Also provides methods
/// for building vectors that build time, group, and aggregate output arrays.
#[derive(Clone, Debug, PartialEq)]
#[derive(Debug, PartialEq)]
struct Cursor {
/// Where to read the next row from the input.
next_input_offset: usize,
@@ -274,16 +281,24 @@ struct Cursor {
/// True if there are trailing gaps from after the last input row for a series
/// to be produced at the beginning of the next output batch.
trailing_gaps: bool,
/// State for each aggregate column, keyed on the column's offset in the schema.
aggr_col_states: HashMap<usize, AggrColState>,
}
impl Cursor {
/// Creates a new cursor.
fn new(params: &GapFillParams) -> Self {
let aggr_col_states = params
.fill_strategy
.iter()
.map(|(idx, fs)| (*idx, AggrColState::new(fs)))
.collect();
Self {
next_input_offset: 0,
next_ts: params.first_ts,
remaining_output_batch_size: 0,
trailing_gaps: false,
aggr_col_states,
}
}
@@ -293,6 +308,68 @@ impl Cursor {
self.next_input_offset == buffered_input_row_count && !self.trailing_gaps
}
/// Make a clone of this cursor to be used for creating an aggregate column,
/// if `idx` is `Some`. The resulting `Cursor` will only contain [AggrColState]
/// for the indicated column.
///
/// When `idx` is `None`, return a `Cursor` with an empty [Cursor::aggr_col_states].
fn clone_for_aggr_col(&self, idx: Option<usize>) -> Result<Self> {
let mut cur = Self {
next_input_offset: self.next_input_offset,
next_ts: self.next_ts,
remaining_output_batch_size: self.remaining_output_batch_size,
trailing_gaps: self.trailing_gaps,
aggr_col_states: HashMap::default(),
};
if let Some(idx) = idx {
let state = self
.aggr_col_states
.get(&idx)
.ok_or(DataFusionError::Internal(format!(
"could not find aggr col with offset {idx}"
)))?;
cur.aggr_col_states.insert(idx, state.clone());
}
Ok(cur)
}
/// Update [Cursor::aggr_col_states] with the updated state for an
/// aggregate column. `cursor` will have been created via `Cursor::clone_for_aggr_col`,
/// so [Cursor::aggr_col_states] will contain exactly one item.
///
/// # Panics
///
/// Will panic if input cursor's [Cursor::aggr_col_states] does not contain exactly one item.
fn merge_aggr_col_cursor(&mut self, cursor: Self) {
assert_eq!(1, cursor.aggr_col_states.len());
for (idx, state) in cursor.aggr_col_states.into_iter() {
self.aggr_col_states.insert(idx, state);
}
}
/// Get the [AggrColState] for this cursor. `self` will have been created via
/// `Cursor::clone_for_aggr_col`, so [Cursor::aggr_col_states] will contain exactly one item.
///
/// # Panics
///
/// Will panic if [Cursor::aggr_col_states] does not contain exactly one item.
fn get_aggr_col_state(&self) -> &AggrColState {
assert_eq!(1, self.aggr_col_states.len());
self.aggr_col_states.iter().next().unwrap().1
}
/// Set the [AggrColState] for this cursor. `self` will have been created via
/// `Cursor::clone_for_aggr_col`, so [Cursor::aggr_col_states] will contain exactly one item.
///
/// # Panics
///
/// Will panic if [Cursor::aggr_col_states] does not contain exactly one item.
fn set_aggr_col_state(&mut self, new_state: AggrColState) {
assert_eq!(1, self.aggr_col_states.len());
let (_idx, state) = self.aggr_col_states.iter_mut().next().unwrap();
*state = new_state;
}
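// NOTE (illustrative summary, not part of this commit's code): taken together,
// these helpers give each per-column pass its own cursor. The output-building
// code above clones the cursor with `clone_for_aggr_col(None)` for the time and
// group columns and with `clone_for_aggr_col(Some(idx))` for each aggregate
// column, then folds the per-aggregate state back into a single cursor via
// `merge_aggr_col_cursor`, which is used as the new `GapFiller` cursor once the
// output batch is complete.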
/// Counts the number of rows that will be produced for a series that ends (exclusively)
/// at `series_end`, including rows that have a null timestamp, if any.
///
@@ -327,6 +404,15 @@ impl Cursor {
Some(count)
}
/// Update this cursor to reflect that `offset` older rows are being sliced off the
/// buffered input.
fn slice(&mut self, offset: usize) {
self.next_input_offset -= offset;
for (_idx, aggr_col_state) in self.aggr_col_states.iter_mut() {
aggr_col_state.slice(offset);
}
}
/// Attempts to assign a value to `self.next_ts` if it does not have one.
///
/// This bit of abstraction is needed because the lower bound for gap filling may be
@@ -427,7 +513,8 @@ impl Cursor {
}
/// Builds a vector that can use the [`take`](take::take) kernel
/// to produce an aggregate output column.
/// to produce an aggregate output column, filling gaps with
/// null values.
fn build_aggr_take_vec_fill_null(
&mut self,
params: &GapFillParams,
@@ -458,6 +545,56 @@ impl Cursor {
Ok(aggr_builder.take_idxs)
}
/// Builds a vector that can use the [`take`](take::take) kernel
/// to produce an aggregate output column, filling gaps with the
/// previous value in the column.
fn build_aggr_take_vec_fill_prev(
&mut self,
params: &GapFillParams,
series_ends: &[usize],
input_time_array: &TimestampNanosecondArray,
) -> Result<Vec<Option<u64>>> {
struct AggrBuilder {
take_idxs: Vec<Option<u64>>,
prev_offset: Option<u64>,
}
impl VecBuilder for AggrBuilder {
fn push(&mut self, row_status: RowStatus) -> Result<()> {
match row_status {
RowStatus::NullTimestamp { offset, .. } => {
self.take_idxs.push(Some(offset as u64))
}
RowStatus::Present { offset, .. } => {
self.take_idxs.push(Some(offset as u64));
self.prev_offset = Some(offset as u64);
}
RowStatus::Missing { .. } => self.take_idxs.push(self.prev_offset),
}
Ok(())
}
fn start_new_series(&mut self) -> Result<()> {
self.prev_offset = None;
Ok(())
}
}
let mut aggr_builder = AggrBuilder {
take_idxs: Vec::with_capacity(self.remaining_output_batch_size),
prev_offset: self.get_aggr_col_state().prev_offset(),
};
self.build_vec(params, input_time_array, series_ends, &mut aggr_builder)?;
let AggrBuilder {
take_idxs,
prev_offset,
} = aggr_builder;
self.set_aggr_col_state(AggrColState::Prev {
offset: prev_offset,
});
Ok(take_idxs)
}
/// Helper method that iterates over each series
/// that ends with offsets in `series_ends` and produces
/// the appropriate output values.
@@ -570,6 +707,46 @@ impl Cursor {
}
}
/// Maintains the state needed to fill gaps in an aggregate column,
/// depending on the fill strategy.
#[derive(Clone, Debug, PartialEq)]
enum AggrColState {
/// For [FillStrategy::Null] there is no state to maintain.
Null,
/// For [FillStrategy::Prev] or [FillStrategy::PrevNullAsMissing].
Prev { offset: Option<u64> },
}
impl AggrColState {
/// Create a new [AggrColState] based on the [FillStrategy] for the column.
fn new(fill_strategy: &FillStrategy) -> Self {
match fill_strategy {
FillStrategy::Null => Self::Null,
FillStrategy::Prev | FillStrategy::PrevNullAsMissing => Self::Prev { offset: None },
}
}
/// Return the offset in the input from which to fill gaps.
///
/// # Panics
///
/// This method will panic if `self` is not [AggrColState::Prev].
fn prev_offset(&self) -> Option<u64> {
match self {
Self::Prev { offset } => *offset,
_ => unreachable!(),
}
}
/// Update state to reflect that older rows in the buffered input
/// are being sliced away.
fn slice(&mut self, offset: usize) {
if let Self::Prev { offset: Some(v) } = self {
*v -= offset as u64;
}
}
}
/// A trait that lets implementors describe how to build the
/// vectors used to create Arrow arrays in the output.
trait VecBuilder {
@@ -803,6 +980,143 @@ mod tests {
Ok(())
}
#[test]
fn test_cursor_append_aggr_take_prev() {
let input_times = TimestampNanosecondArray::from(vec![
// 950
1000, // 1050
1100, // 1150
1200,
// 1250
]);
let series = input_times.len();
let idx = 0;
let params = GapFillParams {
stride: 50,
first_ts: Some(950),
last_ts: 1250,
fill_strategy: prev_fill_strategy(idx),
};
let output_batch_size = 10000;
let mut cursor = new_cursor_with_batch_size(&params, output_batch_size);
let take_idxs = cursor
.build_aggr_take_vec_fill_prev(&params, &[series], &input_times)
.unwrap();
assert_eq!(
vec![
None, // 950
Some(0), // 1000
Some(0), // 1050
Some(1), // 1100
Some(1), // 1150
Some(2), // 1200
Some(2) // 1250
],
take_idxs
);
assert_cursor_end_state(&cursor, &input_times, &params);
}
#[test]
fn test_cursor_append_aggr_take_prev_with_nulls() {
let input_times = TimestampNanosecondArray::from(vec![
None,
None,
// 950,
Some(1000),
// 1050
Some(1100),
// 1150
Some(1200),
// 1250
//
]);
let series = input_times.len();
let idx = 0;
let params = GapFillParams {
stride: 50,
first_ts: Some(950),
last_ts: 1250,
fill_strategy: prev_fill_strategy(idx),
};
let output_batch_size = 10000;
let mut cursor = new_cursor_with_batch_size(&params, output_batch_size);
// Rows with null timestamps never have their aggregate values carried forward.
let take_idxs = cursor
.build_aggr_take_vec_fill_prev(&params, &[series], &input_times)
.unwrap();
assert_eq!(
vec![
Some(0), // null ts
Some(1), // null ts
None, // 950
Some(2), // 1000
Some(2), // 1050
Some(3), // 1100
Some(3), // 1150
Some(4), // 1200
Some(4) // 1250
],
take_idxs
);
assert_cursor_end_state(&cursor, &input_times, &params);
}
#[test]
fn test_cursor_append_aggr_take_prev_multi_series() {
let input_times = TimestampNanosecondArray::from(vec![
// 950
// 1000
Some(1050),
// 1100
// --- new series
// 950
// 1000
Some(1050),
// 1100
]);
let series_ends = vec![1, 2];
let idx = 0;
let params = GapFillParams {
stride: 50,
first_ts: Some(950),
last_ts: 1100,
fill_strategy: prev_fill_strategy(idx),
};
let output_batch_size = 10000;
let mut cursor = new_cursor_with_batch_size(&params, output_batch_size);
let take_idxs = cursor
.build_aggr_take_vec_fill_prev(&params, &series_ends, &input_times)
.unwrap();
assert_eq!(
vec![
None, // 950
None, // 1000
Some(0), // 1050
Some(0), // 1100
// New series
None, // 950
None, // 1000
Some(1), // 1050
Some(1), // 1100
],
take_idxs
);
assert_cursor_end_state(&cursor, &input_times, &params);
}
#[test]
fn test_cursor_multi_output_batch() -> Result<()> {
let output_batch_size = 5;
@@ -1223,7 +1537,7 @@ mod tests {
series_end: usize,
input_times: &TimestampNanosecondArray,
) -> usize {
let mut cursor = cursor.clone();
let mut cursor = cursor.clone_for_aggr_col(None).unwrap();
cursor
.count_series_rows(params, input_times, series_end)
.unwrap()
@@ -1233,6 +1547,10 @@ mod tests {
std::iter::once((1, FillStrategy::Null)).collect()
}
fn prev_fill_strategy(idx: usize) -> HashMap<usize, FillStrategy> {
std::iter::once((idx, FillStrategy::Prev)).collect()
}
struct Expected {
times: Vec<Option<i64>>,
group_take: Vec<u64>,
@@ -1247,15 +1565,17 @@ mod tests {
series_end: usize,
expected: Expected,
) -> Result<()> {
let actual_times = cursor
.clone()
.build_time_vec(params, &[series_end], input_times)?;
let actual_times =
cursor
.clone_for_aggr_col(None)?
.build_time_vec(params, &[series_end], input_times)?;
assert_eq!(expected.times, actual_times, "{desc} times");
let actual_group_take =
cursor
.clone()
.build_group_take_vec(params, &[series_end], input_times)?;
let actual_group_take = cursor.clone_for_aggr_col(None)?.build_group_take_vec(
params,
&[series_end],
input_times,
)?;
assert_eq!(expected.group_take, actual_group_take, "{desc} group take");
let actual_aggr_take =

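The heart of the fill-previous support in this file is the per-aggregate take vector: a present input row pushes its own offset and remembers it, a missing output row reuses the remembered offset, and rows before the first present row of a series stay None, which the take kernel turns into nulls. A minimal, self-contained sketch of that mechanism using plain arrow arrays (the values, offsets, and timestamps are assumed for illustration, not taken from the code above):

use arrow::array::{Int64Array, UInt64Array};
use arrow::compute::take;

fn main() -> Result<(), arrow::error::ArrowError> {
    // Two buffered input rows for one series, say at timestamps 1000 and 1100.
    let aggr = Int64Array::from(vec![10, 11]);
    // Take vector for output timestamps 950, 1000, 1050, 1100, 1150: the gap
    // before the first input row stays None (a null output), later gaps repeat
    // the offset of the most recent present row.
    let take_idxs = UInt64Array::from(vec![None, Some(0), Some(0), Some(1), Some(1)]);
    let filled = take(&aggr, &take_idxs, None)?;
    // Prints an array equivalent to [null, 10, 10, 11, 11].
    println!("{filled:?}");
    Ok(())
}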
View File

@@ -496,6 +496,83 @@ fn test_gapfill_simple_no_lower_bound() {
}}
}
#[test]
fn test_gapfill_fill_prev() {
test_helpers::maybe_start_logging();
insta::allow_duplicates! { for output_batch_size in [1, 2, 4, 8] {
for input_batch_size in [1, 2, 4] {
let records = TestRecords {
group_cols: vec![vec![
Some("a"),
Some("a"),
Some("b"),
Some("b"),
Some("b"),
]],
time_col: vec![
// 975
Some(1000),
// 1025
// 1050
Some(1075),
// 1100
// 1125
// --- new series
// 975
Some(1000),
// 1025
Some(1050),
// 1075
Some(1100),
// 1125
],
agg_cols: vec![vec![
Some(10),
Some(11),
Some(20),
None,
Some(21),
]],
input_batch_size,
};
let params = get_params_ms_with_fill_strategy(&records, 25, Some(975), 1_125, FillStrategy::Prev);
let tc = TestCase {
test_records: records,
output_batch_size,
params,
};
let batches = tc.run().unwrap();
let actual = batches_to_lines(&batches);
insta::with_settings!({
description => format!("input_batch_size: {input_batch_size}, output_batch_size: {output_batch_size}"),
}, {
insta::assert_yaml_snapshot!(actual, @r###"
---
- +----+--------------------------+----+
- "| g0 | time | a0 |"
- +----+--------------------------+----+
- "| a | 1970-01-01T00:00:00.975Z | |"
- "| a | 1970-01-01T00:00:01Z | 10 |"
- "| a | 1970-01-01T00:00:01.025Z | 10 |"
- "| a | 1970-01-01T00:00:01.050Z | 10 |"
- "| a | 1970-01-01T00:00:01.075Z | 11 |"
- "| a | 1970-01-01T00:00:01.100Z | 11 |"
- "| a | 1970-01-01T00:00:01.125Z | 11 |"
- "| b | 1970-01-01T00:00:00.975Z | |"
- "| b | 1970-01-01T00:00:01Z | 20 |"
- "| b | 1970-01-01T00:00:01.025Z | 20 |"
- "| b | 1970-01-01T00:00:01.050Z | |"
- "| b | 1970-01-01T00:00:01.075Z | |"
- "| b | 1970-01-01T00:00:01.100Z | 21 |"
- "| b | 1970-01-01T00:00:01.125Z | 21 |"
- +----+--------------------------+----+
"###)
});
assert_batch_count(&batches, output_batch_size);
}
}}
}
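// Aside (not part of this commit's code): in the snapshot above, series "b"
// stays blank at 00:00:01.050 and 00:00:01.075 even though gaps are filled
// from the previous value. The buffered value at 1.050 is null in the input,
// and FillStrategy::Prev carries forward whatever the previous row held,
// including a null, so the gap at 1.075 inherits that null; 21 (the value at
// 1.100) only starts filling from 1.125. FillStrategy::PrevNullAsMissing,
// declared alongside Prev in AggrColState, presumably exists to skip such
// nulls instead, but it is not exercised by this test.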
#[test]
fn test_gapfill_simple_no_lower_bound_with_nulls() {
test_helpers::maybe_start_logging();
@@ -812,23 +889,28 @@ fn bound_included_from_option<T>(o: Option<T>) -> Bound<T> {
}
}
fn phys_fill_strategy_null(
fn phys_fill_strategies(
records: &TestRecords,
fill_strategy: FillStrategy,
) -> Result<Vec<(Arc<dyn PhysicalExpr>, FillStrategy)>> {
let start = records.group_cols.len() + 1; // 1 is for time col
let end = start + records.agg_cols.len();
let mut v = Vec::with_capacity(records.agg_cols.len());
for f in records.schema().fields()[start..end].iter() {
v.push((phys_col(f.name(), &records.schema())?, FillStrategy::Null));
v.push((
phys_col(f.name(), &records.schema())?,
fill_strategy.clone(),
));
}
Ok(v)
}
fn get_params_ms(
fn get_params_ms_with_fill_strategy(
batch: &TestRecords,
stride: i64,
start: Option<i64>,
end: i64,
fill_strategy: FillStrategy,
) -> GapFillExecParams {
GapFillExecParams {
// interval day time is milliseconds in the low 32-bit word
@@ -848,6 +930,15 @@ fn get_params_ms(
None,
))),
},
fill_strategy: phys_fill_strategy_null(batch).unwrap(),
fill_strategy: phys_fill_strategies(batch, fill_strategy).unwrap(),
}
}
fn get_params_ms(
batch: &TestRecords,
stride: i64,
start: Option<i64>,
end: i64,
) -> GapFillExecParams {
get_params_ms_with_fill_strategy(batch, stride, start, end, FillStrategy::Null)
}