Merge branch 'main' into crepererum/issue1821-cleanup-lock

pull/24376/head
kodiakhq[bot] 2021-06-29 10:48:43 +00:00 committed by GitHub
commit eda9532eb2
15 changed files with 535 additions and 444 deletions


@ -1,11 +1,15 @@
//! Contains a structure to map from strings to integer symbols based on
//! string interning.
use hashbrown::HashMap;
use std::convert::TryFrom;
use crate::string::PackedStringArray;
use arrow::array::{Array, ArrayDataBuilder, DictionaryArray};
use arrow::buffer::Buffer;
use arrow::datatypes::{DataType, Int32Type};
use hashbrown::HashMap;
use num_traits::{AsPrimitive, FromPrimitive, Zero};
use snafu::Snafu;
use std::convert::TryFrom;
use crate::string::PackedStringArray;
#[derive(Debug, Snafu)]
pub enum Error {
@ -45,6 +49,14 @@ impl<K: AsPrimitive<usize> + FromPrimitive + Zero> StringDictionary<K> {
Default::default()
}
pub fn with_capacity(keys: usize, values: usize) -> StringDictionary<K> {
Self {
hash: Default::default(),
dedup: HashMap::with_capacity_and_hasher(keys, ()),
storage: PackedStringArray::with_capacity(keys, values),
}
}
/// Returns the id corresponding to `value`, adding an entry for the
/// value if it is not yet present in the dictionary.
pub fn lookup_value_or_insert(&mut self, value: &str) -> K {
@ -109,6 +121,31 @@ fn hash_str(hasher: &ahash::RandomState, value: &str) -> u64 {
state.finish()
}
impl StringDictionary<i32> {
/// Convert to an arrow representation with the provided set of
/// keys and an optional null bitmask
pub fn to_arrow<I>(&self, keys: I, nulls: Option<Buffer>) -> DictionaryArray<Int32Type>
where
I: IntoIterator<Item = i32>,
I::IntoIter: ExactSizeIterator,
{
let keys = keys.into_iter();
let mut array_builder = ArrayDataBuilder::new(DataType::Dictionary(
Box::new(DataType::Int32),
Box::new(DataType::Utf8),
))
.len(keys.len())
.add_buffer(keys.collect())
.add_child_data(self.storage.to_arrow().data().clone());
if let Some(nulls) = nulls {
array_builder = array_builder.null_bit_buffer(nulls);
}
DictionaryArray::<Int32Type>::from(array_builder.build())
}
}
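// Illustrative example, not part of this commit: a minimal round trip through
// `lookup_value_or_insert` and the new `to_arrow`, using only the API shown
// above (the `Array` trait needed for `len()` is already imported in this file).
#[test]
fn example_dictionary_to_arrow() {
    let mut dict = StringDictionary::<i32>::new();
    let cpu = dict.lookup_value_or_insert("cpu");
    let mem = dict.lookup_value_or_insert("mem");
    // Keys may repeat; passing `None` means there is no null bitmask.
    let array = dict.to_arrow(vec![cpu, mem, cpu], None);
    assert_eq!(array.len(), 3);
}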
impl<K> TryFrom<PackedStringArray<K>> for StringDictionary<K>
where
K: AsPrimitive<usize> + FromPrimitive + Zero,
@ -155,9 +192,10 @@ where
#[cfg(test)]
mod test {
use super::*;
use std::convert::TryInto;
use super::*;
#[test]
fn test_dictionary() {
let mut dictionary = StringDictionary::<i32>::new();


@ -3,6 +3,7 @@
pub mod bitset;
pub mod dictionary;
pub mod optimize;
pub mod string;
pub mod util;

arrow_util/src/optimize.rs (new file, 247 lines added)

@ -0,0 +1,247 @@
use std::collections::BTreeSet;
use std::sync::Arc;
use arrow::array::{Array, ArrayRef, DictionaryArray, StringArray};
use arrow::datatypes::{DataType, Int32Type};
use arrow::error::{ArrowError, Result};
use arrow::record_batch::RecordBatch;
use hashbrown::HashMap;
use crate::dictionary::StringDictionary;
/// Takes a record batch and returns a new record batch with dictionaries
/// optimized to contain no duplicate or unreferenced values
///
/// Where the input dictionaries are sorted, the output dictionaries will also be sorted.
pub fn optimize_dictionaries(batch: &RecordBatch) -> Result<RecordBatch> {
let schema = batch.schema();
let new_columns = batch
.columns()
.iter()
.zip(schema.fields())
.map(|(col, field)| match field.data_type() {
DataType::Dictionary(key, value) => optimize_dict_col(col, &key, &value),
_ => Ok(Arc::clone(col)),
})
.collect::<Result<Vec<_>>>()?;
RecordBatch::try_new(schema, new_columns)
}
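// Illustrative sketch, not part of this commit: a typical call site. After
// `arrow::compute::concat` stitches together batches that share dictionary
// columns, the concatenated dictionaries carry duplicate and unreferenced
// values, which this pass removes (see `test_concat` below). `t1_1` and `t1_2`
// are assumed dictionary arrays.
//
// let combined = RecordBatch::try_from_iter(vec![(
//     "t1",
//     arrow::compute::concat(&[&t1_1, &t1_2])?,
// )])?;
// let optimized = optimize_dictionaries(&combined)?;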
/// Optimizes the dictionaries for a column
fn optimize_dict_col(
col: &ArrayRef,
key_type: &DataType,
value_type: &DataType,
) -> Result<ArrayRef> {
if key_type != &DataType::Int32 {
return Err(ArrowError::NotYetImplemented(format!(
"truncating non-Int32 dictionaries not supported: {}",
key_type
)));
}
if value_type != &DataType::Utf8 {
return Err(ArrowError::NotYetImplemented(format!(
"truncating non-string dictionaries not supported: {}",
value_type
)));
}
let col = col
.as_any()
.downcast_ref::<DictionaryArray<Int32Type>>()
.expect("unexpected datatype");
let keys = col.keys();
let values = col.values();
let values = values
.as_any()
.downcast_ref::<StringArray>()
.expect("unexpected datatype");
// The total length of the resulting values array
let mut values_len = 0_usize;
// Dictionary keys that are actually referenced by the keys array
// Use a BTreeSet so the surviving values keep their original dictionary order
let mut used_keys = BTreeSet::new();
for key in keys.iter().flatten() {
if used_keys.insert(key) {
values_len += values.value_length(key as usize) as usize;
}
}
// Then perform deduplication
let mut new_dictionary = StringDictionary::with_capacity(used_keys.len(), values_len);
let mut old_to_new_idx: HashMap<i32, i32> = HashMap::with_capacity(used_keys.len());
for key in used_keys {
let new_key = new_dictionary.lookup_value_or_insert(values.value(key as usize));
old_to_new_idx.insert(key, new_key);
}
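// Remap every old key to its new, deduplicated key. Null slots keep a
// placeholder key of -1, which is never read because the null bitmap handed to
// `to_arrow` below marks those positions as null.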
let new_keys = keys.iter().map(|x| match x {
Some(x) => *old_to_new_idx.get(&x).expect("no mapping found"),
None => -1,
});
Ok(Arc::new(
new_dictionary.to_arrow(new_keys, keys.data().null_buffer().cloned()),
))
}
#[cfg(test)]
mod tests {
use super::*;
use crate::assert_batches_eq;
use arrow::array::{ArrayDataBuilder, DictionaryArray, Float64Array, Int32Array, StringArray};
use arrow::compute::concat;
use std::iter::FromIterator;
#[test]
fn test_optimize() {
let values = StringArray::from(vec![
"duplicate",
"duplicate",
"foo",
"boo",
"unused",
"duplicate",
]);
let keys = Int32Array::from(vec![
Some(0),
Some(1),
None,
Some(1),
Some(2),
Some(5),
Some(3),
]);
let data = ArrayDataBuilder::new(DataType::Dictionary(
Box::new(DataType::Int32),
Box::new(DataType::Utf8),
))
.len(keys.len())
.add_buffer(keys.data().buffers()[0].clone())
.null_bit_buffer(keys.data().null_buffer().unwrap().clone())
.add_child_data(values.data().clone())
.build();
let batch = RecordBatch::try_from_iter(vec![(
"foo",
Arc::new(DictionaryArray::<Int32Type>::from(data)) as ArrayRef,
)])
.unwrap();
let optimized = optimize_dictionaries(&batch).unwrap();
let col = optimized
.column(0)
.as_any()
.downcast_ref::<DictionaryArray<Int32Type>>()
.unwrap();
let values = col.values();
let values = values.as_any().downcast_ref::<StringArray>().unwrap();
let values = values.iter().flatten().collect::<Vec<_>>();
assert_eq!(values, vec!["duplicate", "foo", "boo"]);
assert_batches_eq!(
vec![
"+-----------+",
"| foo |",
"+-----------+",
"| duplicate |",
"| duplicate |",
"| |",
"| duplicate |",
"| foo |",
"| duplicate |",
"| boo |",
"+-----------+",
],
&[optimized]
);
}
#[test]
fn test_concat() {
let f1_1 = Float64Array::from(vec![Some(1.0), Some(2.0), Some(3.0), Some(4.0)]);
let t2_1 = DictionaryArray::<Int32Type>::from_iter(vec![
Some("a"),
Some("g"),
Some("a"),
Some("b"),
]);
let t1_1 = DictionaryArray::<Int32Type>::from_iter(vec![
Some("a"),
Some("a"),
Some("b"),
Some("b"),
]);
let f1_2 = Float64Array::from(vec![Some(1.0), Some(5.0), Some(2.0), Some(46.0)]);
let t2_2 = DictionaryArray::<Int32Type>::from_iter(vec![
Some("a"),
Some("b"),
Some("a"),
Some("a"),
]);
let t1_2 = DictionaryArray::<Int32Type>::from_iter(vec![
Some("a"),
Some("d"),
Some("a"),
Some("b"),
]);
let concat = RecordBatch::try_from_iter(vec![
("f1", concat(&[&f1_1, &f1_2]).unwrap()),
("t2", concat(&[&t2_1, &t2_2]).unwrap()),
("t1", concat(&[&t1_1, &t1_2]).unwrap()),
])
.unwrap();
let optimized = optimize_dictionaries(&concat).unwrap();
let col = optimized
.column(optimized.schema().column_with_name("t2").unwrap().0)
.as_any()
.downcast_ref::<DictionaryArray<Int32Type>>()
.unwrap();
let values = col.values();
let values = values.as_any().downcast_ref::<StringArray>().unwrap();
let values = values.iter().flatten().collect::<Vec<_>>();
assert_eq!(values, vec!["a", "g", "b"]);
let col = optimized
.column(optimized.schema().column_with_name("t1").unwrap().0)
.as_any()
.downcast_ref::<DictionaryArray<Int32Type>>()
.unwrap();
let values = col.values();
let values = values.as_any().downcast_ref::<StringArray>().unwrap();
let values = values.iter().flatten().collect::<Vec<_>>();
assert_eq!(values, vec!["a", "b", "d"]);
assert_batches_eq!(
vec![
"+----+----+----+",
"| f1 | t2 | t1 |",
"+----+----+----+",
"| 1 | a | a |",
"| 2 | g | a |",
"| 3 | a | b |",
"| 4 | b | b |",
"| 1 | a | a |",
"| 5 | b | d |",
"| 2 | a | a |",
"| 46 | a | b |",
"+----+----+----+",
],
&[optimized]
);
}
}


@ -37,6 +37,16 @@ impl<K: AsPrimitive<usize> + FromPrimitive + Zero> PackedStringArray<K> {
}
}
pub fn with_capacity(keys: usize, values: usize) -> Self {
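// `offsets` always holds one more entry than there are stored strings: the
// leading zero marks the start of the first value, so value `i` occupies
// `offsets[i]..offsets[i + 1]` of `storage`.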
let mut offsets = Vec::with_capacity(keys + 1);
offsets.push(K::zero());
Self {
offsets,
storage: String::with_capacity(values),
}
}
/// Append a value
///
/// Returns the index of the appended data


@ -1241,7 +1241,10 @@ pub enum SequencedEntryError {
#[derive(Debug)]
pub struct SequencedEntry {
entry: Entry,
sequence: Sequence,
/// The (optional) sequence for this entry. At the time of
/// writing, sequences will not be present when there is no
/// configured mechanism to define the order of all writes.
sequence: Option<Sequence>,
}
#[derive(Debug, Copy, Clone)]
@ -1250,6 +1253,15 @@ pub struct Sequence {
pub number: u64,
}
impl Sequence {
pub fn new(sequencer_id: u32, sequence_number: u64) -> Self {
Self {
id: sequencer_id,
number: sequence_number,
}
}
}
impl SequencedEntry {
pub fn new_from_process_clock(
process_clock: ClockValue,
@ -1258,10 +1270,10 @@ impl SequencedEntry {
) -> Result<Self, SequencedEntryError> {
Ok(Self {
entry,
sequence: Sequence {
sequence: Some(Sequence {
id: server_id.get_u32(),
number: process_clock.get_u64(),
},
}),
})
}
@ -1269,15 +1281,21 @@ impl SequencedEntry {
sequence: Sequence,
entry: Entry,
) -> Result<Self, SequencedEntryError> {
let sequence = Some(sequence);
Ok(Self { entry, sequence })
}
pub fn new_unsequenced(entry: Entry) -> Self {
let sequence = None;
Self { entry, sequence }
}
pub fn partition_writes(&self) -> Option<Vec<PartitionWrite<'_>>> {
self.entry.partition_writes()
}
pub fn sequence(&self) -> &Sequence {
&self.sequence
pub fn sequence(&self) -> Option<&Sequence> {
self.sequence.as_ref()
}
}
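// Illustrative sketch, not part of this commit: the two construction paths and
// how `sequence()` now reflects them. `entry_a` and `entry_b` are assumed
// `Entry` values built elsewhere (e.g. via the line-protocol test helpers).
//
// let sequenced = SequencedEntry::new_from_sequence(Sequence::new(1, 42), entry_a)?;
// assert!(sequenced.sequence().is_some());
//
// let unsequenced = SequencedEntry::new_unsequenced(entry_b);
// assert!(unsequenced.sequence().is_none());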


@ -6,7 +6,7 @@ use parking_lot::Mutex;
use snafu::{ensure, OptionExt, ResultExt, Snafu};
use data_types::partition_metadata::{ColumnSummary, InfluxDbType, TableSummary};
use entry::TableBatch;
use entry::{Sequence, TableBatch};
use internal_types::{
schema::{builder::SchemaBuilder, InfluxColumnType, Schema},
selection::Selection,
@ -108,8 +108,7 @@ impl MBChunk {
/// Panics if the batch specifies a different name for the table in this Chunk
pub fn write_table_batch(
&mut self,
sequencer_id: u32,
sequence_number: u64,
sequence: Option<&Sequence>,
batch: TableBatch<'_>,
) -> Result<()> {
let table_name = batch.name();
@ -120,7 +119,7 @@ impl MBChunk {
);
let columns = batch.columns();
self.write_columns(sequencer_id, sequence_number, columns)?;
self.write_columns(sequence, columns)?;
// Invalidate chunk snapshot
*self
@ -275,8 +274,7 @@ impl MBChunk {
/// the associated columns in the table and updates summary statistics.
pub fn write_columns(
&mut self,
_sequencer_id: u32,
_sequence_number: u64,
_sequence: Option<&Sequence>,
columns: Vec<entry::Column<'_>>,
) -> Result<()> {
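// The sequence is currently ignored by the mutable buffer (note the leading
// underscore on the parameter), but callers still pass it.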
let row_count_before_insert = self.rows();
@ -357,7 +355,8 @@ pub mod test_helpers {
);
for batch in table_batches {
chunk.write_table_batch(1, 5, batch)?;
let seq = Some(Sequence::new(1, 5));
chunk.write_table_batch(seq.as_ref(), batch)?;
}
}
@ -598,13 +597,13 @@ mod tests {
let mut table = MBChunk::new("table_name", ChunkMetrics::new_unregistered());
let sequencer_id = 1;
let sequence_number = 5;
let sequence = Some(Sequence::new(sequencer_id, sequence_number));
let lp = "foo,t1=asdf iv=1i,uv=1u,fv=1.0,bv=true,sv=\"hi\" 1";
let entry = lp_to_entry(&lp);
table
.write_columns(
sequencer_id,
sequence_number,
sequence.as_ref(),
entry
.partition_writes()
.unwrap()
@ -621,8 +620,7 @@ mod tests {
let entry = lp_to_entry(&lp);
let response = table
.write_columns(
sequencer_id,
sequence_number,
sequence.as_ref(),
entry
.partition_writes()
.unwrap()
@ -654,8 +652,7 @@ mod tests {
let entry = lp_to_entry(&lp);
let response = table
.write_columns(
sequencer_id,
sequence_number,
sequence.as_ref(),
entry
.partition_writes()
.unwrap()
@ -687,8 +684,7 @@ mod tests {
let entry = lp_to_entry(&lp);
let response = table
.write_columns(
sequencer_id,
sequence_number,
sequence.as_ref(),
entry
.partition_writes()
.unwrap()
@ -720,8 +716,7 @@ mod tests {
let entry = lp_to_entry(&lp);
let response = table
.write_columns(
sequencer_id,
sequence_number,
sequence.as_ref(),
entry
.partition_writes()
.unwrap()
@ -753,8 +748,7 @@ mod tests {
let entry = lp_to_entry(&lp);
let response = table
.write_columns(
sequencer_id,
sequence_number,
sequence.as_ref(),
entry
.partition_writes()
.unwrap()
@ -786,8 +780,7 @@ mod tests {
let entry = lp_to_entry(&lp);
let response = table
.write_columns(
sequencer_id,
sequence_number,
sequence.as_ref(),
entry
.partition_writes()
.unwrap()
@ -821,6 +814,7 @@ mod tests {
let lp_data = lp_lines.join("\n");
let entry = lp_to_entry(&lp_data);
let sequence = Some(Sequence::new(1, 5));
for batch in entry
.partition_writes()
.unwrap()
@ -828,7 +822,9 @@ mod tests {
.unwrap()
.table_batches()
{
table.write_columns(1, 5, batch.columns()).unwrap();
table
.write_columns(sequence.as_ref(), batch.columns())
.unwrap();
}
}
}


@ -1,12 +1,13 @@
use std::convert::TryInto;
use std::mem;
use std::sync::Arc;
use arrow::{
array::{
Array, ArrayDataBuilder, ArrayRef, BooleanArray, DictionaryArray, Float64Array, Int64Array,
ArrayDataBuilder, ArrayRef, BooleanArray, Float64Array, Int64Array,
TimestampNanosecondArray, UInt64Array,
},
datatypes::{DataType, Int32Type},
datatypes::DataType,
};
use snafu::{ensure, Snafu};
@ -17,7 +18,6 @@ use entry::Column as EntryColumn;
use internal_types::schema::{InfluxColumnType, InfluxFieldType, TIME_DATA_TYPE};
use crate::dictionary::{Dictionary, DID, INVALID_DID};
use std::convert::TryInto;
#[derive(Debug, Snafu)]
#[allow(missing_copy_implementations)]
@ -345,21 +345,7 @@ impl Column {
Arc::new(BooleanArray::from(data))
}
ColumnData::Tag(data, dictionary, _) => {
let dictionary = dictionary.values().to_arrow();
let data = ArrayDataBuilder::new(DataType::Dictionary(
Box::new(DataType::Int32),
Box::new(DataType::Utf8),
))
.len(data.len())
.add_buffer(data.iter().cloned().collect())
.null_bit_buffer(nulls)
.add_child_data(dictionary.data().clone())
.build();
let array = DictionaryArray::<Int32Type>::from(data);
Arc::new(array)
Arc::new(dictionary.to_arrow(data.iter().cloned(), Some(nulls)))
}
};


@ -58,7 +58,7 @@ impl PersistenceWindows {
/// is triggered (either by crossing a row count threshold or time).
pub fn add_range(
&mut self,
sequence: &Sequence,
sequence: Option<&Sequence>,
row_count: usize,
min_time: DateTime<Utc>,
max_time: DateTime<Utc>,
@ -80,6 +80,16 @@ impl PersistenceWindows {
};
}
/// Returns the min time of the open persistence window, if any
pub fn open_min_time(&self) -> Option<DateTime<Utc>> {
self.open.as_ref().map(|open| open.min_time)
}
/// Returns the max time of the open persistence window, if any
pub fn open_max_time(&self) -> Option<DateTime<Utc>> {
self.open.as_ref().map(|open| open.max_time)
}
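// Illustrative sketch, not part of this commit: unsequenced writes (a sequence
// of `None`) still extend the open window's time range, which these accessors
// expose. `late_arrival_window`, `min_time`, and `max_time` are assumed values.
//
// let mut windows = PersistenceWindows::new(late_arrival_window);
// windows.add_range(None, 3, min_time, max_time, Instant::now());
// assert_eq!(windows.open_min_time(), Some(min_time));
// assert_eq!(windows.open_max_time(), Some(max_time));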
/// Returns the number of rows that are persistable. These rows could be duplicates, and
/// other rows that fall into the closed and open windows may also be pulled into a persistence
/// operation. This number is used to determine if persistence should be triggered, not as
@ -190,19 +200,21 @@ pub struct MinMaxSequence {
impl Window {
fn new(
created_at: Instant,
sequence: &Sequence,
sequence: Option<&Sequence>,
row_count: usize,
min_time: DateTime<Utc>,
max_time: DateTime<Utc>,
) -> Self {
let mut sequencer_numbers = BTreeMap::new();
sequencer_numbers.insert(
sequence.id,
MinMaxSequence {
min: sequence.number,
max: sequence.number,
},
);
if let Some(sequence) = sequence {
sequencer_numbers.insert(
sequence.id,
MinMaxSequence {
min: sequence.number,
max: sequence.number,
},
);
}
Self {
created_at,
@ -217,7 +229,7 @@ impl Window {
/// are always increasing.
fn add_range(
&mut self,
sequence: &Sequence,
sequence: Option<&Sequence>,
row_count: usize,
min_time: DateTime<Utc>,
max_time: DateTime<Utc>,
@ -229,19 +241,21 @@ impl Window {
if self.max_time < max_time {
self.max_time = max_time;
}
match self.sequencer_numbers.get_mut(&sequence.id) {
Some(n) => {
assert!(sequence.number > n.max);
n.max = sequence.number;
}
None => {
self.sequencer_numbers.insert(
sequence.id,
MinMaxSequence {
min: sequence.number,
max: sequence.number,
},
);
if let Some(sequence) = sequence {
match self.sequencer_numbers.get_mut(&sequence.id) {
Some(n) => {
assert!(sequence.number > n.max);
n.max = sequence.number;
}
None => {
self.sequencer_numbers.insert(
sequence.id,
MinMaxSequence {
min: sequence.number,
max: sequence.number,
},
);
}
}
}
}
@ -280,16 +294,22 @@ mod tests {
let i = Instant::now();
let start_time = Utc::now();
w.add_range(&Sequence { id: 1, number: 2 }, 1, start_time, Utc::now(), i);
w.add_range(
&Sequence { id: 1, number: 4 },
Some(&Sequence { id: 1, number: 2 }),
1,
start_time,
Utc::now(),
i,
);
w.add_range(
Some(&Sequence { id: 1, number: 4 }),
2,
Utc::now(),
Utc::now(),
Instant::now(),
);
w.add_range(
&Sequence { id: 1, number: 10 },
Some(&Sequence { id: 1, number: 10 }),
1,
Utc::now(),
Utc::now(),
@ -297,7 +317,7 @@ mod tests {
);
let last_time = Utc::now();
w.add_range(
&Sequence { id: 2, number: 23 },
Some(&Sequence { id: 2, number: 23 }),
10,
Utc::now(),
last_time,
@ -330,14 +350,14 @@ mod tests {
let last_time = Utc::now();
w.add_range(
&Sequence { id: 1, number: 2 },
Some(&Sequence { id: 1, number: 2 }),
1,
start_time,
start_time,
created_at,
);
w.add_range(
&Sequence { id: 1, number: 3 },
Some(&Sequence { id: 1, number: 3 }),
1,
last_time,
last_time,
@ -348,7 +368,7 @@ mod tests {
.unwrap();
let open_time = Utc::now();
w.add_range(
&Sequence { id: 1, number: 6 },
Some(&Sequence { id: 1, number: 6 }),
2,
last_time,
open_time,
@ -384,7 +404,7 @@ mod tests {
let first_end = Utc::now();
w.add_range(
&Sequence { id: 1, number: 2 },
Some(&Sequence { id: 1, number: 2 }),
2,
start_time,
first_end,
@ -396,7 +416,7 @@ mod tests {
.unwrap();
let second_end = Utc::now();
w.add_range(
&Sequence { id: 1, number: 3 },
Some(&Sequence { id: 1, number: 3 }),
3,
first_end,
second_end,
@ -408,7 +428,7 @@ mod tests {
.unwrap();
let third_end = Utc::now();
w.add_range(
&Sequence { id: 1, number: 4 },
Some(&Sequence { id: 1, number: 4 }),
4,
second_end,
third_end,
@ -440,7 +460,7 @@ mod tests {
.unwrap();
let fourth_end = Utc::now();
w.add_range(
&Sequence { id: 1, number: 5 },
Some(&Sequence { id: 1, number: 5 }),
1,
fourth_end,
fourth_end,
@ -468,7 +488,7 @@ mod tests {
.checked_add(DEFAULT_CLOSED_WINDOW_PERIOD * 100)
.unwrap();
w.add_range(
&Sequence { id: 1, number: 9 },
Some(&Sequence { id: 1, number: 9 }),
2,
Utc::now(),
Utc::now(),
@ -512,21 +532,21 @@ mod tests {
let third_end = third_start + chrono::Duration::seconds(1);
w.add_range(
&Sequence { id: 1, number: 2 },
Some(&Sequence { id: 1, number: 2 }),
2,
start_time,
first_end,
created_at,
);
w.add_range(
&Sequence { id: 1, number: 3 },
Some(&Sequence { id: 1, number: 3 }),
3,
second_start,
second_end,
second_created_at,
);
w.add_range(
&Sequence { id: 1, number: 5 },
Some(&Sequence { id: 1, number: 5 }),
2,
third_start,
third_end,
@ -590,21 +610,21 @@ mod tests {
let third_end = third_start + chrono::Duration::seconds(1);
w.add_range(
&Sequence { id: 1, number: 2 },
Some(&Sequence { id: 1, number: 2 }),
2,
start_time,
first_end,
created_at,
);
w.add_range(
&Sequence { id: 1, number: 3 },
Some(&Sequence { id: 1, number: 3 }),
3,
second_start,
second_end,
second_created_at,
);
w.add_range(
&Sequence { id: 1, number: 5 },
Some(&Sequence { id: 1, number: 5 }),
2,
third_start,
third_end,
@ -666,21 +686,21 @@ mod tests {
let third_end = second_end + chrono::Duration::seconds(1);
w.add_range(
&Sequence { id: 1, number: 2 },
Some(&Sequence { id: 1, number: 2 }),
2,
start_time,
first_end,
created_at,
);
w.add_range(
&Sequence { id: 1, number: 3 },
Some(&Sequence { id: 1, number: 3 }),
3,
first_end,
second_end,
second_created_at,
);
w.add_range(
&Sequence { id: 1, number: 5 },
Some(&Sequence { id: 1, number: 5 }),
2,
third_start,
third_end,
@ -744,21 +764,21 @@ mod tests {
let third_end = second_end + chrono::Duration::seconds(1);
w.add_range(
&Sequence { id: 1, number: 2 },
Some(&Sequence { id: 1, number: 2 }),
2,
start_time,
first_end,
created_at,
);
w.add_range(
&Sequence { id: 1, number: 3 },
Some(&Sequence { id: 1, number: 3 }),
3,
second_start,
second_end,
second_created_at,
);
w.add_range(
&Sequence { id: 1, number: 5 },
Some(&Sequence { id: 1, number: 5 }),
2,
third_start,
third_end,


@ -1,9 +1,5 @@
//! Contains code to rebuild a catalog from files.
use std::{
collections::{hash_map::Entry, HashMap},
fmt::Debug,
sync::Arc,
};
use std::{fmt::Debug, sync::Arc};
use data_types::server_id::ServerId;
use futures::TryStreamExt;
@ -13,11 +9,10 @@ use object_store::{
};
use observability_deps::tracing::error;
use snafu::{ResultExt, Snafu};
use uuid::Uuid;
use crate::{
catalog::{CatalogParquetInfo, CatalogState, CheckpointData, PreservedCatalog},
metadata::{IoxMetadata, IoxParquetMetaData},
catalog::{CatalogParquetInfo, CatalogState, PreservedCatalog},
metadata::IoxParquetMetaData,
};
#[derive(Debug, Snafu)]
pub enum Error {
@ -33,24 +28,6 @@ pub enum Error {
path: Path,
},
#[snafu(display(
"Found multiple transaction for revision {}: {} and {}",
revision_counter,
uuid1,
uuid2
))]
MultipleTransactionsFailure {
revision_counter: u64,
uuid1: Uuid,
uuid2: Uuid,
},
#[snafu(display(
"Internal error: Revision cannot be zero (this transaction is always empty): {:?}",
path
))]
RevisionZeroFailure { path: Path },
#[snafu(display("Cannot add file to transaction: {}", source))]
FileRecordFailure { source: crate::catalog::Error },
@ -67,32 +44,25 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Users are required to [wipe](PreservedCatalog::wipe) the existing catalog before running this
/// procedure (**after creating a backup!**).
///
/// This will create a catalog checkpoint for the very last transaction.
///
/// # Limitations
/// Compared to an intact catalog, wiping a catalog and rebuilding it from Parquet files has the following drawbacks:
///
/// - **Garbage Susceptibility:** The rebuild process will stumble over garbage parquet files (i.e. files being present
/// in the object store but that were not part of the catalog). For files that were not written by IOx it will likely
/// report [`Error::MetadataReadFailure`]. For files that are left-overs from previous transactions it will likely
/// report [`Error::MultipleTransactionsFailure`]. Crafted files (i.e. files with the correct metadata and matching
/// transaction UUIDs) will blindly be included into the new catalog, because we have no way to distinguish them from
/// the actual catalog content.
/// report [`Error::MetadataReadFailure`].
/// - **No Removals:** The rebuild system cannot recover the fact that files were removed from the catalog during some
/// transaction. This might not always be an issue due to "deduplicate while read"-logic in the query engine, but also
/// might have unwanted side effects (e.g. performance issues).
/// - **Single Transaction:** All files are added in a single transaction. Hence time-traveling is NOT possible using
/// the resulting catalog.
/// - **Fork Detection:** The rebuild procedure does NOT detect forks (i.e. files written for the same server ID by
/// multiple IOx instances).
///
/// # Error Handling
/// This routine will fail if:
///
/// - **Metadata Read Failure:** There is a parquet file with metadata that cannot be read. Set
/// `ignore_metadata_read_failure` to `true` to ignore these cases.
/// - **Parquet With Revision Zero:** One of the parquet files reports it belongs to revision `0`. This should never
/// happen since the first transaction is always an empty one. This was likely caused by a bug or a file created by
/// 3rd party tooling.
/// - **Multiple Transactions:** If there are multiple transactions with the same revision but different UUIDs, this
/// routine cannot reconstruct a single linear revision history. Make sure to
/// [clean up](crate::cleanup::cleanup_unreferenced_parquet_files) regularly to avoid this case.
pub async fn rebuild_catalog<S>(
object_store: Arc<ObjectStore>,
search_location: &Path,
@ -105,8 +75,7 @@ where
S: CatalogState + Debug + Send + Sync,
{
// collect all parquet files
let mut revisions =
collect_revisions(&object_store, search_location, ignore_metadata_read_failure).await?;
let files = collect_files(&object_store, search_location, ignore_metadata_read_failure).await?;
// create new empty catalog
let (catalog, mut state) = PreservedCatalog::new_empty::<S>(
@ -118,56 +87,25 @@ where
.await
.context(NewEmptyFailure)?;
// trace all files for final checkpoint
let mut collected_files = HashMap::new();
// simulate all transactions
if let Some(max_revision) = revisions.keys().max().cloned() {
for revision_counter in 1..=max_revision {
assert_eq!(
catalog.revision_counter() + 1,
revision_counter,
"revision counter during transaction simulation out-of-sync"
);
if let Some((uuid, entries)) = revisions.remove(&revision_counter) {
// we have files for this particular transaction
let mut transaction = catalog.open_transaction_with_uuid(uuid).await;
for (path, metadata) in entries {
let path: DirsAndFileName = path.clone().into();
state
.add(
Arc::clone(&object_store),
CatalogParquetInfo {
path: path.clone(),
metadata: Arc::new(metadata.clone()),
},
)
.context(FileRecordFailure)?;
transaction
.add_parquet(&path, &metadata)
.context(FileRecordFailure)?;
collected_files.insert(path, Arc::new(metadata));
}
let ckpt_handle = transaction.commit().await.context(CommitFailure)?;
if revision_counter == max_revision {
ckpt_handle
.create_checkpoint(CheckpointData {
files: collected_files.clone(),
})
.await
.context(CommitFailure)?;
}
} else {
// we do not have any files for this transaction (there might have been other actions though or it was
// an empty transaction) => create new empty transaction
// Note that this can never be the last transaction, so we don't need to create a checkpoint here.
let transaction = catalog.open_transaction().await;
transaction.commit().await.context(CheckpointFailure)?;
}
// create single transaction with all files
if !files.is_empty() {
let mut transaction = catalog.open_transaction().await;
for (path, parquet_md) in files {
let path: DirsAndFileName = path.into();
state
.add(
Arc::clone(&object_store),
CatalogParquetInfo {
path: path.clone(),
metadata: Arc::new(parquet_md.clone()),
},
)
.context(FileRecordFailure)?;
transaction
.add_parquet(&path, &parquet_md)
.context(FileRecordFailure)?;
}
transaction.commit().await.context(CheckpointFailure)?;
}
Ok((catalog, state))
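// Illustrative sketch, not part of this commit: a typical rebuild flow once a
// damaged catalog has been wiped. `TestCatalogState` stands in for any
// `CatalogState` implementation; `object_store`, `server_id`, `db_name`, and
// `search_location` are assumed to exist as in the tests below.
//
// PreservedCatalog::wipe(&object_store, server_id, db_name).await?;
// let (catalog, state) = rebuild_catalog::<TestCatalogState>(
//     Arc::clone(&object_store),
//     &search_location,
//     server_id,
//     db_name.to_string(),
//     (),
//     false, // fail on unreadable parquet metadata instead of skipping it
// )
// .await?;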
@ -175,70 +113,37 @@ where
/// Collect all files under the given locations.
///
/// Returns a map of revisions to their UUIDs and a vector of file-metadata tuples.
/// Returns a vector of file-metadata tuples.
///
/// The file listing is recursive.
async fn collect_revisions(
async fn collect_files(
object_store: &ObjectStore,
search_location: &Path,
ignore_metadata_read_failure: bool,
) -> Result<HashMap<u64, (Uuid, Vec<(Path, IoxParquetMetaData)>)>> {
) -> Result<Vec<(Path, IoxParquetMetaData)>> {
let mut stream = object_store
.list(Some(search_location))
.await
.context(ReadFailure)?;
// revision -> (uuid, [file])
let mut revisions: HashMap<u64, (Uuid, Vec<(Path, IoxParquetMetaData)>)> = HashMap::new();
let mut files: Vec<(Path, IoxParquetMetaData)> = vec![];
while let Some(paths) = stream.try_next().await.context(ReadFailure)? {
for path in paths.into_iter().filter(is_parquet) {
let (iox_md, parquet_md) = match read_parquet(object_store, &path).await {
Ok(res) => res,
match read_parquet(object_store, &path).await {
Ok(parquet_md) => {
files.push((path, parquet_md));
}
Err(e @ Error::MetadataReadFailure { .. }) if ignore_metadata_read_failure => {
error!("error while reading metadata from parquet, ignoring: {}", e);
continue;
}
Err(e) => return Err(e),
};
// revision 0 can never occur because it is always empty
if iox_md.transaction_revision_counter == 0 {
return Err(Error::RevisionZeroFailure { path });
}
match revisions.entry(iox_md.transaction_revision_counter) {
Entry::Vacant(v) => {
// revision not known yet => create it
v.insert((iox_md.transaction_uuid, vec![(path, parquet_md)]));
}
Entry::Occupied(mut o) => {
// already exist => check UUID
let (uuid, entries) = o.get_mut();
if *uuid != iox_md.transaction_uuid {
// found multiple transactions for this revision => cannot rebuild cleanly
// sort UUIDs for deterministic error messages
let (uuid1, uuid2) = if *uuid < iox_md.transaction_uuid {
(*uuid, iox_md.transaction_uuid)
} else {
(iox_md.transaction_uuid, *uuid)
};
return Err(Error::MultipleTransactionsFailure {
revision_counter: iox_md.transaction_revision_counter,
uuid1,
uuid2,
});
}
entries.push((path, parquet_md));
}
}
}
}
Ok(revisions)
Ok(files)
}
/// Checks if the given path is (likely) a parquet file.
@ -252,10 +157,7 @@ fn is_parquet(path: &Path) -> bool {
}
/// Read Parquet and IOx metadata from given path.
async fn read_parquet(
object_store: &ObjectStore,
path: &Path,
) -> Result<(IoxMetadata, IoxParquetMetaData)> {
async fn read_parquet(object_store: &ObjectStore, path: &Path) -> Result<IoxParquetMetaData> {
let data = object_store
.get(path)
.await
@ -267,10 +169,13 @@ async fn read_parquet(
let parquet_metadata = IoxParquetMetaData::from_file_bytes(data)
.context(MetadataReadFailure { path: path.clone() })?;
let iox_metadata = parquet_metadata
// validate IOxMetadata
parquet_metadata
.read_iox_metadata()
.context(MetadataReadFailure { path: path.clone() })?;
Ok((iox_metadata, parquet_metadata))
Ok(parquet_metadata)
}
#[cfg(test)]
@ -280,10 +185,12 @@ mod tests {
use datafusion_util::MemoryStream;
use parquet::arrow::ArrowWriter;
use tokio_stream::StreamExt;
use uuid::Uuid;
use super::*;
use std::num::NonZeroU32;
use crate::metadata::IoxMetadata;
use crate::{catalog::test_helpers::TestCatalogState, storage::MemWriter};
use crate::{
catalog::PreservedCatalog,
@ -397,7 +304,7 @@ mod tests {
tmp
};
assert_eq!(paths_actual, paths_expected);
assert_eq!(catalog.revision_counter(), 3);
assert_eq!(catalog.revision_counter(), 1);
}
#[tokio::test]
@ -440,111 +347,6 @@ mod tests {
assert_eq!(catalog.revision_counter(), 0);
}
#[tokio::test]
async fn test_rebuild_fail_transaction_zero() {
let object_store = make_object_store();
let server_id = make_server_id();
let db_name = "db1";
// build catalog with same data
let catalog = PreservedCatalog::new_empty::<TestCatalogState>(
Arc::clone(&object_store),
server_id,
db_name.to_string(),
(),
)
.await
.unwrap();
// file with illegal revision counter (zero is always an empty transaction)
create_parquet_file(&object_store, server_id, db_name, 0, Uuid::new_v4(), 0).await;
// wipe catalog
drop(catalog);
PreservedCatalog::wipe(&object_store, server_id, db_name)
.await
.unwrap();
// rebuild
let path = object_store.new_path();
let res = rebuild_catalog::<TestCatalogState>(
object_store,
&path,
server_id,
db_name.to_string(),
(),
false,
)
.await;
assert!(dbg!(res.unwrap_err().to_string()).starts_with(
"Internal error: Revision cannot be zero (this transaction is always empty):"
));
}
#[tokio::test]
async fn test_rebuild_fail_duplicate_transaction_uuid() {
let object_store = make_object_store();
let server_id = make_server_id();
let db_name = "db1";
// build catalog with same data
let (catalog, _state) = PreservedCatalog::new_empty::<TestCatalogState>(
Arc::clone(&object_store),
server_id,
db_name.to_string(),
(),
)
.await
.unwrap();
{
let mut transaction = catalog.open_transaction().await;
let (path, md) = create_parquet_file(
&object_store,
server_id,
db_name,
transaction.revision_counter(),
transaction.uuid(),
0,
)
.await;
transaction.add_parquet(&path, &md).unwrap();
// create parquet file with wrong UUID
create_parquet_file(
&object_store,
server_id,
db_name,
transaction.revision_counter(),
Uuid::new_v4(),
1,
)
.await;
transaction.commit().await.unwrap();
}
// wipe catalog
drop(catalog);
PreservedCatalog::wipe(&object_store, server_id, db_name)
.await
.unwrap();
// rebuild
let path = object_store.new_path();
let res = rebuild_catalog::<TestCatalogState>(
object_store,
&path,
server_id,
db_name.to_string(),
(),
false,
)
.await;
assert!(dbg!(res.unwrap_err().to_string())
.starts_with("Found multiple transactions for revision 1:"));
}
#[tokio::test]
async fn test_rebuild_no_metadata() {
let object_store = make_object_store();
@ -600,13 +402,20 @@ mod tests {
}
#[tokio::test]
async fn test_rebuild_creates_file_checkpoint() {
async fn test_rebuild_creates_no_checkpoint() {
// the rebuild method will create a catalog with the following transactions:
// 1. an empty one (done by `PreservedCatalog::new_empty`)
// 2. an "add all the files" transaction
//
// There is no real need to create a checkpoint in this case. So here we delete all transaction files and then
// check that the rebuilt catalog will be gone afterwards. Note the difference to the `test_rebuild_empty` case
// where we can indeed prove the existence of a catalog (even though it is empty, i.e. has no files).
let object_store = make_object_store();
let server_id = make_server_id();
let db_name = "db1";
// build catalog with some data (2 transactions + initial empty one)
let (catalog, mut state) = PreservedCatalog::new_empty::<TestCatalogState>(
let (catalog, _state) = PreservedCatalog::new_empty::<TestCatalogState>(
Arc::clone(&object_store),
server_id,
db_name.to_string(),
@ -614,52 +423,7 @@ mod tests {
)
.await
.unwrap();
{
let mut transaction = catalog.open_transaction().await;
let (path, md) = create_parquet_file(
&object_store,
server_id,
db_name,
transaction.revision_counter(),
transaction.uuid(),
0,
)
.await;
state
.parquet_files
.insert(path.clone(), Arc::new(md.clone()));
transaction.add_parquet(&path, &md).unwrap();
transaction.commit().await.unwrap();
}
{
let mut transaction = catalog.open_transaction().await;
let (path, md) = create_parquet_file(
&object_store,
server_id,
db_name,
transaction.revision_counter(),
transaction.uuid(),
2,
)
.await;
state
.parquet_files
.insert(path.clone(), Arc::new(md.clone()));
transaction.add_parquet(&path, &md).unwrap();
transaction.commit().await.unwrap();
}
assert_eq!(catalog.revision_counter(), 2);
// store catalog state
let paths_expected = {
let mut tmp: Vec<_> = state.parquet_files.keys().cloned().collect();
tmp.sort();
tmp
};
assert_eq!(catalog.revision_counter(), 0);
// wipe catalog
drop(catalog);
@ -702,25 +466,12 @@ mod tests {
}
assert!(deleted);
// load catalog
let (catalog, state) = PreservedCatalog::load::<TestCatalogState>(
Arc::clone(&object_store),
server_id,
db_name.to_string(),
(),
)
.await
.unwrap()
.unwrap();
// check match
let paths_actual = {
let mut tmp: Vec<_> = state.parquet_files.keys().cloned().collect();
tmp.sort();
tmp
};
assert_eq!(paths_actual, paths_expected);
assert_eq!(catalog.revision_counter(), 2);
// catalog gone
assert!(
!PreservedCatalog::exists(&object_store, server_id, db_name,)
.await
.unwrap()
);
}
/// Creates new test server ID


@ -851,16 +851,9 @@ impl Db {
DatabaseNotWriteable {}.fail()
}
(None, false) => {
// If no write buffer is configured, use the process clock and send to the mutable
// buffer.
let sequenced_entry = Arc::new(
SequencedEntry::new_from_process_clock(
self.process_clock.next(),
self.server_id,
entry,
)
.context(SequencedEntryError)?,
);
// If no write buffer is configured, nothing is
// sequencing entries so skip doing so here
let sequenced_entry = Arc::new(SequencedEntry::new_unsequenced(entry));
self.store_sequenced_entry(sequenced_entry)
}
@ -891,6 +884,8 @@ impl Db {
}
if let Some(partitioned_writes) = sequenced_entry.partition_writes() {
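// `sequence()` now yields `Option<&Sequence>`: it is `None` for entries that
// were never assigned a sequence (e.g. when no write buffer is configured) and
// is threaded through to the chunk writes and persistence windows below.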
let sequence = sequenced_entry.as_ref().sequence();
// Protect against DoS by limiting the number of errors we might collect
const MAX_ERRORS_PER_SEQUENCED_ENTRY: usize = 10;
@ -925,11 +920,7 @@ impl Db {
chunk.mutable_buffer().expect("cannot mutate open chunk");
if let Err(e) = mb_chunk
.write_table_batch(
sequenced_entry.sequence().id,
sequenced_entry.sequence().number,
table_batch,
)
.write_table_batch(sequence, table_batch)
.context(WriteEntry {
partition_key,
chunk_id,
@ -957,11 +948,7 @@ impl Db {
);
if let Err(e) = mb_chunk
.write_table_batch(
sequenced_entry.sequence().id,
sequenced_entry.sequence().number,
table_batch,
)
.write_table_batch(sequence, table_batch)
.context(WriteEntryInitial { partition_key })
{
if errors.len() < MAX_ERRORS_PER_SEQUENCED_ENTRY {
@ -978,7 +965,7 @@ impl Db {
match partition.persistence_windows() {
Some(windows) => {
windows.add_range(
sequenced_entry.as_ref().sequence(),
sequence,
row_count,
min_time,
max_time,
@ -988,7 +975,7 @@ impl Db {
None => {
let mut windows = PersistenceWindows::new(late_arrival_window);
windows.add_range(
sequenced_entry.as_ref().sequence(),
sequence,
row_count,
min_time,
max_time,
@ -1121,6 +1108,7 @@ mod tests {
use bytes::Bytes;
use chrono::Utc;
use futures::{stream, StreamExt, TryStreamExt};
use mutable_buffer::persistence_windows::MinMaxSequence;
use tokio_util::sync::CancellationToken;
use ::test_helpers::assert_contains;
@ -1966,22 +1954,48 @@ mod tests {
#[tokio::test]
async fn write_updates_persistence_windows() {
let db = Arc::new(make_db().await.db);
let now = Utc::now().timestamp_nanos() as u64;
// Writes should update the persistence windows when there
// is a write buffer configured.
let write_buffer = Arc::new(MockBuffer::default());
let db = TestDb::builder()
.write_buffer(Arc::clone(&write_buffer) as _)
.build()
.await
.db;
let partition_key = "1970-01-01T00";
write_lp(&db, "cpu bar=1 10").await;
let later = Utc::now().timestamp_nanos() as u64;
write_lp(&db, "cpu bar=1 10").await; // seq 0
write_lp(&db, "cpu bar=1 20").await; // seq 1
write_lp(&db, "cpu bar=1 30").await; // seq 2
let partition = db.catalog.partition("cpu", partition_key).unwrap();
let mut partition = partition.write();
let windows = partition.persistence_windows().unwrap();
let seq = windows.minimum_unpersisted_sequence().unwrap();
let seq = seq.get(&1).unwrap();
assert!(now < seq.min && later > seq.min);
let seq = seq.get(&0).unwrap();
assert_eq!(seq, &MinMaxSequence { min: 0, max: 2 });
}
#[tokio::test]
async fn write_with_no_write_buffer_updates_sequence() {
let db = Arc::new(make_db().await.db);
let partition_key = "1970-01-01T00";
write_lp(&db, "cpu bar=1 10").await;
write_lp(&db, "cpu bar=1 20").await;
let partition = db.catalog.partition("cpu", partition_key).unwrap();
let mut partition = partition.write();
// validate it has data
let table_summary = partition.summary().table;
assert_eq!(&table_summary.name, "cpu");
assert_eq!(table_summary.count(), 2);
let windows = partition.persistence_windows().unwrap();
let open_min = windows.open_min_time().unwrap();
let open_max = windows.open_max_time().unwrap();
assert_eq!(open_min.timestamp_nanos(), 10);
assert_eq!(open_max.timestamp_nanos(), 20);
}
#[tokio::test]


@ -334,7 +334,7 @@ impl Catalog {
#[cfg(test)]
mod tests {
use entry::test_helpers::lp_to_entry;
use entry::{test_helpers::lp_to_entry, Sequence};
use super::*;
@ -350,7 +350,10 @@ mod tests {
mutable_buffer::chunk::ChunkMetrics::new_unregistered(),
);
mb_chunk.write_table_batch(1, 5, batch).unwrap();
let sequence = Some(Sequence::new(1, 5));
mb_chunk
.write_table_batch(sequence.as_ref(), batch)
.unwrap();
partition.create_open_chunk(mb_chunk);
}


@ -819,7 +819,7 @@ impl CatalogChunk {
#[cfg(test)]
mod tests {
use entry::test_helpers::lp_to_entry;
use entry::{test_helpers::lp_to_entry, Sequence};
use mutable_buffer::chunk::{ChunkMetrics as MBChunkMetrics, MBChunk};
use parquet_file::{
chunk::ParquetChunk,
@ -911,7 +911,10 @@ mod tests {
let entry = lp_to_entry(&format!("{} bar=1 10", table_name));
let write = entry.partition_writes().unwrap().remove(0);
let batch = write.table_batches().remove(0);
mb_chunk.write_table_batch(sequencer_id, 1, batch).unwrap();
let sequence = Some(Sequence::new(sequencer_id, 1));
mb_chunk
.write_table_batch(sequence.as_ref(), batch)
.unwrap();
mb_chunk
}


@ -27,6 +27,7 @@ impl ProcessClock {
///
/// We expect that updates to the process clock are not so frequent and the system is slow
/// enough that the returned value will be incremented by at least 1.
#[allow(dead_code)]
pub fn next(&self) -> ClockValue {
let next = loop {
if let Ok(next) = self.try_update() {
@ -37,6 +38,7 @@ impl ProcessClock {
ClockValue::try_from(next).expect("process clock should not be 0")
}
#[allow(dead_code)]
fn try_update(&self) -> Result<u64, u64> {
let now = system_clock_now();
let current_process_clock = self.inner.load(Ordering::SeqCst);


@ -1,5 +1,5 @@
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
use entry::test_helpers::lp_to_entries;
use entry::{test_helpers::lp_to_entries, Sequence};
use flate2::read::GzDecoder;
use mutable_buffer::chunk::{ChunkMetrics, MBChunk};
use std::io::Read;
@ -18,11 +18,12 @@ fn chunk(count: usize) -> MBChunk {
let mut lp = String::new();
gz.read_to_string(&mut lp).unwrap();
let sequence = Some(Sequence::new(1, 5));
for _ in 0..count {
for entry in lp_to_entries(&lp) {
for write in entry.partition_writes().iter().flatten() {
for batch in write.table_batches() {
chunk.write_table_batch(1, 5, batch).unwrap();
chunk.write_table_batch(sequence.as_ref(), batch).unwrap();
}
}
}


@ -1,5 +1,5 @@
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
use entry::{test_helpers::lp_to_entries, Entry};
use entry::{test_helpers::lp_to_entries, Entry, Sequence};
use flate2::read::GzDecoder;
use mutable_buffer::chunk::{ChunkMetrics, MBChunk};
use std::io::Read;
@ -9,11 +9,12 @@ fn write_chunk(count: usize, entries: &[Entry]) {
// m0 is hard coded into tag_values.lp.gz
let mut chunk = MBChunk::new("m0", ChunkMetrics::new_unregistered());
let sequence = Some(Sequence::new(1, 5));
for _ in 0..count {
for entry in entries {
for write in entry.partition_writes().iter().flatten() {
for batch in write.table_batches() {
chunk.write_table_batch(1, 5, batch).unwrap();
chunk.write_table_batch(sequence.as_ref(), batch).unwrap();
}
}
}