diff --git a/mutable_buffer/src/chunk.rs b/mutable_buffer/src/chunk.rs index 554f39e52a..b3bad788b1 100644 --- a/mutable_buffer/src/chunk.rs +++ b/mutable_buffer/src/chunk.rs @@ -41,6 +41,9 @@ pub enum Error { #[snafu(display("Column not found: {}", column))] ColumnNotFound { column: String }, + + #[snafu(display("Mask had {} rows, expected {}", actual, expected))] + IncorrectMaskLength { expected: usize, actual: usize }, } pub type Result = std::result::Result; @@ -88,7 +91,13 @@ pub struct MBChunk { impl MBChunk { /// Create a new batch and write the contents of the [`TableBatch`] into it. Chunks /// shouldn't exist without some data. - pub fn new(metrics: ChunkMetrics, batch: TableBatch<'_>) -> Result { + /// + /// If `mask` is provided, only entries that are marked w/ `true` are written. + pub fn new( + metrics: ChunkMetrics, + batch: TableBatch<'_>, + mask: Option<&[bool]>, + ) -> Result { let table_name = Arc::from(batch.name()); let mut chunk = Self { @@ -99,15 +108,21 @@ impl MBChunk { }; let columns = batch.columns(); - chunk.write_columns(columns)?; + chunk.write_columns(columns, mask)?; Ok(chunk) } /// Write the contents of a [`TableBatch`] into this Chunk. /// + /// If `mask` is provided, only entries that are marked w/ `true` are written. 
+ /// /// Panics if the batch specifies a different name for the table in this Chunk - pub fn write_table_batch(&mut self, batch: TableBatch<'_>) -> Result<()> { + pub fn write_table_batch( + &mut self, + batch: TableBatch<'_>, + mask: Option<&[bool]>, + ) -> Result<()> { let table_name = batch.name(); assert_eq!( table_name, @@ -115,7 +130,7 @@ impl MBChunk { "can only insert table batch for a single table to chunk" ); - self.write_columns(batch.columns())?; + self.write_columns(batch.columns(), mask)?; // Invalidate chunk snapshot *self @@ -273,10 +288,28 @@ impl MBChunk { /// Validates the schema of the passed in columns, then adds their values to /// the associated columns in the table and updates summary statistics. - fn write_columns(&mut self, columns: Vec>) -> Result<()> { + /// + /// If `mask` is provided, only entries that are marked w/ `true` are written. + fn write_columns( + &mut self, + columns: Vec>, + mask: Option<&[bool]>, + ) -> Result<()> { let row_count_before_insert = self.rows(); let additional_rows = columns.first().map(|x| x.row_count).unwrap_or_default(); - let final_row_count = row_count_before_insert + additional_rows; + let masked_values = if let Some(mask) = mask { + ensure!( + additional_rows == mask.len(), + IncorrectMaskLength { + expected: additional_rows, + actual: mask.len(), + } + ); + mask.iter().filter(|x| !*x).count() + } else { + 0 + }; + let final_row_count = row_count_before_insert + additional_rows - masked_values; // get the column ids and validate schema for those that already exist columns.iter().try_for_each(|column| { @@ -313,7 +346,7 @@ impl MBChunk { }) .1; - column.append(&fb_column).context(ColumnError { + column.append(&fb_column, mask).context(ColumnError { column: fb_column.name(), })?; @@ -353,7 +386,7 @@ pub mod test_helpers { ); for batch in table_batches { - chunk.write_table_batch(batch)?; + chunk.write_table_batch(batch, None)?; } } @@ -378,9 +411,9 @@ pub mod test_helpers { for batch in table_batches { 
match chunk { - Some(ref mut c) => c.write_table_batch(batch)?, + Some(ref mut c) => c.write_table_batch(batch, None)?, None => { - chunk = Some(MBChunk::new(ChunkMetrics::new_unregistered(), batch)?); + chunk = Some(MBChunk::new(ChunkMetrics::new_unregistered(), batch, None)?); } } } @@ -403,7 +436,7 @@ mod tests { }; use entry::test_helpers::lp_to_entry; use internal_types::schema::{InfluxColumnType, InfluxFieldType}; - use std::num::NonZeroU64; + use std::{convert::TryFrom, num::NonZeroU64, vec}; #[test] fn writes_table_batches() { @@ -933,6 +966,7 @@ mod tests { .first() .unwrap() .columns(), + None, ) .err() .unwrap(); @@ -964,6 +998,7 @@ mod tests { .first() .unwrap() .columns(), + None, ) .err() .unwrap(); @@ -995,6 +1030,7 @@ mod tests { .first() .unwrap() .columns(), + None, ) .err() .unwrap(); @@ -1026,6 +1062,7 @@ mod tests { .first() .unwrap() .columns(), + None, ) .err() .unwrap(); @@ -1057,6 +1094,7 @@ mod tests { .first() .unwrap() .columns(), + None, ) .err() .unwrap(); @@ -1088,6 +1126,7 @@ mod tests { .first() .unwrap() .columns(), + None, ) .err() .unwrap(); @@ -1106,4 +1145,167 @@ mod tests { response ); } + + #[test] + fn test_mask() { + let mut entries = vec![]; + let mut masks = vec![]; + + let lp = [ + "table,tag=a float_field=1.1,int_field=11i,uint_field=111u,bool_field=t,string_field=\"axx\" 100", + "table,tag=b float_field=2.2,int_field=22i,uint_field=222u,bool_field=f,string_field=\"bxx\" 200", + "table,tag=c float_field=3.3,int_field=33i,uint_field=333u,bool_field=f,string_field=\"cxx\" 300", + "table,tag=d float_field=4.4,int_field=44i,uint_field=444u,bool_field=t,string_field=\"dxx\" 400", + ].join("\n"); + masks.push(vec![false, true, true, false]); + entries.push(lp_to_entry(&lp)); + + let lp = [ + "table,tag=e float_field=5.5,int_field=55i,uint_field=555u,bool_field=f,string_field=\"exx\" 500", + "table,tag=f float_field=6.6,int_field=66i,uint_field=666u,bool_field=t,string_field=\"fxx\" 600", + "table foo=1 700", + "table foo=2 
800", + "table foo=3 900", + ].join("\n"); + masks.push(vec![true, false, true, false, false]); + entries.push(lp_to_entry(&lp)); + + let mut chunk: Option = None; + for (entry, mask) in entries.into_iter().zip(masks.into_iter()) { + for w in entry.partition_writes().unwrap() { + for batch in w.table_batches() { + match chunk { + Some(ref mut c) => c.write_table_batch(batch, Some(mask.as_ref())).unwrap(), + None => { + chunk = Some( + MBChunk::new( + ChunkMetrics::new_unregistered(), + batch, + Some(mask.as_ref()), + ) + .unwrap(), + ); + } + } + } + } + } + let chunk = chunk.unwrap(); + + let expected = ColumnSummary { + name: "float_field".into(), + influxdb_type: Some(InfluxDbType::Field), + stats: Statistics::F64(StatValues { + min: Some(2.2), + max: Some(5.5), + total_count: 4, + null_count: 1, + distinct_count: None, + }), + }; + assert_summary_eq!(expected, chunk, "float_field"); + + let expected = ColumnSummary { + name: "int_field".into(), + influxdb_type: Some(InfluxDbType::Field), + stats: Statistics::I64(StatValues { + min: Some(22), + max: Some(55), + total_count: 4, + null_count: 1, + distinct_count: None, + }), + }; + assert_summary_eq!(expected, chunk, "int_field"); + + let expected = ColumnSummary { + name: "uint_field".into(), + influxdb_type: Some(InfluxDbType::Field), + stats: Statistics::U64(StatValues { + min: Some(222), + max: Some(555), + total_count: 4, + null_count: 1, + distinct_count: None, + }), + }; + assert_summary_eq!(expected, chunk, "uint_field"); + + let expected = ColumnSummary { + name: "bool_field".into(), + influxdb_type: Some(InfluxDbType::Field), + stats: Statistics::Bool(StatValues { + min: Some(false), + max: Some(false), + total_count: 4, + null_count: 1, + distinct_count: None, + }), + }; + assert_summary_eq!(expected, chunk, "bool_field"); + + let expected = ColumnSummary { + name: "string_field".into(), + influxdb_type: Some(InfluxDbType::Field), + stats: Statistics::String(StatValues { + min: Some("bxx".into()), + 
max: Some("exx".into()), + total_count: 4, + null_count: 1, + distinct_count: None, + }), + }; + assert_summary_eq!(expected, chunk, "string_field"); + + let expected = ColumnSummary { + name: "time".into(), + influxdb_type: Some(InfluxDbType::Timestamp), + stats: Statistics::I64(StatValues { + min: Some(200), + max: Some(700), + total_count: 4, + null_count: 0, + distinct_count: None, + }), + }; + assert_summary_eq!(expected, chunk, "time"); + + let expected = ColumnSummary { + name: "tag".into(), + influxdb_type: Some(InfluxDbType::Tag), + stats: Statistics::String(StatValues { + min: Some("b".into()), + max: Some("e".into()), + total_count: 4, + null_count: 1, + distinct_count: Some(NonZeroU64::try_from(4).unwrap()), + }), + }; + assert_summary_eq!(expected, chunk, "tag"); + } + + #[test] + fn test_mask_wrong_length() { + let lp = [ + "table,tag=a float_field=1.1,int_field=11i,uint_field=111u,bool_field=t,string_field=\"axx\" 100", + "table,tag=b float_field=2.2,int_field=22i,uint_field=222u,bool_field=f,string_field=\"bxx\" 200", + ].join("\n"); + let entry = lp_to_entry(&lp); + let partition_write = entry.partition_writes().unwrap().pop().unwrap(); + let mask = vec![false, true, true, false]; + + let batch = partition_write.table_batches().pop().unwrap(); + let err = + MBChunk::new(ChunkMetrics::new_unregistered(), batch, Some(mask.as_ref())).unwrap_err(); + assert!(matches!(err, Error::IncorrectMaskLength { .. })); + + let batch = partition_write.table_batches().pop().unwrap(); + let mut chunk = MBChunk::new(ChunkMetrics::new_unregistered(), batch, None).unwrap(); + + let batch = partition_write.table_batches().pop().unwrap(); + let err = chunk + .write_table_batch(batch, Some(mask.as_ref())) + .unwrap_err(); + assert!(matches!(err, Error::IncorrectMaskLength { .. 
})); + } } diff --git a/mutable_buffer/src/column.rs b/mutable_buffer/src/column.rs index c398f1161b..d796195925 100644 --- a/mutable_buffer/src/column.rs +++ b/mutable_buffer/src/column.rs @@ -1,6 +1,7 @@ -use std::convert::TryInto; +use std::iter::Enumerate; use std::mem; use std::sync::Arc; +use std::{convert::TryInto, iter::Zip}; use arrow::{ array::{ @@ -125,15 +126,25 @@ impl Column { self.influx_type } - pub fn append(&mut self, entry: &EntryColumn<'_>) -> Result<()> { + pub fn append( + &mut self, + entry: &EntryColumn<'_>, + inclusion_mask: Option<&[bool]>, + ) -> Result<()> { self.validate_schema(entry)?; - let row_count = entry.row_count; + let masked_values = if let Some(mask) = inclusion_mask { + assert_eq!(entry.row_count, mask.len()); + mask.iter().map(|x| !x as usize).sum::() + } else { + 0 + }; + let row_count = entry.row_count - masked_values; if row_count == 0 { return Ok(()); } - let mask = construct_valid_mask(entry)?; + let valid_mask = construct_valid_mask(entry)?; match &mut self.data { ColumnData::Bool(col_data, stats) => { @@ -148,21 +159,28 @@ impl Column { col_data.append_unset(row_count); let initial_total_count = stats.total_count; - let to_add = entry_data.len(); - let null_count = row_count - to_add; + let mut added = 0; - for (idx, value) in iter_set_positions(&mask).zip(entry_data) { + for (idx, value) in MaskedIter::new( + iter_set_positions(&valid_mask), + entry_data.iter(), + inclusion_mask, + ) { stats.update(value); if *value { col_data.set(data_offset + idx); } + + added += 1; } + + let null_count = row_count - added; stats.update_for_nulls(null_count as u64); assert_eq!( stats.total_count - initial_total_count - null_count as u64, - to_add as u64 + added as u64 ); } ColumnData::U64(col_data, stats) => { @@ -174,7 +192,14 @@ impl Column { .expect("invalid payload") .into_iter(); - handle_write(row_count, &mask, entry_data, col_data, stats); + handle_write( + row_count, + &valid_mask, + entry_data, + col_data, + stats, + 
inclusion_mask, + ); } ColumnData::F64(col_data, stats) => { let entry_data = entry @@ -185,7 +210,14 @@ impl Column { .expect("invalid payload") .into_iter(); - handle_write(row_count, &mask, entry_data, col_data, stats); + handle_write( + row_count, + &valid_mask, + entry_data, + col_data, + stats, + inclusion_mask, + ); } ColumnData::I64(col_data, stats) => { let entry_data = entry @@ -196,7 +228,14 @@ impl Column { .expect("invalid payload") .into_iter(); - handle_write(row_count, &mask, entry_data, col_data, stats); + handle_write( + row_count, + &valid_mask, + entry_data, + col_data, + stats, + inclusion_mask, + ); } ColumnData::String(col_data, stats) => { let entry_data = entry @@ -208,21 +247,26 @@ impl Column { let data_offset = col_data.len(); let initial_total_count = stats.total_count; - let to_add = entry_data.len(); - let null_count = row_count - to_add; + let mut added = 0; - for (str, idx) in entry_data.iter().zip(iter_set_positions(&mask)) { + for (idx, str) in MaskedIter::new( + iter_set_positions(&valid_mask), + entry_data.iter(), + inclusion_mask, + ) { col_data.extend(data_offset + idx - col_data.len()); stats.update(str); col_data.append(str); + added += 1; } - col_data.extend(data_offset + row_count - col_data.len()); + + let null_count = row_count - added; stats.update_for_nulls(null_count as u64); assert_eq!( stats.total_count - initial_total_count - null_count as u64, - to_add as u64 + added as u64 ); } ColumnData::Tag(col_data, dictionary, stats) => { @@ -237,23 +281,29 @@ impl Column { col_data.resize(data_offset + row_count, INVALID_DID); let initial_total_count = stats.total_count; - let to_add = entry_data.len(); - let null_count = row_count - to_add; + let mut added = 0; - for (idx, value) in iter_set_positions(&mask).zip(entry_data) { + for (idx, value) in MaskedIter::new( + iter_set_positions(&valid_mask), + entry_data.iter(), + inclusion_mask, + ) { stats.update(value); col_data[data_offset + idx] = 
dictionary.lookup_value_or_insert(value); + added += 1; } + + let null_count = row_count - added; stats.update_for_nulls(null_count as u64); assert_eq!( stats.total_count - initial_total_count - null_count as u64, - to_add as u64 + added as u64 ); } }; - self.valid.append_bits(entry.row_count, &mask); + self.valid.append_bits(row_count, &valid_mask); Ok(()) } @@ -404,6 +454,102 @@ impl Column { } } +/// Iterator that masks out set positions and data. +/// +/// The iterator outputs set position and data. It assumes that set positions are increasing. +/// +/// If no mask is provided set positions and data will just be zipped. +/// +/// If a mask is provided, only elements that are marked as `true` are considered. Note that this is independent of the +/// inclusion or exclusion via set positions. Set positions are shifted to accommodate for excluded data. Here is an +/// example input: +/// +/// | Mask | Set Positions | Data | +/// | ----- | ------------- | ---- | +/// | false | | | +/// | true | | | +/// | false | 2 | a | +/// | true | 3 | b | +/// | true | | | +/// | true | 5 | c | +/// | true | 6 | d | +/// | false | | | +/// | true | 8 | e | +/// +/// This results in the following output: +/// +/// | Set Positions | Data | +/// | ------------- | ---- | +/// | 1 | b | +/// | 3 | c | +/// | 4 | d | +/// | 5 | e | +struct MaskedIter<'a, I1, I2> +where + I1: Iterator, + I2: Iterator, +{ + /// Zipped iterator yielding unshifted (aka w/o accounting for excluded items) set positions and data. + it: Zip, + + /// If the mask was provided this includes an enumerated iterator over the mask. The enumeration is used to align + /// the mask with the set positions. + mask: Option>>, + + /// Number of excluded items up to the current point. 
+ excluded_count: usize, +} + +impl<'a, I1, I2> MaskedIter<'a, I1, I2> +where + I1: Iterator, + I2: Iterator, +{ + fn new(it_set_positions: I1, it_data: I2, mask: Option<&'a [bool]>) -> Self { + Self { + it: it_set_positions.zip(it_data), + mask: mask.map(|mask| mask.iter().enumerate()), + excluded_count: 0, + } + } +} + +impl<'a, I1, I2> Iterator for MaskedIter<'a, I1, I2> +where + I1: Iterator, + I2: Iterator, +{ + type Item = (usize, I2::Item); + + fn next(&mut self) -> Option { + if let Some(mask) = self.mask.as_mut() { + for (set_position, value) in &mut self.it { + loop { + let (idx_mask, included) = mask.next().expect("inclusion mask too short"); + if !included { + self.excluded_count += 1; + } + if idx_mask == set_position { + if *included { + let set_position = set_position + .checked_sub(self.excluded_count) + .expect("set positions broken"); + return Some((set_position, value)); + } else { + // exclude this value + break; + } + } + } + } + + None + } else { + self.it.next() + } + } +} + /// Construct a validity mask from the given column's null mask fn construct_valid_mask(column: &EntryColumn<'_>) -> Result> { let buf_len = (column.row_count + 7) >> 3; @@ -441,6 +587,7 @@ fn handle_write( entry_data: E, col_data: &mut Vec, stats: &mut StatValues, + inclusion_mask: Option<&[bool]>, ) where T: Clone + Default + PartialOrd + IsNan, E: Iterator + ExactSizeIterator, @@ -449,18 +596,194 @@ fn handle_write( col_data.resize(data_offset + row_count, Default::default()); let initial_total_count = stats.total_count; - let to_add = entry_data.len(); - let null_count = row_count - to_add; + let mut added = 0; - for (idx, value) in iter_set_positions(valid_mask).zip(entry_data) { + for (idx, value) in MaskedIter::new(iter_set_positions(valid_mask), entry_data, inclusion_mask) + { stats.update(&value); col_data[data_offset + idx] = value; + added += 1; } + let null_count = row_count - added; stats.update_for_nulls(null_count as u64); assert_eq!( stats.total_count - 
initial_total_count - null_count as u64, - to_add as u64 + added as u64 ); } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_masked_iterator_empty() { + // empty, no mask + let actual: Vec<(usize, u32)> = MaskedIter::new( + IntoIterator::into_iter([]), + IntoIterator::into_iter([]), + None, + ) + .collect(); + let expected: Vec<(usize, u32)> = vec![]; + assert_eq!(actual, expected); + + // empty, w/ mask + let actual: Vec<(usize, u32)> = MaskedIter::new( + IntoIterator::into_iter([]), + IntoIterator::into_iter([]), + Some(&[]), + ) + .collect(); + let expected: Vec<(usize, u32)> = vec![]; + assert_eq!(actual, expected); + } + + #[test] + fn test_masked_iterator_no_nulls() { + // data w/o nulls, no mask + let actual: Vec<(usize, u32)> = MaskedIter::new( + IntoIterator::into_iter([0, 1, 2]), + IntoIterator::into_iter([1, 2, 3]), + None, + ) + .collect(); + let expected: Vec<(usize, u32)> = vec![(0, 1), (1, 2), (2, 3)]; + assert_eq!(actual, expected); + + // data w/o nulls, all-true mask + let actual: Vec<(usize, u32)> = MaskedIter::new( + IntoIterator::into_iter([0, 1, 2]), + IntoIterator::into_iter([1, 2, 3]), + Some(&[true, true, true]), + ) + .collect(); + let expected: Vec<(usize, u32)> = vec![(0, 1), (1, 2), (2, 3)]; + assert_eq!(actual, expected); + + // data w/o nulls, all-false mask + let actual: Vec<(usize, u32)> = MaskedIter::new( + IntoIterator::into_iter([0, 1, 2]), + IntoIterator::into_iter([1, 2, 3]), + Some(&[false, false, false]), + ) + .collect(); + let expected: Vec<(usize, u32)> = vec![]; + assert_eq!(actual, expected); + + // data w/o nulls, true-true-false mask + let actual: Vec<(usize, u32)> = MaskedIter::new( + IntoIterator::into_iter([0, 1, 2]), + IntoIterator::into_iter([1, 2, 3]), + Some(&[true, true, false]), + ) + .collect(); + let expected: Vec<(usize, u32)> = vec![(0, 1), (1, 2)]; + assert_eq!(actual, expected); + + // data w/o nulls, true-false-true mask + let actual: Vec<(usize, u32)> = MaskedIter::new( + 
IntoIterator::into_iter([0, 1, 2]), + IntoIterator::into_iter([1, 2, 3]), + Some(&[true, false, true]), + ) + .collect(); + let expected: Vec<(usize, u32)> = vec![(0, 1), (1, 3)]; + assert_eq!(actual, expected); + + // data w/o nulls, false-true-false mask + let actual: Vec<(usize, u32)> = MaskedIter::new( + IntoIterator::into_iter([0, 1, 2]), + IntoIterator::into_iter([1, 2, 3]), + Some(&[false, true, false]), + ) + .collect(); + let expected: Vec<(usize, u32)> = vec![(0, 2)]; + assert_eq!(actual, expected); + } + + #[test] + fn test_masked_iterator_nulls() { + // data w/ nulls, no mask + let actual: Vec<(usize, u32)> = MaskedIter::new( + IntoIterator::into_iter([0, 2, 4]), + IntoIterator::into_iter([1, 2, 3]), + None, + ) + .collect(); + let expected: Vec<(usize, u32)> = vec![(0, 1), (2, 2), (4, 3)]; + assert_eq!(actual, expected); + + // data w/ nulls, all-true mask + let actual: Vec<(usize, u32)> = MaskedIter::new( + IntoIterator::into_iter([0, 2, 4]), + IntoIterator::into_iter([1, 2, 3]), + Some(&[true, true, true, true, true]), + ) + .collect(); + let expected: Vec<(usize, u32)> = vec![(0, 1), (2, 2), (4, 3)]; + assert_eq!(actual, expected); + + // data w/ nulls, all-false mask + let actual: Vec<(usize, u32)> = MaskedIter::new( + IntoIterator::into_iter([0, 2, 4]), + IntoIterator::into_iter([1, 2, 3]), + Some(&[false, false, false, false, false]), + ) + .collect(); + let expected: Vec<(usize, u32)> = vec![]; + assert_eq!(actual, expected); + + // data w/ nulls, true-true-true-true-false mask + let actual: Vec<(usize, u32)> = MaskedIter::new( + IntoIterator::into_iter([0, 2, 4]), + IntoIterator::into_iter([1, 2, 3]), + Some(&[true, true, true, true, false]), + ) + .collect(); + let expected: Vec<(usize, u32)> = vec![(0, 1), (2, 2)]; + assert_eq!(actual, expected); + + // data w/ nulls, true-true-false-true-true mask + let actual: Vec<(usize, u32)> = MaskedIter::new( + IntoIterator::into_iter([0, 2, 4]), + IntoIterator::into_iter([1, 2, 3]), + Some(&[true, 
true, false, true, true]), + ) + .collect(); + let expected: Vec<(usize, u32)> = vec![(0, 1), (3, 3)]; + assert_eq!(actual, expected); + + // data w/ nulls, true-false-false-true-true mask + let actual: Vec<(usize, u32)> = MaskedIter::new( + IntoIterator::into_iter([0, 2, 4]), + IntoIterator::into_iter([1, 2, 3]), + Some(&[true, false, false, true, true]), + ) + .collect(); + let expected: Vec<(usize, u32)> = vec![(0, 1), (2, 3)]; + assert_eq!(actual, expected); + + // data w/ nulls, false-true-true-true-false mask + let actual: Vec<(usize, u32)> = MaskedIter::new( + IntoIterator::into_iter([0, 2, 4]), + IntoIterator::into_iter([1, 2, 3]), + Some(&[false, true, true, true, false]), + ) + .collect(); + let expected: Vec<(usize, u32)> = vec![(1, 2)]; + assert_eq!(actual, expected); + + // data w/ nulls, false-false-true-false-false mask + let actual: Vec<(usize, u32)> = MaskedIter::new( + IntoIterator::into_iter([0, 2, 4]), + IntoIterator::into_iter([1, 2, 3]), + Some(&[false, false, true, false, false]), + ) + .collect(); + let expected: Vec<(usize, u32)> = vec![(0, 2)]; + assert_eq!(actual, expected); + } +} diff --git a/server/src/db.rs b/server/src/db.rs index 20e2bf8fc0..d92bfeda21 100644 --- a/server/src/db.rs +++ b/server/src/db.rs @@ -1191,14 +1191,15 @@ impl Db { /// 2. the partition key /// 3. the table batch (which also contains the table name) /// - /// It shall return `true` if the batch should be stored and `false` otherwise. + /// It shall return `(true, _)` if the batch should be stored and `(false, _)` otherwise. In the first case the + /// second element in the tuple is a row-wise mask. If it is provided only rows marked with `true` are stored. 
pub fn store_sequenced_entry( &self, sequenced_entry: Arc, filter_table_batch: F, ) -> Result<()> where - F: Fn(Option<&Sequence>, &str, &TableBatch<'_>) -> bool, + F: Fn(Option<&Sequence>, &str, &TableBatch<'_>) -> (bool, Option>), { // Get all needed database rule values, then release the lock let rules = self.rules.read(); @@ -1242,7 +1243,9 @@ impl Db { None => continue, }; - if !filter_table_batch(sequence, partition_key, &table_batch) { + let (store_batch, mask) = + filter_table_batch(sequence, partition_key, &table_batch); + if !store_batch { continue; } @@ -1301,8 +1304,9 @@ impl Db { let mb_chunk = chunk.mutable_buffer().expect("cannot mutate open chunk"); - if let Err(e) = - mb_chunk.write_table_batch(table_batch).context(WriteEntry { + if let Err(e) = mb_chunk + .write_table_batch(table_batch, mask.as_ref().map(|x| x.as_ref())) + .context(WriteEntry { partition_key, chunk_id, }) @@ -1320,9 +1324,12 @@ impl Db { self.metric_attributes.clone(), ); - let chunk_result = - MBChunk::new(MutableBufferChunkMetrics::new(&metrics), table_batch) - .context(WriteEntryInitial { partition_key }); + let chunk_result = MBChunk::new( + MutableBufferChunkMetrics::new(&metrics), + table_batch, + mask.as_ref().map(|x| x.as_ref()), + ) + .context(WriteEntryInitial { partition_key }); match chunk_result { Ok(mb_chunk) => { @@ -1391,8 +1398,8 @@ fn filter_table_batch_keep_all( _sequence: Option<&Sequence>, _partition_key: &str, _batch: &TableBatch<'_>, -) -> bool { - true +) -> (bool, Option>) { + (true, None) } #[async_trait] diff --git a/server/src/db/catalog.rs b/server/src/db/catalog.rs index 232f8307db..59055cdcb8 100644 --- a/server/src/db/catalog.rs +++ b/server/src/db/catalog.rs @@ -343,6 +343,7 @@ mod tests { let mb_chunk = mutable_buffer::chunk::MBChunk::new( mutable_buffer::chunk::ChunkMetrics::new_unregistered(), batch, + None, ) .unwrap(); diff --git a/server/src/db/catalog/chunk.rs b/server/src/db/catalog/chunk.rs index 74dbba9820..7e224fa92c 100644 --- 
a/server/src/db/catalog/chunk.rs +++ b/server/src/db/catalog/chunk.rs @@ -1116,7 +1116,7 @@ mod tests { let write = entry.partition_writes().unwrap().remove(0); let batch = write.table_batches().remove(0); - MBChunk::new(MBChunkMetrics::new_unregistered(), batch).unwrap() + MBChunk::new(MBChunkMetrics::new_unregistered(), batch, None).unwrap() } async fn make_parquet_chunk(addr: ChunkAddr) -> ParquetChunk { diff --git a/server/src/db/replay.rs b/server/src/db/replay.rs index 28bff534f2..0f1434ab07 100644 --- a/server/src/db/replay.rs +++ b/server/src/db/replay.rs @@ -312,12 +312,12 @@ fn filter_entry( partition_key: &str, table_batch: &TableBatch<'_>, replay_plan: &ReplayPlan, -) -> bool { +) -> (bool, Option>) { let sequence = sequence.expect("write buffer results must be sequenced"); let table_name = table_batch.name(); // Check if we have a partition checkpoint that contains data for this specific sequencer - let flush_ts_and_sequence_range = replay_plan + let min_unpersisted_ts_and_sequence_range = replay_plan .last_partition_checkpoint(table_name, partition_key) .map(|partition_checkpoint| { partition_checkpoint @@ -326,21 +326,28 @@ fn filter_entry( }) .flatten(); - match flush_ts_and_sequence_range { - Some((_ts, min_max)) => { + match min_unpersisted_ts_and_sequence_range { + Some((min_unpersisted_ts, min_max)) => { // Figure out what the sequence number tells us about the entire batch match SequenceNumberSection::compare(sequence.number, min_max) { SequenceNumberSection::Persisted => { // skip the entire batch - false + (false, None) } SequenceNumberSection::PartiallyPersisted => { // TODO: implement row filtering, for now replay the entire batch - true + let maybe_mask = table_batch.timestamps().ok().map(|timestamps| { + let min_unpersisted_ts = min_unpersisted_ts.timestamp_nanos(); + timestamps + .into_iter() + .map(|ts_row| ts_row >= min_unpersisted_ts) + .collect::>() + }); + (true, maybe_mask) } SequenceNumberSection::Unpersisted => { // replay entire 
batch - true + (true, None) } } } @@ -350,7 +357,7 @@ fn filter_entry( // - Unknown sequencer (at least from the partitions point of view). // // => Replay full batch. - true + (true, None) } } } @@ -1760,6 +1767,115 @@ mod tests { .await } + #[tokio::test] + async fn replay_prune_rows() { + ReplayTest { + steps: vec![ + Step::Ingest(vec![ + TestSequencedEntry { + sequencer_id: 0, + sequence_number: 0, + lp: "table_1,tag_partition_by=a,tag=1 bar=10 10", + }, + ]), + Step::Await(vec![ + Check::Query( + "select * from table_1 order by bar", + vec![ + "+-----+-----+------------------+--------------------------------+", + "| bar | tag | tag_partition_by | time |", + "+-----+-----+------------------+--------------------------------+", + "| 10 | 1 | a | 1970-01-01T00:00:00.000000010Z |", + "+-----+-----+------------------+--------------------------------+", + ], + ), + ]), + Step::MakeWritesPersistable, + Step::Ingest(vec![ + TestSequencedEntry { + sequencer_id: 0, + sequence_number: 1, + // same time as first entry in that partition but different tag + some later data + lp: "table_1,tag_partition_by=a,tag=2 bar=20 10\ntable_1,tag_partition_by=a,tag=3 bar=30 30", + }, + ]), + Step::Await(vec![ + Check::Query( + "select * from table_1 order by bar", + vec![ + "+-----+-----+------------------+--------------------------------+", + "| bar | tag | tag_partition_by | time |", + "+-----+-----+------------------+--------------------------------+", + "| 10 | 1 | a | 1970-01-01T00:00:00.000000010Z |", + "| 20 | 2 | a | 1970-01-01T00:00:00.000000010Z |", + "| 30 | 3 | a | 1970-01-01T00:00:00.000000030Z |", + "+-----+-----+------------------+--------------------------------+", + ], + ), + ]), + Step::Persist(vec![("table_1", "tag_partition_by_a")]), + Step::Assert(vec![ + // chunks do not overlap + Check::Query( + "select storage, min_value, max_value, row_count from system.chunk_columns where column_name = 'time' order by min_value, storage", + vec![ + 
"+--------------------------+-----------+-----------+-----------+", + "| storage | min_value | max_value | row_count |", + "+--------------------------+-----------+-----------+-----------+", + "| ReadBufferAndObjectStore | 10 | 10 | 2 |", + "| ReadBuffer | 30 | 30 | 1 |", + "+--------------------------+-----------+-----------+-----------+", + ], + ), + ]), + Step::Restart, + Step::Assert(vec![ + Check::Query( + "select * from table_1 order by bar", + vec![ + "+-----+-----+------------------+--------------------------------+", + "| bar | tag | tag_partition_by | time |", + "+-----+-----+------------------+--------------------------------+", + "| 10 | 1 | a | 1970-01-01T00:00:00.000000010Z |", + "| 20 | 2 | a | 1970-01-01T00:00:00.000000010Z |", + "+-----+-----+------------------+--------------------------------+", + ], + ), + ]), + Step::Replay, + Step::Assert(vec![ + Check::Query( + "select * from table_1 order by bar", + vec![ + "+-----+-----+------------------+--------------------------------+", + "| bar | tag | tag_partition_by | time |", + "+-----+-----+------------------+--------------------------------+", + "| 10 | 1 | a | 1970-01-01T00:00:00.000000010Z |", + "| 20 | 2 | a | 1970-01-01T00:00:00.000000010Z |", + "| 30 | 3 | a | 1970-01-01T00:00:00.000000030Z |", + "+-----+-----+------------------+--------------------------------+", + ], + ), + // chunks do not overlap + Check::Query( + "select storage, min_value, max_value, row_count from system.chunk_columns where column_name = 'time' order by min_value, storage", + vec![ + "+-------------------+-----------+-----------+-----------+", + "| storage | min_value | max_value | row_count |", + "+-------------------+-----------+-----------+-----------+", + "| ObjectStoreOnly | 10 | 10 | 2 |", + "| OpenMutableBuffer | 30 | 30 | 1 |", + "+-------------------+-----------+-----------+-----------+", + ], + ), + ]), + ], + ..Default::default() + } + .run() + .await + } + #[tokio::test] async fn 
replay_works_with_checkpoints_all_full_persisted_1() { ReplayTest { diff --git a/server_benchmarks/benches/snapshot.rs b/server_benchmarks/benches/snapshot.rs index 3c63ebd70b..2db4fbde56 100644 --- a/server_benchmarks/benches/snapshot.rs +++ b/server_benchmarks/benches/snapshot.rs @@ -23,11 +23,12 @@ fn chunk(count: usize) -> MBChunk { for batch in write.table_batches() { match chunk { Some(ref mut c) => { - c.write_table_batch(batch).unwrap(); + c.write_table_batch(batch, None).unwrap(); } None => { chunk = Some( - MBChunk::new(ChunkMetrics::new_unregistered(), batch).unwrap(), + MBChunk::new(ChunkMetrics::new_unregistered(), batch, None) + .unwrap(), ); } } diff --git a/server_benchmarks/benches/write.rs b/server_benchmarks/benches/write.rs index 51ec6b22f4..a9abf381b7 100644 --- a/server_benchmarks/benches/write.rs +++ b/server_benchmarks/benches/write.rs @@ -17,11 +17,12 @@ fn write_chunk(count: usize, entries: &[Entry]) { for batch in write.table_batches() { match chunk { Some(ref mut c) => { - c.write_table_batch(batch).unwrap(); + c.write_table_batch(batch, None).unwrap(); } None => { chunk = Some( - MBChunk::new(ChunkMetrics::new_unregistered(), batch).unwrap(), + MBChunk::new(ChunkMetrics::new_unregistered(), batch, None) + .unwrap(), ); } }