feat: skip individual rows during replay based on timestamp

pull/24376/head
Marco Neumann 2021-08-17 14:32:49 +02:00
parent c31d837915
commit 31cbb646b9
8 changed files with 710 additions and 59 deletions


@ -41,6 +41,9 @@ pub enum Error {
#[snafu(display("Column not found: {}", column))]
ColumnNotFound { column: String },
#[snafu(display("Mask had {} rows, expected {}", expected, actual))]
IncorrectMaskLength { expected: usize, actual: usize },
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
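The new variant is raised via snafu's `ensure!` macro further down in this diff. As a minimal, self-contained sketch of that pattern (assuming the `snafu` crate version used by this module, where context selectors share the variant's name):

use snafu::{ensure, Snafu};

#[derive(Debug, Snafu)]
enum Error {
    #[snafu(display("Mask had {} rows, expected {}", actual, expected))]
    IncorrectMaskLength { expected: usize, actual: usize },
}

// Hypothetical helper: validate that a row mask covers the batch exactly.
fn check_mask(rows: usize, mask: &[bool]) -> Result<(), Error> {
    // `ensure!` returns early with the given context selector on failure.
    ensure!(
        rows == mask.len(),
        IncorrectMaskLength {
            expected: rows,
            actual: mask.len(),
        }
    );
    Ok(())
}

fn main() {
    let err = check_mask(4, &[true, false]).unwrap_err();
    assert_eq!(err.to_string(), "Mask had 2 rows, expected 4");
}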
@ -88,7 +91,13 @@ pub struct MBChunk {
impl MBChunk {
/// Create a new batch and write the contents of the [`TableBatch`] into it. Chunks
/// shouldn't exist without some data.
pub fn new(metrics: ChunkMetrics, batch: TableBatch<'_>) -> Result<Self> {
///
/// If `mask` is provided, only rows marked with `true` are written.
pub fn new(
metrics: ChunkMetrics,
batch: TableBatch<'_>,
mask: Option<&[bool]>,
) -> Result<Self> {
let table_name = Arc::from(batch.name());
let mut chunk = Self {
@ -99,15 +108,21 @@ impl MBChunk {
};
let columns = batch.columns();
chunk.write_columns(columns)?;
chunk.write_columns(columns, mask)?;
Ok(chunk)
}
/// Write the contents of a [`TableBatch`] into this Chunk.
///
/// If `mask` is provided, only rows marked with `true` are written.
///
/// Panics if the batch specifies a different name for the table in this Chunk
pub fn write_table_batch(&mut self, batch: TableBatch<'_>) -> Result<()> {
pub fn write_table_batch(
&mut self,
batch: TableBatch<'_>,
mask: Option<&[bool]>,
) -> Result<()> {
let table_name = batch.name();
assert_eq!(
table_name,
@ -115,7 +130,7 @@ impl MBChunk {
"can only insert table batch for a single table to chunk"
);
self.write_columns(batch.columns())?;
self.write_columns(batch.columns(), mask)?;
// Invalidate chunk snapshot
*self
@ -273,10 +288,28 @@ impl MBChunk {
/// Validates the schema of the passed in columns, then adds their values to
/// the associated columns in the table and updates summary statistics.
fn write_columns(&mut self, columns: Vec<entry::Column<'_>>) -> Result<()> {
///
/// If `mask` is provided, only rows marked with `true` are written.
fn write_columns(
&mut self,
columns: Vec<entry::Column<'_>>,
mask: Option<&[bool]>,
) -> Result<()> {
let row_count_before_insert = self.rows();
let additional_rows = columns.first().map(|x| x.row_count).unwrap_or_default();
let final_row_count = row_count_before_insert + additional_rows;
let masked_values = if let Some(mask) = mask {
ensure!(
additional_rows == mask.len(),
IncorrectMaskLength {
expected: additional_rows,
actual: mask.len(),
}
);
mask.iter().filter(|x| !*x).count()
} else {
0
};
let final_row_count = row_count_before_insert + additional_rows - masked_values;
// get the column ids and validate schema for those that already exist
columns.iter().try_for_each(|column| {
@ -313,7 +346,7 @@ impl MBChunk {
})
.1;
column.append(&fb_column).context(ColumnError {
column.append(&fb_column, mask).context(ColumnError {
column: fb_column.name(),
})?;
@ -353,7 +386,7 @@ pub mod test_helpers {
);
for batch in table_batches {
chunk.write_table_batch(batch)?;
chunk.write_table_batch(batch, None)?;
}
}
@ -378,9 +411,9 @@ pub mod test_helpers {
for batch in table_batches {
match chunk {
Some(ref mut c) => c.write_table_batch(batch)?,
Some(ref mut c) => c.write_table_batch(batch, None)?,
None => {
chunk = Some(MBChunk::new(ChunkMetrics::new_unregistered(), batch)?);
chunk = Some(MBChunk::new(ChunkMetrics::new_unregistered(), batch, None)?);
}
}
}
@ -403,7 +436,7 @@ mod tests {
};
use entry::test_helpers::lp_to_entry;
use internal_types::schema::{InfluxColumnType, InfluxFieldType};
use std::num::NonZeroU64;
use std::{convert::TryFrom, num::NonZeroU64, vec};
#[test]
fn writes_table_batches() {
@ -933,6 +966,7 @@ mod tests {
.first()
.unwrap()
.columns(),
None,
)
.err()
.unwrap();
@ -964,6 +998,7 @@ mod tests {
.first()
.unwrap()
.columns(),
None,
)
.err()
.unwrap();
@ -995,6 +1030,7 @@ mod tests {
.first()
.unwrap()
.columns(),
None,
)
.err()
.unwrap();
@ -1026,6 +1062,7 @@ mod tests {
.first()
.unwrap()
.columns(),
None,
)
.err()
.unwrap();
@ -1057,6 +1094,7 @@ mod tests {
.first()
.unwrap()
.columns(),
None,
)
.err()
.unwrap();
@ -1088,6 +1126,7 @@ mod tests {
.first()
.unwrap()
.columns(),
None,
)
.err()
.unwrap();
@ -1106,4 +1145,167 @@ mod tests {
response
);
}
#[test]
fn test_mask() {
let mut entries = vec![];
let mut masks = vec![];
let lp = [
"table,tag=a float_field=1.1,int_field=11i,uint_field=111u,bool_field=t,string_field=\"axx\" 100",
"table,tag=b float_field=2.2,int_field=22i,uint_field=222u,bool_field=f,string_field=\"bxx\" 200",
"table,tag=c float_field=3.3,int_field=33i,uint_field=333u,bool_field=f,string_field=\"cxx\" 300",
"table,tag=d float_field=4.4,int_field=44i,uint_field=444u,bool_field=t,string_field=\"dxx\" 400",
].join("\n");
masks.push(vec![false, true, true, false]);
entries.push(lp_to_entry(&lp));
let lp = [
"table,tag=e float_field=5.5,int_field=55i,uint_field=555u,bool_field=f,string_field=\"exx\" 500",
"table,tag=f float_field=6.6,int_field=66i,uint_field=666u,bool_field=t,string_field=\"fxx\" 600",
"table foo=1 700",
"table foo=2 800",
"table foo=3 900",
].join("\n");
masks.push(vec![true, false, true, false, false]);
entries.push(lp_to_entry(&lp));
let mut chunk: Option<MBChunk> = None;
for (entry, mask) in entries.into_iter().zip(masks.into_iter()) {
for w in entry.partition_writes().unwrap() {
for batch in w.table_batches() {
match chunk {
Some(ref mut c) => c.write_table_batch(batch, Some(mask.as_ref())).unwrap(),
None => {
chunk = Some(
MBChunk::new(
ChunkMetrics::new_unregistered(),
batch,
Some(mask.as_ref()),
)
.unwrap(),
);
}
}
}
}
}
let chunk = chunk.unwrap();
let expected = ColumnSummary {
name: "float_field".into(),
influxdb_type: Some(InfluxDbType::Field),
stats: Statistics::F64(StatValues {
min: Some(2.2),
max: Some(5.5),
total_count: 4,
null_count: 1,
distinct_count: None,
}),
};
assert_summary_eq!(expected, chunk, "float_field");
let expected = ColumnSummary {
name: "int_field".into(),
influxdb_type: Some(InfluxDbType::Field),
stats: Statistics::I64(StatValues {
min: Some(22),
max: Some(55),
total_count: 4,
null_count: 1,
distinct_count: None,
}),
};
assert_summary_eq!(expected, chunk, "int_field");
let expected = ColumnSummary {
name: "uint_field".into(),
influxdb_type: Some(InfluxDbType::Field),
stats: Statistics::U64(StatValues {
min: Some(222),
max: Some(555),
total_count: 4,
null_count: 1,
distinct_count: None,
}),
};
assert_summary_eq!(expected, chunk, "uint_field");
let expected = ColumnSummary {
name: "bool_field".into(),
influxdb_type: Some(InfluxDbType::Field),
stats: Statistics::Bool(StatValues {
min: Some(false),
max: Some(false),
total_count: 4,
null_count: 1,
distinct_count: None,
}),
};
assert_summary_eq!(expected, chunk, "bool_field");
let expected = ColumnSummary {
name: "string_field".into(),
influxdb_type: Some(InfluxDbType::Field),
stats: Statistics::String(StatValues {
min: Some("bxx".into()),
max: Some("exx".into()),
total_count: 4,
null_count: 1,
distinct_count: None,
}),
};
assert_summary_eq!(expected, chunk, "string_field");
let expected = ColumnSummary {
name: "time".into(),
influxdb_type: Some(InfluxDbType::Timestamp),
stats: Statistics::I64(StatValues {
min: Some(200),
max: Some(700),
total_count: 4,
null_count: 0,
distinct_count: None,
}),
};
assert_summary_eq!(expected, chunk, "time");
let expected = ColumnSummary {
name: "tag".into(),
influxdb_type: Some(InfluxDbType::Tag),
stats: Statistics::String(StatValues {
min: Some("b".into()),
max: Some("e".into()),
total_count: 4,
null_count: 1,
distinct_count: Some(NonZeroU64::try_from(4).unwrap()),
}),
};
assert_summary_eq!(expected, chunk, "tag");
}
#[test]
fn test_mask_wrong_length() {
let lp = [
"table,tag=a float_field=1.1,int_field=11i,uint_field=111u,bool_field=t,string_field=\"axx\" 100",
"table,tag=b float_field=2.2,int_field=22i,uint_field=222u,bool_field=f,string_field=\"bxx\" 200",
].join("\n");
let entry = lp_to_entry(&lp);
let partition_write = entry.partition_writes().unwrap().pop().unwrap();
let mask = vec![false, true, true, false];
let batch = partition_write.table_batches().pop().unwrap();
let err =
MBChunk::new(ChunkMetrics::new_unregistered(), batch, Some(mask.as_ref())).unwrap_err();
assert!(matches!(err, Error::IncorrectMaskLength { .. }));
let batch = partition_write.table_batches().pop().unwrap();
let mut chunk = MBChunk::new(ChunkMetrics::new_unregistered(), batch, None).unwrap();
let batch = partition_write.table_batches().pop().unwrap();
let err = chunk
.write_table_batch(batch, Some(mask.as_ref()))
.unwrap_err();
assert!(matches!(err, Error::IncorrectMaskLength { .. }));
}
}
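The masking contract used throughout this file is small enough to state in isolation: the mask must contain exactly one entry per incoming row, and only rows marked `true` survive. A standalone sketch of those semantics (the helper `apply_mask` and its stringly-typed error are hypothetical, not the IOx API):

fn apply_mask<T: Clone>(rows: &[T], mask: Option<&[bool]>) -> Result<Vec<T>, String> {
    match mask {
        // No mask: keep every row.
        None => Ok(rows.to_vec()),
        Some(mask) => {
            // Mirrors the `IncorrectMaskLength` check: one mask entry per row.
            if mask.len() != rows.len() {
                return Err(format!(
                    "mask had {} rows, expected {}",
                    mask.len(),
                    rows.len()
                ));
            }
            Ok(rows
                .iter()
                .zip(mask)
                .filter(|(_, &included)| included)
                .map(|(row, _)| row.clone())
                .collect())
        }
    }
}

fn main() {
    let rows = vec!["a", "b", "c", "d"];
    // Same shape as the first mask in `test_mask`: rows 0 and 3 are skipped.
    assert_eq!(
        apply_mask(&rows, Some(&[false, true, true, false])).unwrap(),
        vec!["b", "c"]
    );
    // A mask of the wrong length is rejected up front.
    assert!(apply_mask(&rows, Some(&[true, false])).is_err());
}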


@ -1,6 +1,7 @@
use std::convert::TryInto;
use std::iter::Enumerate;
use std::mem;
use std::sync::Arc;
use std::{convert::TryInto, iter::Zip};
use arrow::{
array::{
@ -125,15 +126,25 @@ impl Column {
self.influx_type
}
pub fn append(&mut self, entry: &EntryColumn<'_>) -> Result<()> {
pub fn append(
&mut self,
entry: &EntryColumn<'_>,
inclusion_mask: Option<&[bool]>,
) -> Result<()> {
self.validate_schema(entry)?;
let row_count = entry.row_count;
let masked_values = if let Some(mask) = inclusion_mask {
assert_eq!(entry.row_count, mask.len());
mask.iter().filter(|x| !*x).count()
} else {
0
};
let row_count = entry.row_count - masked_values;
if row_count == 0 {
return Ok(());
}
let mask = construct_valid_mask(entry)?;
let valid_mask = construct_valid_mask(entry)?;
match &mut self.data {
ColumnData::Bool(col_data, stats) => {
@ -148,21 +159,28 @@ impl Column {
col_data.append_unset(row_count);
let initial_total_count = stats.total_count;
let to_add = entry_data.len();
let null_count = row_count - to_add;
let mut added = 0;
for (idx, value) in iter_set_positions(&mask).zip(entry_data) {
for (idx, value) in MaskedIter::new(
iter_set_positions(&valid_mask),
entry_data.iter(),
inclusion_mask,
) {
stats.update(value);
if *value {
col_data.set(data_offset + idx);
}
added += 1;
}
let null_count = row_count - added;
stats.update_for_nulls(null_count as u64);
assert_eq!(
stats.total_count - initial_total_count - null_count as u64,
to_add as u64
added as u64
);
}
ColumnData::U64(col_data, stats) => {
@ -174,7 +192,14 @@ impl Column {
.expect("invalid payload")
.into_iter();
handle_write(row_count, &mask, entry_data, col_data, stats);
handle_write(
row_count,
&valid_mask,
entry_data,
col_data,
stats,
inclusion_mask,
);
}
ColumnData::F64(col_data, stats) => {
let entry_data = entry
@ -185,7 +210,14 @@ impl Column {
.expect("invalid payload")
.into_iter();
handle_write(row_count, &mask, entry_data, col_data, stats);
handle_write(
row_count,
&valid_mask,
entry_data,
col_data,
stats,
inclusion_mask,
);
}
ColumnData::I64(col_data, stats) => {
let entry_data = entry
@ -196,7 +228,14 @@ impl Column {
.expect("invalid payload")
.into_iter();
handle_write(row_count, &mask, entry_data, col_data, stats);
handle_write(
row_count,
&valid_mask,
entry_data,
col_data,
stats,
inclusion_mask,
);
}
ColumnData::String(col_data, stats) => {
let entry_data = entry
@ -208,21 +247,26 @@ impl Column {
let data_offset = col_data.len();
let initial_total_count = stats.total_count;
let to_add = entry_data.len();
let null_count = row_count - to_add;
let mut added = 0;
for (str, idx) in entry_data.iter().zip(iter_set_positions(&mask)) {
for (idx, str) in MaskedIter::new(
iter_set_positions(&valid_mask),
entry_data.iter(),
inclusion_mask,
) {
col_data.extend(data_offset + idx - col_data.len());
stats.update(str);
col_data.append(str);
added += 1;
}
col_data.extend(data_offset + row_count - col_data.len());
let null_count = row_count - added;
stats.update_for_nulls(null_count as u64);
assert_eq!(
stats.total_count - initial_total_count - null_count as u64,
to_add as u64
added as u64
);
}
ColumnData::Tag(col_data, dictionary, stats) => {
@ -237,23 +281,29 @@ impl Column {
col_data.resize(data_offset + row_count, INVALID_DID);
let initial_total_count = stats.total_count;
let to_add = entry_data.len();
let null_count = row_count - to_add;
let mut added = 0;
for (idx, value) in iter_set_positions(&mask).zip(entry_data) {
for (idx, value) in MaskedIter::new(
iter_set_positions(&valid_mask),
entry_data.iter(),
inclusion_mask,
) {
stats.update(value);
col_data[data_offset + idx] = dictionary.lookup_value_or_insert(value);
added += 1;
}
let null_count = row_count - added;
stats.update_for_nulls(null_count as u64);
assert_eq!(
stats.total_count - initial_total_count - null_count as u64,
to_add as u64
added as u64
);
}
};
self.valid.append_bits(entry.row_count, &mask);
self.valid.append_bits(row_count, &valid_mask);
Ok(())
}
@ -404,6 +454,102 @@ impl Column {
}
}
/// Iterator that masks out set positions and data.
///
/// The iterator yields pairs of set position and data. It assumes that set positions are strictly increasing.
///
/// If no mask is provided, set positions and data are simply zipped.
///
/// If a mask is provided, only elements that are marked as `true` are considered. Note that this is independent of
/// the inclusion or exclusion via the set positions themselves. Set positions are shifted to account for excluded
/// data. Here is an
/// example input:
///
/// | Mask | Set Positions | Data |
/// | ----- | ------------- | ---- |
/// | false | | |
/// | true | | |
/// | false | 2 | a |
/// | true | 3 | b |
/// | true | | |
/// | true | 5 | c |
/// | true | 6 | d |
/// | false | | |
/// | true | 8 | e |
///
/// This results in the following output:
///
/// | Set Positions | Data |
/// | ------------- | ---- |
/// | 1 | b |
/// | 3 | c |
/// | 4 | d |
/// | 5 | e |
struct MaskedIter<'a, I1, I2>
where
I1: Iterator<Item = usize>,
I2: Iterator,
{
/// Zipped iterator yielding unshifted set positions (i.e. without accounting for excluded items) and data.
it: Zip<I1, I2>,
/// If a mask was provided, this holds an enumerated iterator over it. The enumeration is used to align
/// the mask with the set positions.
mask: Option<Enumerate<core::slice::Iter<'a, bool>>>,
/// Number of excluded items up to the current point.
excluded_count: usize,
}
impl<'a, I1, I2> MaskedIter<'a, I1, I2>
where
I1: Iterator<Item = usize>,
I2: Iterator,
{
fn new(it_set_positions: I1, it_data: I2, mask: Option<&'a [bool]>) -> Self {
Self {
it: it_set_positions.zip(it_data),
mask: mask.map(|mask| mask.iter().enumerate()),
excluded_count: 0,
}
}
}
impl<'a, I1, I2> Iterator for MaskedIter<'a, I1, I2>
where
I1: Iterator<Item = usize>,
I2: Iterator,
{
type Item = (usize, I2::Item);
fn next(&mut self) -> Option<Self::Item> {
if let Some(mask) = self.mask.as_mut() {
for (set_position, value) in &mut self.it {
loop {
let (idx_mask, included) = mask.next().expect("inclusion mask too short");
if !included {
self.excluded_count += 1;
}
if idx_mask == set_position {
if *included {
let set_position = set_position
.checked_sub(self.excluded_count)
.expect("set positions broken");
return Some((set_position, value));
} else {
// exclude this value
break;
}
}
}
}
None
} else {
self.it.next()
}
}
}
/// Construct a validity mask from the given column's null mask
fn construct_valid_mask(column: &EntryColumn<'_>) -> Result<Vec<u8>> {
let buf_len = (column.row_count + 7) >> 3;
@ -441,6 +587,7 @@ fn handle_write<T, E>(
entry_data: E,
col_data: &mut Vec<T>,
stats: &mut StatValues<T>,
inclusion_mask: Option<&[bool]>,
) where
T: Clone + Default + PartialOrd + IsNan,
E: Iterator<Item = T> + ExactSizeIterator,
@ -449,18 +596,194 @@ fn handle_write<T, E>(
col_data.resize(data_offset + row_count, Default::default());
let initial_total_count = stats.total_count;
let to_add = entry_data.len();
let null_count = row_count - to_add;
let mut added = 0;
for (idx, value) in iter_set_positions(valid_mask).zip(entry_data) {
for (idx, value) in MaskedIter::new(iter_set_positions(valid_mask), entry_data, inclusion_mask)
{
stats.update(&value);
col_data[data_offset + idx] = value;
added += 1;
}
let null_count = row_count - added;
stats.update_for_nulls(null_count as u64);
assert_eq!(
stats.total_count - initial_total_count - null_count as u64,
to_add as u64
added as u64
);
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_masked_iterator_empty() {
// empty, no mask
let actual: Vec<(usize, u32)> = MaskedIter::new(
IntoIterator::into_iter([]),
IntoIterator::into_iter([]),
None,
)
.collect();
let expected: Vec<(usize, u32)> = vec![];
assert_eq!(actual, expected);
// empty, w/ mask
let actual: Vec<(usize, u32)> = MaskedIter::new(
IntoIterator::into_iter([]),
IntoIterator::into_iter([]),
Some(&[]),
)
.collect();
let expected: Vec<(usize, u32)> = vec![];
assert_eq!(actual, expected);
}
#[test]
fn test_masked_iterator_no_nulls() {
// data w/o nulls, no mask
let actual: Vec<(usize, u32)> = MaskedIter::new(
IntoIterator::into_iter([0, 1, 2]),
IntoIterator::into_iter([1, 2, 3]),
None,
)
.collect();
let expected: Vec<(usize, u32)> = vec![(0, 1), (1, 2), (2, 3)];
assert_eq!(actual, expected);
// data w/o nulls, all-true mask
let actual: Vec<(usize, u32)> = MaskedIter::new(
IntoIterator::into_iter([0, 1, 2]),
IntoIterator::into_iter([1, 2, 3]),
Some(&[true, true, true]),
)
.collect();
let expected: Vec<(usize, u32)> = vec![(0, 1), (1, 2), (2, 3)];
assert_eq!(actual, expected);
// data w/o nulls, all-false mask
let actual: Vec<(usize, u32)> = MaskedIter::new(
IntoIterator::into_iter([0, 1, 2]),
IntoIterator::into_iter([1, 2, 3]),
Some(&[false, false, false]),
)
.collect();
let expected: Vec<(usize, u32)> = vec![];
assert_eq!(actual, expected);
// data w/o nulls, true-true-false mask
let actual: Vec<(usize, u32)> = MaskedIter::new(
IntoIterator::into_iter([0, 1, 2]),
IntoIterator::into_iter([1, 2, 3]),
Some(&[true, true, false]),
)
.collect();
let expected: Vec<(usize, u32)> = vec![(0, 1), (1, 2)];
assert_eq!(actual, expected);
// data w/o nulls, true-false-true mask
let actual: Vec<(usize, u32)> = MaskedIter::new(
IntoIterator::into_iter([0, 1, 2]),
IntoIterator::into_iter([1, 2, 3]),
Some(&[true, false, true]),
)
.collect();
let expected: Vec<(usize, u32)> = vec![(0, 1), (1, 3)];
assert_eq!(actual, expected);
// data w/o nulls, false-true-false mask
let actual: Vec<(usize, u32)> = MaskedIter::new(
IntoIterator::into_iter([0, 1, 2]),
IntoIterator::into_iter([1, 2, 3]),
Some(&[false, true, false]),
)
.collect();
let expected: Vec<(usize, u32)> = vec![(0, 2)];
assert_eq!(actual, expected);
}
#[test]
fn test_masked_iterator_nulls() {
// data w/ nulls, no mask
let actual: Vec<(usize, u32)> = MaskedIter::new(
IntoIterator::into_iter([0, 2, 4]),
IntoIterator::into_iter([1, 2, 3]),
None,
)
.collect();
let expected: Vec<(usize, u32)> = vec![(0, 1), (2, 2), (4, 3)];
assert_eq!(actual, expected);
// data w/ nulls, all-true mask
let actual: Vec<(usize, u32)> = MaskedIter::new(
IntoIterator::into_iter([0, 2, 4]),
IntoIterator::into_iter([1, 2, 3]),
Some(&[true, true, true, true, true]),
)
.collect();
let expected: Vec<(usize, u32)> = vec![(0, 1), (2, 2), (4, 3)];
assert_eq!(actual, expected);
// data w/ nulls, all-false mask
let actual: Vec<(usize, u32)> = MaskedIter::new(
IntoIterator::into_iter([0, 2, 4]),
IntoIterator::into_iter([1, 2, 3]),
Some(&[false, false, false, false, false]),
)
.collect();
let expected: Vec<(usize, u32)> = vec![];
assert_eq!(actual, expected);
// data w/ nulls, true-true-true-true-false mask
let actual: Vec<(usize, u32)> = MaskedIter::new(
IntoIterator::into_iter([0, 2, 4]),
IntoIterator::into_iter([1, 2, 3]),
Some(&[true, true, true, true, false]),
)
.collect();
let expected: Vec<(usize, u32)> = vec![(0, 1), (2, 2)];
assert_eq!(actual, expected);
// data w/ nulls, true-true-false-true-true mask
let actual: Vec<(usize, u32)> = MaskedIter::new(
IntoIterator::into_iter([0, 2, 4]),
IntoIterator::into_iter([1, 2, 3]),
Some(&[true, true, false, true, true]),
)
.collect();
let expected: Vec<(usize, u32)> = vec![(0, 1), (3, 3)];
assert_eq!(actual, expected);
// data w/ nulls, true-false-false-true-true mask
let actual: Vec<(usize, u32)> = MaskedIter::new(
IntoIterator::into_iter([0, 2, 4]),
IntoIterator::into_iter([1, 2, 3]),
Some(&[true, false, false, true, true]),
)
.collect();
let expected: Vec<(usize, u32)> = vec![(0, 1), (2, 3)];
assert_eq!(actual, expected);
// data w/ nulls, false-true-true-true-false mask
let actual: Vec<(usize, u32)> = MaskedIter::new(
IntoIterator::into_iter([0, 2, 4]),
IntoIterator::into_iter([1, 2, 3]),
Some(&[false, true, true, true, false]),
)
.collect();
let expected: Vec<(usize, u32)> = vec![(1, 2)];
assert_eq!(actual, expected);
// data w/ nulls, false-false-true-false-false mask
let actual: Vec<(usize, u32)> = MaskedIter::new(
IntoIterator::into_iter([0, 2, 4]),
IntoIterator::into_iter([1, 2, 3]),
Some(&[false, false, true, false, false]),
)
.collect();
let expected: Vec<(usize, u32)> = vec![(0, 2)];
assert_eq!(actual, expected);
}
}
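As a cross-check of `MaskedIter`'s shifting rule: a kept set position moves down by the number of `false` mask entries before it. A standalone sketch (the helper `mask_set_positions` is hypothetical and collects eagerly instead of streaming) that reproduces the worked example from the doc comment above:

fn mask_set_positions<T: Copy>(
    set_positions: &[usize],
    data: &[T],
    mask: &[bool],
) -> Vec<(usize, T)> {
    // excluded[i] = number of `false` mask entries at indices < i
    let mut excluded = vec![0usize; mask.len() + 1];
    for (i, &included) in mask.iter().enumerate() {
        excluded[i + 1] = excluded[i] + (!included as usize);
    }
    set_positions
        .iter()
        .zip(data)
        // Drop values whose row is masked out ...
        .filter(|&(&pos, _)| mask[pos])
        // ... and shift the surviving set positions.
        .map(|(&pos, &value)| (pos - excluded[pos], value))
        .collect()
}

fn main() {
    // The example input from the `MaskedIter` doc comment.
    let mask = [false, true, false, true, true, true, true, false, true];
    let set_positions = [2, 3, 5, 6, 8];
    let data = ['a', 'b', 'c', 'd', 'e'];
    assert_eq!(
        mask_set_positions(&set_positions, &data, &mask),
        vec![(1, 'b'), (3, 'c'), (4, 'd'), (5, 'e')]
    );
}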


@ -1191,14 +1191,15 @@ impl Db {
/// 2. the partition key
/// 3. the table batch (which also contains the table name)
///
/// It shall return `true` if the batch should be stored and `false` otherwise.
/// It shall return `(true, _)` if the batch should be stored and `(false, _)` otherwise. In the first case, the
/// second element of the tuple is an optional row-wise mask; if provided, only rows marked with `true` are stored.
pub fn store_sequenced_entry<F>(
&self,
sequenced_entry: Arc<SequencedEntry>,
filter_table_batch: F,
) -> Result<()>
where
F: Fn(Option<&Sequence>, &str, &TableBatch<'_>) -> bool,
F: Fn(Option<&Sequence>, &str, &TableBatch<'_>) -> (bool, Option<Vec<bool>>),
{
// Get all needed database rule values, then release the lock
let rules = self.rules.read();
@ -1242,7 +1243,9 @@ impl Db {
None => continue,
};
if !filter_table_batch(sequence, partition_key, &table_batch) {
let (store_batch, mask) =
filter_table_batch(sequence, partition_key, &table_batch);
if !store_batch {
continue;
}
@ -1301,8 +1304,9 @@ impl Db {
let mb_chunk =
chunk.mutable_buffer().expect("cannot mutate open chunk");
if let Err(e) =
mb_chunk.write_table_batch(table_batch).context(WriteEntry {
if let Err(e) = mb_chunk
.write_table_batch(table_batch, mask.as_ref().map(|x| x.as_ref()))
.context(WriteEntry {
partition_key,
chunk_id,
})
@ -1320,9 +1324,12 @@ impl Db {
self.metric_attributes.clone(),
);
let chunk_result =
MBChunk::new(MutableBufferChunkMetrics::new(&metrics), table_batch)
.context(WriteEntryInitial { partition_key });
let chunk_result = MBChunk::new(
MutableBufferChunkMetrics::new(&metrics),
table_batch,
mask.as_ref().map(|x| x.as_ref()),
)
.context(WriteEntryInitial { partition_key });
match chunk_result {
Ok(mb_chunk) => {
@ -1391,8 +1398,8 @@ fn filter_table_batch_keep_all(
_sequence: Option<&Sequence>,
_partition_key: &str,
_batch: &TableBatch<'_>,
) -> bool {
true
) -> (bool, Option<Vec<bool>>) {
(true, None)
}
#[async_trait]
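The new filter contract for `store_sequenced_entry` reads: `(false, _)` drops the whole batch, `(true, None)` stores every row, and `(true, Some(mask))` stores only the rows marked `true`. A hedged sketch of such a filter with simplified stand-ins for `Sequence` and `TableBatch` (in the real API the thresholds would be captured by the closure rather than passed as parameters):

struct Sequence {
    number: u64,
}

struct TableBatch {
    timestamps: Vec<i64>,
}

fn filter(
    sequence: Option<&Sequence>,
    _partition_key: &str,
    batch: &TableBatch,
    min_seq: u64,
    min_ts: i64,
) -> (bool, Option<Vec<bool>>) {
    match sequence {
        // Everything below `min_seq` is already persisted: drop the batch.
        Some(seq) if seq.number < min_seq => (false, None),
        _ => {
            // Keep only rows at or after the minimum unpersisted timestamp.
            let mask: Vec<bool> = batch.timestamps.iter().map(|&ts| ts >= min_ts).collect();
            if mask.iter().all(|&keep| keep) {
                (true, None) // fast path: no row needs to be masked
            } else {
                (true, Some(mask))
            }
        }
    }
}

fn main() {
    let batch = TableBatch {
        timestamps: vec![10, 10, 30],
    };
    let (store, mask) = filter(Some(&Sequence { number: 1 }), "partition", &batch, 1, 20);
    assert!(store);
    assert_eq!(mask, Some(vec![false, false, true]));
}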


@ -343,6 +343,7 @@ mod tests {
let mb_chunk = mutable_buffer::chunk::MBChunk::new(
mutable_buffer::chunk::ChunkMetrics::new_unregistered(),
batch,
None,
)
.unwrap();


@ -1116,7 +1116,7 @@ mod tests {
let write = entry.partition_writes().unwrap().remove(0);
let batch = write.table_batches().remove(0);
MBChunk::new(MBChunkMetrics::new_unregistered(), batch).unwrap()
MBChunk::new(MBChunkMetrics::new_unregistered(), batch, None).unwrap()
}
async fn make_parquet_chunk(addr: ChunkAddr) -> ParquetChunk {


@ -312,12 +312,12 @@ fn filter_entry(
partition_key: &str,
table_batch: &TableBatch<'_>,
replay_plan: &ReplayPlan,
) -> bool {
) -> (bool, Option<Vec<bool>>) {
let sequence = sequence.expect("write buffer results must be sequenced");
let table_name = table_batch.name();
// Check if we have a partition checkpoint that contains data for this specific sequencer
let flush_ts_and_sequence_range = replay_plan
let min_unpersisted_ts_and_sequence_range = replay_plan
.last_partition_checkpoint(table_name, partition_key)
.map(|partition_checkpoint| {
partition_checkpoint
@ -326,21 +326,28 @@ fn filter_entry(
})
.flatten();
match flush_ts_and_sequence_range {
Some((_ts, min_max)) => {
match min_unpersisted_ts_and_sequence_range {
Some((min_unpersisted_ts, min_max)) => {
// Figure out what the sequence number tells us about the entire batch
match SequenceNumberSection::compare(sequence.number, min_max) {
SequenceNumberSection::Persisted => {
// skip the entire batch
false
(false, None)
}
SequenceNumberSection::PartiallyPersisted => {
// TODO: implement row filtering, for now replay the entire batch
true
let maybe_mask = table_batch.timestamps().ok().map(|timestamps| {
let min_unpersisted_ts = min_unpersisted_ts.timestamp_nanos();
timestamps
.into_iter()
.map(|ts_row| ts_row >= min_unpersisted_ts)
.collect::<Vec<bool>>()
});
(true, maybe_mask)
}
SequenceNumberSection::Unpersisted => {
// replay entire batch
true
(true, None)
}
}
}
@ -350,7 +357,7 @@ fn filter_entry(
// - Unknown sequencer (at least from the partitions point of view).
//
// => Replay full batch.
true
(true, None)
}
}
}
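The three-way split that drives `filter_entry` can be summarized with plain integers. This is an assumed reading of `SequenceNumberSection::compare` (defined elsewhere in the crate); `min_unpersisted` and `max_seen` are illustrative names for the bounds carried by the partition checkpoint:

#[derive(Debug, PartialEq)]
enum Section {
    Persisted,
    PartiallyPersisted,
    Unpersisted,
}

fn classify(number: u64, min_unpersisted: u64, max_seen: u64) -> Section {
    if number < min_unpersisted {
        Section::Persisted // skip the batch entirely
    } else if number > max_seen {
        Section::Unpersisted // replay the batch in full
    } else {
        Section::PartiallyPersisted // replay, masking rows by timestamp
    }
}

fn main() {
    // Hypothetical bounds: sequence numbers below 1 are fully persisted,
    // numbers above 1 were never seen by the checkpoint.
    assert_eq!(classify(0, 1, 1), Section::Persisted);
    assert_eq!(classify(1, 1, 1), Section::PartiallyPersisted);
    assert_eq!(classify(2, 1, 1), Section::Unpersisted);
}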
@ -1760,6 +1767,115 @@ mod tests {
.await
}
#[tokio::test]
async fn replay_prune_rows() {
ReplayTest {
steps: vec![
Step::Ingest(vec![
TestSequencedEntry {
sequencer_id: 0,
sequence_number: 0,
lp: "table_1,tag_partition_by=a,tag=1 bar=10 10",
},
]),
Step::Await(vec![
Check::Query(
"select * from table_1 order by bar",
vec![
"+-----+-----+------------------+--------------------------------+",
"| bar | tag | tag_partition_by | time |",
"+-----+-----+------------------+--------------------------------+",
"| 10 | 1 | a | 1970-01-01T00:00:00.000000010Z |",
"+-----+-----+------------------+--------------------------------+",
],
),
]),
Step::MakeWritesPersistable,
Step::Ingest(vec![
TestSequencedEntry {
sequencer_id: 0,
sequence_number: 1,
// same time as first entry in that partition but different tag + some later data
lp: "table_1,tag_partition_by=a,tag=2 bar=20 10\ntable_1,tag_partition_by=a,tag=3 bar=30 30",
},
]),
Step::Await(vec![
Check::Query(
"select * from table_1 order by bar",
vec![
"+-----+-----+------------------+--------------------------------+",
"| bar | tag | tag_partition_by | time |",
"+-----+-----+------------------+--------------------------------+",
"| 10 | 1 | a | 1970-01-01T00:00:00.000000010Z |",
"| 20 | 2 | a | 1970-01-01T00:00:00.000000010Z |",
"| 30 | 3 | a | 1970-01-01T00:00:00.000000030Z |",
"+-----+-----+------------------+--------------------------------+",
],
),
]),
Step::Persist(vec![("table_1", "tag_partition_by_a")]),
Step::Assert(vec![
// chunks do not overlap
Check::Query(
"select storage, min_value, max_value, row_count from system.chunk_columns where column_name = 'time' order by min_value, storage",
vec![
"+--------------------------+-----------+-----------+-----------+",
"| storage | min_value | max_value | row_count |",
"+--------------------------+-----------+-----------+-----------+",
"| ReadBufferAndObjectStore | 10 | 10 | 2 |",
"| ReadBuffer | 30 | 30 | 1 |",
"+--------------------------+-----------+-----------+-----------+",
],
),
]),
Step::Restart,
Step::Assert(vec![
Check::Query(
"select * from table_1 order by bar",
vec![
"+-----+-----+------------------+--------------------------------+",
"| bar | tag | tag_partition_by | time |",
"+-----+-----+------------------+--------------------------------+",
"| 10 | 1 | a | 1970-01-01T00:00:00.000000010Z |",
"| 20 | 2 | a | 1970-01-01T00:00:00.000000010Z |",
"+-----+-----+------------------+--------------------------------+",
],
),
]),
Step::Replay,
Step::Assert(vec![
Check::Query(
"select * from table_1 order by bar",
vec![
"+-----+-----+------------------+--------------------------------+",
"| bar | tag | tag_partition_by | time |",
"+-----+-----+------------------+--------------------------------+",
"| 10 | 1 | a | 1970-01-01T00:00:00.000000010Z |",
"| 20 | 2 | a | 1970-01-01T00:00:00.000000010Z |",
"| 30 | 3 | a | 1970-01-01T00:00:00.000000030Z |",
"+-----+-----+------------------+--------------------------------+",
],
),
// chunks do not overlap
Check::Query(
"select storage, min_value, max_value, row_count from system.chunk_columns where column_name = 'time' order by min_value, storage",
vec![
"+-------------------+-----------+-----------+-----------+",
"| storage | min_value | max_value | row_count |",
"+-------------------+-----------+-----------+-----------+",
"| ObjectStoreOnly | 10 | 10 | 2 |",
"| OpenMutableBuffer | 30 | 30 | 1 |",
"+-------------------+-----------+-----------+-----------+",
],
),
]),
],
..Default::default()
}
.run()
.await
}
#[tokio::test]
async fn replay_works_with_checkpoints_all_full_persisted_1() {
ReplayTest {


@ -23,11 +23,12 @@ fn chunk(count: usize) -> MBChunk {
for batch in write.table_batches() {
match chunk {
Some(ref mut c) => {
c.write_table_batch(batch).unwrap();
c.write_table_batch(batch, None).unwrap();
}
None => {
chunk = Some(
MBChunk::new(ChunkMetrics::new_unregistered(), batch).unwrap(),
MBChunk::new(ChunkMetrics::new_unregistered(), batch, None)
.unwrap(),
);
}
}


@ -17,11 +17,12 @@ fn write_chunk(count: usize, entries: &[Entry]) {
for batch in write.table_batches() {
match chunk {
Some(ref mut c) => {
c.write_table_batch(batch).unwrap();
c.write_table_batch(batch, None).unwrap();
}
None => {
chunk = Some(
MBChunk::new(ChunkMetrics::new_unregistered(), batch).unwrap(),
MBChunk::new(ChunkMetrics::new_unregistered(), batch, None)
.unwrap(),
);
}
}