feat: add sequenced entry builder

pull/24376/head
Paul Dix 2021-04-07 17:22:41 -04:00
parent 1bd4ef592f
commit 0c082e2347
1 changed files with 333 additions and 1 deletions

View File

@ -11,7 +11,7 @@ use std::{collections::BTreeMap, convert::TryFrom};
use chrono::{DateTime, Utc};
use flatbuffers::{FlatBufferBuilder, Follow, ForwardsUOffset, Vector, VectorIter, WIPOffset};
use ouroboros::self_referencing;
use snafu::{ResultExt, Snafu};
use snafu::{OptionExt, ResultExt, Snafu};
use std::fmt::Formatter;
#[derive(Debug, Snafu)]
@ -35,6 +35,9 @@ pub enum Error {
line_number: usize,
source: ColumnError,
},
#[snafu(display("invalid flatbuffers: field {} is required", field))]
FlatbufferFieldMissing { field: String },
}
#[derive(Debug, Snafu)]
@ -1079,6 +1082,253 @@ enum ColumnRaw<'a> {
Bool(Vec<bool>),
}
#[self_referencing]
#[derive(Debug)]
pub struct SequencedEntry {
data: Vec<u8>,
#[borrows(data)]
#[covariant]
fb: entry_fb::SequencedEntry<'this>,
}
impl SequencedEntry {
pub fn new_from_entry(clock_value: u64, writer_id: u32, entry: &Entry) -> Result<Self> {
let mut fbb = FlatBufferBuilder::new_with_capacity(1024);
let entry = match entry.fb().operation_type() {
entry_fb::Operation::write => {
let partition_writes =
entry.partition_writes().context(FlatbufferFieldMissing {
field: "partition writes",
})?;
let partition_writes = partition_writes.iter().map(|w|{
let table_batches = w.table_batches().iter().map(|t| {
let columns = t.columns().iter().map(|c| {
let null_mask = c.fb.null_mask().map(|n| {
fbb.create_vector_direct(n)
});
let name = fbb.create_string(c.fb.name().context(FlatbufferFieldMissing{field: "column name"})?);
let values = match c.fb.values_type() {
entry_fb::ColumnValues::StringValues => {
let values = c
.fb
.values_as_string_values()
.expect("invalid flatbuffers")
.values()
.expect("flatbuffers StringValues must have string values set")
.iter()
.map(|v| fbb.create_string(v))
.collect::<Vec<_>>();
let values = fbb.create_vector(&values);
let values = entry_fb::StringValues::create(
&mut fbb,
&entry_fb::StringValuesArgs{
values: Some(values),
},
);
values.as_union_value()
},
entry_fb::ColumnValues::I64Values => {
let values = c
.fb
.values_as_i64values()
.expect("invalid flatbuffers")
.values()
.expect("flatbuffers I64Values must have i64 values set")
.iter()
.collect::<Vec<_>>();
let values = fbb.create_vector(&values);
let values = entry_fb::I64Values::create(
&mut fbb,
&entry_fb::I64ValuesArgs{
values: Some(values),
}
);
values.as_union_value()
},
entry_fb::ColumnValues::F64Values => {
let values = c
.fb
.values_as_f64values()
.expect("invalid flatbuffers")
.values()
.expect("flatbuffers F64Values must have f64 values set")
.iter()
.collect::<Vec<_>>();
let values = fbb.create_vector(&values);
let values = entry_fb::F64Values::create(
&mut fbb,
&entry_fb::F64ValuesArgs{
values: Some(values),
}
);
values.as_union_value()
},
entry_fb::ColumnValues::U64Values => {
let values = c
.fb
.values_as_u64values()
.expect("invalid flatbuffers")
.values()
.expect("flatbuffers U64Values must have u64 values set")
.iter()
.collect::<Vec<_>>();
let values = fbb.create_vector(&values);
let values = entry_fb::U64Values::create(
&mut fbb,
&entry_fb::U64ValuesArgs{
values: Some(values),
}
);
values.as_union_value()
},
entry_fb::ColumnValues::BoolValues => {
let values = c
.fb
.values_as_bool_values()
.expect("invalid flatbuffers")
.values()
.expect("flatbuffers BoolValues must have bool values set")
.iter()
.copied()
.collect::<Vec<_>>();
let values = fbb.create_vector(&values);
let values = entry_fb::BoolValues::create(
&mut fbb,
&entry_fb::BoolValuesArgs{
values: Some(values),
}
);
values.as_union_value()
}
_ => panic!("unsupported column"),
};
Ok(entry_fb::Column::create(
&mut fbb,
&entry_fb::ColumnArgs{
name: Some(name),
logical_column_type: c.fb.logical_column_type(),
values_type: c.fb.values_type(),
values: Some(values),
null_mask,
}
))
}).collect::<Result<Vec<_>>>()?;
let columns = fbb.create_vector(&columns);
let table_name = fbb.create_string(t.name().context(FlatbufferFieldMissing{field: "table name"})?);
Ok(entry_fb::TableWriteBatch::create(
&mut fbb,
&entry_fb::TableWriteBatchArgs{
name: Some(table_name),
columns: Some(columns),
},
))
}).collect::<Result<Vec<_>>>()?;
let table_batches = fbb.create_vector(&table_batches);
let key = fbb.create_string(w.key().context(FlatbufferFieldMissing{field: "partition key"})?);
Ok(entry_fb::PartitionWrite::create(
&mut fbb,
&entry_fb::PartitionWriteArgs{
key: Some(key),
table_batches: Some(table_batches),
}
))
} ).collect::<Result<Vec<_>>>()?;
let partition_writes = fbb.create_vector(&partition_writes);
let write_operations = entry_fb::WriteOperations::create(
&mut fbb,
&entry_fb::WriteOperationsArgs {
partition_writes: Some(partition_writes),
},
);
entry_fb::Entry::create(
&mut fbb,
&entry_fb::EntryArgs {
operation_type: entry_fb::Operation::write,
operation: Some(write_operations.as_union_value()),
},
)
}
_ => panic!("unsupported"),
};
let sequenced_entry = entry_fb::SequencedEntry::create(
&mut fbb,
&entry_fb::SequencedEntryArgs {
clock_value,
writer_id,
entry: Some(entry),
},
);
fbb.finish(sequenced_entry, None);
let (mut data, idx) = fbb.collapse();
let sequenced_entry = Self::try_from(data.split_off(idx))
.expect("Flatbuffer data just constructed should be valid");
Ok(sequenced_entry)
}
/// Returns the Flatbuffers struct for the SequencedEntry
pub fn fb(&self) -> &entry_fb::SequencedEntry<'_> {
self.borrow_fb()
}
pub fn partition_writes(&self) -> Option<Vec<PartitionWrite<'_>>> {
match self.fb().entry().as_ref() {
Some(e) => match e.operation_as_write().as_ref() {
Some(w) => w
.partition_writes()
.as_ref()
.map(|w| w.iter().map(|fb| PartitionWrite { fb }).collect::<Vec<_>>()),
None => None,
},
None => None,
}
}
pub fn clock_value(&self) -> u64 {
self.fb().clock_value()
}
pub fn writer_id(&self) -> u32 {
self.fb().writer_id()
}
}
impl TryFrom<Vec<u8>> for SequencedEntry {
type Error = flatbuffers::InvalidFlatbuffer;
fn try_from(data: Vec<u8>) -> Result<Self, Self::Error> {
SequencedEntryTryBuilder {
data,
fb_builder: |data| flatbuffers::root::<entry_fb::SequencedEntry<'_>>(data),
}
.try_build()
}
}
#[cfg(test)]
mod tests {
use super::*;
@ -1543,6 +1793,88 @@ mod tests {
assert!(sharded_entries.is_err());
}
#[test]
fn sequenced_entry() {
let lp = vec![
"a,host=a val=23i 983",
"a,host=a,region=west val2=23.2 2343",
"a val=21i,bool=true,string=\"hello\" 222",
]
.join("\n");
let lines: Vec<_> = parse_lines(&lp).map(|l| l.unwrap()).collect();
let sharded_entries =
lines_to_sharded_entries(&lines, &sharder(1), &partitioner(1)).unwrap();
let sequenced_entry =
SequencedEntry::new_from_entry(23, 2, &sharded_entries.first().unwrap().entry).unwrap();
assert_eq!(sequenced_entry.clock_value(), 23);
assert_eq!(sequenced_entry.writer_id(), 2);
let partition_writes = sequenced_entry.partition_writes().unwrap();
let table_batches = partition_writes.first().unwrap().table_batches();
let batch = table_batches.first().unwrap();
let columns = batch.columns();
assert_eq!(batch.row_count(), 3);
assert_eq!(columns.len(), 7);
let col = columns.get(0).unwrap();
assert_eq!(col.name().unwrap(), "bool");
assert_eq!(col.logical_type(), entry_fb::LogicalColumnType::Field);
let values = col.values().bool_values().unwrap();
assert_eq!(&values, &[None, None, Some(true)]);
let col = columns.get(1).unwrap();
assert_eq!(col.name().unwrap(), "host");
assert_eq!(col.logical_type(), entry_fb::LogicalColumnType::Tag);
let values = match col.values() {
TypedValuesIterator::String(v) => v,
_ => panic!("wrong type"),
};
let values = values.collect::<Vec<_>>();
assert_eq!(&values, &[Some("a"), Some("a"), None]);
let col = columns.get(2).unwrap();
assert_eq!(col.name().unwrap(), "region");
assert_eq!(col.logical_type(), entry_fb::LogicalColumnType::Tag);
let values = match col.values() {
TypedValuesIterator::String(v) => v,
_ => panic!("wrong type"),
};
let values = values.collect::<Vec<_>>();
assert_eq!(&values, &[None, Some("west"), None]);
let col = columns.get(3).unwrap();
assert_eq!(col.name().unwrap(), "string");
assert_eq!(col.logical_type(), entry_fb::LogicalColumnType::Field);
let values = match col.values() {
TypedValuesIterator::String(v) => v,
_ => panic!("wrong type"),
};
let values = values.collect::<Vec<_>>();
assert_eq!(&values, &[None, None, Some("hello")]);
let col = columns.get(4).unwrap();
assert_eq!(col.name().unwrap(), TIME_COLUMN_NAME);
assert_eq!(col.logical_type(), entry_fb::LogicalColumnType::Time);
let values = col.values().i64_values().unwrap();
assert_eq!(&values, &[Some(983), Some(2343), Some(222)]);
let col = columns.get(5).unwrap();
assert_eq!(col.name().unwrap(), "val");
assert_eq!(col.logical_type(), entry_fb::LogicalColumnType::Field);
let values = col.values().i64_values().unwrap();
assert_eq!(&values, &[Some(23), None, Some(21)]);
let col = columns.get(6).unwrap();
assert_eq!(col.name().unwrap(), "val2");
assert_eq!(col.logical_type(), entry_fb::LogicalColumnType::Field);
let values = col.values().f64_values().unwrap();
assert_eq!(&values, &[None, Some(23.2), None]);
}
fn sharder(count: u16) -> TestSharder {
TestSharder {
count,