fix: ensure all rows are emitted for each column

pull/24376/head
Edd Robinson 2020-06-24 11:52:19 +01:00
parent 305b919bac
commit 9d889828c3
5 changed files with 357 additions and 195 deletions
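The bug: the converter rebuilt `packed_columns` from scratch for every tag set in a measurement, so each pass through the tag-set loop overwrote the previous pass's rows and only the final tag set's rows reached the table writer. The fix moves the logic into `process_measurement_table`, which initialises each column on the first tag set and extends or pads it on every later one. A minimal sketch of the pattern, using a plain `Vec<Option<i64>>` in place of the real `Packers` and hypothetical row data:

fn main() {
    // Hypothetical per-tag-set rows standing in for the output of
    // `map_field_columns`; the real code operates on `Packers` values.
    let tag_sets: Vec<Vec<Option<i64>>> = vec![
        vec![Some(1), Some(2)], // rows for the first tag set
        vec![Some(3), None],    // rows for a second tag set
    ];
    let mut column: Vec<Option<i64>> = Vec::new();
    for (i, rows) in tag_sets.iter().enumerate() {
        if i == 0 {
            column = rows.clone(); // the first tag set initialises the column
        } else {
            column.extend_from_slice(rows); // later tag sets append, not overwrite
        }
    }
    assert_eq!(column.len(), 4); // all four rows are emitted, not just the last two
}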

Cargo.lock (generated)

@@ -686,6 +686,7 @@ dependencies = [
"delorean_test_helpers",
"delorean_tsm",
"env_logger",
"libflate",
"log",
"snafu",
]


@@ -18,3 +18,4 @@ delorean_tsm = { path = "../delorean_tsm" }
[dev-dependencies]
delorean_test_helpers = { path = "../delorean_test_helpers" }
libflate = "1.0.0"
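The new dev-dependency is presumably used by the TSM tests to decompress gzipped fixture files (the test module below gains `libflate::gzip`, `BufReader`, `Cursor`, and `Read` imports). A minimal sketch of that pattern, with a hypothetical fixture path:

use libflate::gzip;
use std::fs::File;
use std::io::{BufReader, Cursor, Read};

// Hypothetical path; the actual fixtures live in the repository's test data.
fn read_gzipped_fixture(path: &str) -> std::io::Result<Cursor<Vec<u8>>> {
    let file = File::open(path)?;
    let mut decoder = gzip::Decoder::new(BufReader::new(file))?;
    let mut buf = Vec::new();
    decoder.read_to_end(&mut buf)?;
    // A Cursor over the decompressed bytes provides the `BufRead + Seek`
    // interface that `TSMIndexReader` and `TSMBlockReader` expect.
    Ok(Cursor::new(buf))
}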


@@ -21,7 +21,9 @@ use delorean_table::{
ByteArray, DeloreanTableWriter, DeloreanTableWriterSource, Error as TableError,
};
use delorean_table_schema::{DataType, Schema, SchemaBuilder};
use delorean_tsm::mapper::{map_field_columns, ColumnData, TSMMeasurementMapper};
use delorean_tsm::mapper::{
map_field_columns, BlockDecoder, ColumnData, MeasurementTable, TSMMeasurementMapper,
};
use delorean_tsm::reader::{TSMBlockReader, TSMIndexReader};
use delorean_tsm::{BlockType, TSMError};
@@ -530,155 +532,8 @@ impl TSMFileConverter {
for measurement in mapper {
match measurement {
Ok(mut m) => {
let mut builder = SchemaBuilder::new(&m.name);
let mut packed_columns: Vec<Packers> = Vec::new();
let mut tks = Vec::new();
for tag in m.tag_columns() {
builder = builder.tag(tag);
tks.push(tag.clone());
packed_columns.push(Packers::String(Packer::new()));
}
let mut fks = Vec::new();
for (field_key, block_type) in m.field_columns().to_owned() {
builder = builder.field(&field_key, DataType::from(&block_type));
fks.push((field_key.clone(), block_type));
packed_columns.push(Packers::String(Packer::new())); // FIXME - will change
}
// Account for timestamp
packed_columns.push(Packers::Integer(Packer::new()));
let schema = builder.build();
// get mapping between named columns and packer indexes.
let name_packer = schema
.get_col_defs()
.iter()
.map(|c| (c.name.clone(), c.index as usize))
.collect::<BTreeMap<String, usize>>();
// For each tag set combination in the measurement we need
// to build out the table. Then each column in the table is
// converted to a `Packer<T>` and appended to `packed_columns`.
for (tag_set_pair, blocks) in m.tag_set_fields_blocks() {
let (ts, field_cols) = map_field_columns(&mut block_reader, blocks)
.map_err(|e| Error::TSMProcessing { source: e })?;
// Start with the timestamp column.
let col_len = ts.len();
let ts_idx =
name_packer
.get(schema.timestamp())
.ok_or(Error::TSMProcessing {
// TODO clean this error up
source: TSMError {
description: "could not find ts column".to_string(),
},
})?;
packed_columns[*ts_idx] = Packers::from(ts);
// Next let's pad out all of the tag columns we know have
// repeated values.
for (tag_key, tag_value) in tag_set_pair {
let idx = name_packer.get(tag_key).ok_or(Error::TSMProcessing {
// TODO clean this error up
source: TSMError {
description: "could not find column".to_string(),
},
})?;
// this will create a column of repeated values.
packed_columns[*idx] = Packers::from_elem_str(tag_value, col_len);
}
// Next let's write out NULL values for any tag columns
// on the measurement that we don't have values for
// because they're not part of this tagset.
let tag_keys = tag_set_pair
.iter()
.map(|pair| pair.0.clone())
.collect::<BTreeSet<String>>();
for key in &tks {
if tag_keys.contains(key) {
continue;
}
let idx = name_packer.get(key).ok_or(Error::TSMProcessing {
// TODO clean this error up
source: TSMError {
description: "could not find column".to_string(),
},
})?;
// this will create a column of repeated None values.
let col: Vec<Option<Vec<u8>>> = vec![None; col_len];
packed_columns[*idx] = Packers::from(col);
}
// Next let's write out all of the field column data.
let mut got_field_cols = Vec::new();
for (field_key, field_values) in field_cols {
let idx = name_packer.get(&field_key).ok_or(Error::TSMProcessing {
// TODO clean this error up
source: TSMError {
description: "could not find column".to_string(),
},
})?;
match field_values {
ColumnData::Float(v) => packed_columns[*idx] = Packers::from(v),
ColumnData::Integer(v) => packed_columns[*idx] = Packers::from(v),
ColumnData::Str(v) => packed_columns[*idx] = Packers::from(v),
ColumnData::Bool(v) => packed_columns[*idx] = Packers::from(v),
ColumnData::Unsigned(v) => packed_columns[*idx] = Packers::from(v),
}
got_field_cols.push(field_key);
}
// Finally let's write out all of the field columns that
// we don't have values for here.
for (key, field_type) in &fks {
if got_field_cols.contains(key) {
continue;
}
let idx = name_packer.get(key).ok_or(Error::TSMProcessing {
// TODO clean this error up
source: TSMError {
description: "could not find column".to_string(),
},
})?;
// this will create a column of repeated None values.
match field_type {
BlockType::Float => {
let col: Vec<Option<f64>> = vec![None; col_len];
packed_columns[*idx] = Packers::from(col);
}
BlockType::Integer => {
let col: Vec<Option<i64>> = vec![None; col_len];
packed_columns[*idx] = Packers::from(col);
}
BlockType::Bool => {
let col: Vec<Option<bool>> = vec![None; col_len];
packed_columns[*idx] = Packers::from(col);
}
BlockType::Str => {
let col: Vec<Option<Vec<u8>>> = vec![None; col_len];
packed_columns[*idx] = Packers::from(col);
}
BlockType::Unsigned => {
let col: Vec<Option<u64>> = vec![None; col_len];
packed_columns[*idx] = Packers::from(col);
}
}
}
}
let (schema, packed_columns) =
Self::process_measurement_table(&mut block_reader, &mut m)?;
let mut table_writer = self
.table_writer_source
@@ -697,6 +552,238 @@ impl TSMFileConverter {
}
Ok(())
}
// Given a measurement table, `process_measurement_table` produces an
// appropriate schema and set of `Packers`.
fn process_measurement_table<R: BufRead + Seek>(
// block_reader: impl BlockDecoder,
mut block_reader: &mut TSMBlockReader<R>,
m: &mut MeasurementTable,
) -> Result<(Schema, Vec<Packers>), Error> {
let mut builder = SchemaBuilder::new(&m.name);
let mut packed_columns: Vec<Packers> = Vec::new();
let mut tks = Vec::new();
for tag in m.tag_columns() {
builder = builder.tag(tag);
tks.push(tag.clone());
packed_columns.push(Packers::String(Packer::new()));
}
let mut fks = Vec::new();
for (field_key, block_type) in m.field_columns().to_owned() {
builder = builder.field(&field_key, DataType::from(&block_type));
fks.push((field_key.clone(), block_type));
packed_columns.push(Packers::String(Packer::new())); // FIXME - will change
}
// Account for timestamp
packed_columns.push(Packers::Integer(Packer::new()));
let schema = builder.build();
// get mapping between named columns and packer indexes.
let name_packer = schema
.get_col_defs()
.iter()
.map(|c| (c.name.clone(), c.index as usize))
.collect::<BTreeMap<String, usize>>();
// For each tag set combination in the measurement we need
// to build out the table. Then each column in the table is
// converted to a `Packer<T>` and appended to `packed_columns`.
for (i, (tag_set_pair, blocks)) in m.tag_set_fields_blocks().iter_mut().enumerate() {
let (ts, field_cols) = map_field_columns(&mut block_reader, blocks)
.map_err(|e| Error::TSMProcessing { source: e })?;
// Start with the timestamp column.
let col_len = ts.len();
let ts_idx = name_packer
.get(schema.timestamp())
.ok_or(Error::TSMProcessing {
// TODO clean this error up
source: TSMError {
description: "could not find ts column".to_string(),
},
})?;
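// The first tag set initialises the timestamp column; every subsequent
// tag set appends its timestamps instead of replacing them, which is
// what ensures all rows are emitted for the column.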
if i == 0 {
packed_columns[*ts_idx] = Packers::from(ts);
} else {
packed_columns[*ts_idx]
.i64_packer_mut()
.extend_from_slice(&ts);
}
// Next let's pad out all of the tag columns we know have
// repeated values.
for (tag_key, tag_value) in tag_set_pair {
let idx = name_packer.get(tag_key).ok_or(Error::TSMProcessing {
// TODO clean this error up
source: TSMError {
description: "could not find column".to_string(),
},
})?;
// this will create a column of repeated values.
if i == 0 {
packed_columns[*idx] = Packers::from_elem_str(tag_value, col_len);
} else {
packed_columns[*idx]
.str_packer_mut()
.extend_from_slice(&vec![ByteArray::from(tag_value.as_ref()); col_len]);
}
}
// Next let's write out NULL values for any tag columns
// on the measurement that we don't have values for
// because they're not part of this tagset.
let tag_keys = tag_set_pair
.iter()
.map(|pair| pair.0.clone())
.collect::<BTreeSet<String>>();
for key in &tks {
if tag_keys.contains(key) {
continue;
}
let idx = name_packer.get(key).ok_or(Error::TSMProcessing {
// TODO clean this error up
source: TSMError {
description: "could not find column".to_string(),
},
})?;
if i == 0 {
// creates a column of repeated None values.
let col: Vec<Option<Vec<u8>>> = vec![None; col_len];
packed_columns[*idx] = Packers::from(col);
} else {
// pad out column with None values because we don't have a
// value for it.
packed_columns[*idx].str_packer_mut().pad_with_null(col_len);
}
}
// Next let's write out all of the field column data.
let mut got_field_cols = Vec::new();
for (field_key, field_values) in field_cols {
let idx = name_packer.get(&field_key).ok_or(Error::TSMProcessing {
// TODO clean this error up
source: TSMError {
description: "could not find column".to_string(),
},
})?;
if i == 0 {
match field_values {
ColumnData::Float(v) => packed_columns[*idx] = Packers::from(v),
ColumnData::Integer(v) => packed_columns[*idx] = Packers::from(v),
ColumnData::Str(v) => packed_columns[*idx] = Packers::from(v),
ColumnData::Bool(v) => packed_columns[*idx] = Packers::from(v),
ColumnData::Unsigned(v) => packed_columns[*idx] = Packers::from(v),
}
} else {
match field_values {
ColumnData::Float(v) => packed_columns[*idx]
.f64_packer_mut()
.extend_from_option_slice(&v),
ColumnData::Integer(v) => packed_columns[*idx]
.i64_packer_mut()
.extend_from_option_slice(&v),
ColumnData::Str(values) => {
// TODO fix this up....
let col = packed_columns[*idx].str_packer_mut();
for value in values {
match value {
Some(v) => col.push(ByteArray::from(v)),
None => col.push_option(None),
}
}
}
ColumnData::Bool(v) => packed_columns[*idx]
.bool_packer_mut()
.extend_from_option_slice(&v),
ColumnData::Unsigned(values) => {
// TODO fix this up....
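// Unsigned values are funnelled into the i64 packer with an `as`
// cast, so values above `i64::MAX` would wrap to negative numbers.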
let col = packed_columns[*idx].i64_packer_mut();
for value in values {
match value {
Some(v) => col.push(v as i64),
None => col.push_option(None),
}
}
}
}
}
got_field_cols.push(field_key);
}
// Finally let's write out all of the field columns that
// we don't have values for here.
for (key, field_type) in &fks {
if got_field_cols.contains(key) {
continue;
}
let idx = name_packer.get(key).ok_or(Error::TSMProcessing {
// TODO clean this error up
source: TSMError {
description: "could not find column".to_string(),
},
})?;
// this will create a column of repeated None values.
if i == 0 {
match field_type {
BlockType::Float => {
let col: Vec<Option<f64>> = vec![None; col_len];
packed_columns[*idx] = Packers::from(col);
}
BlockType::Integer => {
let col: Vec<Option<i64>> = vec![None; col_len];
packed_columns[*idx] = Packers::from(col);
}
BlockType::Bool => {
let col: Vec<Option<bool>> = vec![None; col_len];
packed_columns[*idx] = Packers::from(col);
}
BlockType::Str => {
let col: Vec<Option<Vec<u8>>> = vec![None; col_len];
packed_columns[*idx] = Packers::from(col);
}
BlockType::Unsigned => {
let col: Vec<Option<u64>> = vec![None; col_len];
packed_columns[*idx] = Packers::from(col);
}
}
} else {
match field_type {
BlockType::Float => {
packed_columns[*idx].f64_packer_mut().pad_with_null(col_len);
}
BlockType::Integer => {
packed_columns[*idx].i64_packer_mut().pad_with_null(col_len);
}
BlockType::Bool => {
packed_columns[*idx]
.bool_packer_mut()
.pad_with_null(col_len);
}
BlockType::Str => {
packed_columns[*idx].str_packer_mut().pad_with_null(col_len);
}
BlockType::Unsigned => {
packed_columns[*idx].i64_packer_mut().pad_with_null(col_len);
}
}
}
}
}
Ok((schema, packed_columns))
}
}
impl std::fmt::Debug for TSMFileConverter {
@@ -716,6 +803,12 @@ mod delorean_ingest_tests {
use delorean_table_schema::ColumnDefinition;
use delorean_test_helpers::approximately_equal;
use libflate::gzip;
use std::fs::File;
use std::io::BufReader;
use std::io::Cursor;
use std::io::Read;
use std::sync::{Arc, Mutex};
/// Record what happens when the writer is created so we can
@@ -724,6 +817,7 @@ mod delorean_ingest_tests {
struct WriterLog {
events: Vec<String>,
}
impl WriterLog {
fn new() -> Self {
Self { events: Vec::new() }
@@ -1366,36 +1460,4 @@ mod delorean_ingest_tests {
Ok(())
}
// ----- Tests for TSM Data -----
// #[test]
// fn conversion_tsm_files() -> Result<(), Error> {
// let log = Arc::new(Mutex::new(WriterLog::new()));
// // let mut converter =
// // LineProtocolConverter::new(settings, NoOpWriterSource::new(log.clone()));
// // converter
// // .convert(parsed_lines)
// // .expect("conversion ok")
// // .finalize()
// // .expect("finalize");
// assert_eq!(
// get_events(&log),
// vec![
// "Created writer for measurement h2o_temperature",
// "Created writer for measurement air_temperature",
// "[air_temperature] Wrote batch of 4 cols, 3 rows",
// "[h2o_temperature] Wrote batch of 4 cols, 3 rows",
// "[air_temperature] Wrote batch of 4 cols, 1 rows",
// "[air_temperature] Closed",
// "[h2o_temperature] Wrote batch of 4 cols, 2 rows",
// "[h2o_temperature] Closed",
// ]
// );
// Ok(())
// }
}


@@ -5,6 +5,8 @@
// Note the maintainability of this code is not likely high (it came
// from the copy pasta factory) but the plan is to replace it
// soon... We'll see how long that actually takes...
use std::iter;
use parquet::data_type::ByteArray;
use std::default::Default;
@@ -58,6 +60,15 @@ impl Packers {
}
}
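/// Pushes a null value onto the end of whichever typed packer this is.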
pub fn push_none(&mut self) {
match self {
Self::Float(p) => p.push_option(None),
Self::Integer(p) => p.push_option(None),
Self::String(p) => p.push_option(None),
Self::Boolean(p) => p.push_option(None),
}
}
/// See description on `Packer::num_rows`
pub fn num_rows(&self) -> usize {
match self {
@@ -68,15 +79,6 @@ impl Packers {
}
}
pub fn push_none(&mut self) {
match self {
Self::Float(p) => p.push_option(None),
Self::Integer(p) => p.push_option(None),
Self::String(p) => p.push_option(None),
Self::Boolean(p) => p.push_option(None),
}
}
/// Determines if the value for `row` is null.
///
/// If there is no row then `is_null` returns `true`.
@@ -181,13 +183,19 @@ impl std::convert::From<Vec<Option<u64>>> for Packers {
}
#[derive(Debug, Default)]
pub struct Packer<T: Default> {
pub struct Packer<T>
where
T: Default + Clone,
{
values: Vec<T>,
def_levels: Vec<i16>,
rep_levels: Vec<i16>,
}
impl<T: Default> Packer<T> {
impl<T> Packer<T>
where
T: Default + Clone,
{
pub fn new() -> Self {
Self {
values: Vec::new(),
@@ -273,6 +281,37 @@ impl<T: Default> Packer<T> {
self.rep_levels.push(1);
}
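/// Appends all rows from `other`, including its definition and
/// repetition levels.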
pub fn extend_from_packer(&mut self, other: &Packer<T>) {
self.values.extend_from_slice(&other.values);
self.def_levels.extend_from_slice(&other.def_levels);
self.rep_levels.extend_from_slice(&other.rep_levels);
}
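/// Appends `other` to the packer as defined (non-null) rows.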
pub fn extend_from_slice(&mut self, other: &[T]) {
self.values.extend_from_slice(other);
self.def_levels.extend(iter::repeat(1).take(other.len()));
self.rep_levels.extend(iter::repeat(1).take(other.len()));
}
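/// Appends optional values, recording a null row for each `None`.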
pub fn extend_from_option_slice(&mut self, other: &[Option<T>]) {
for v in other {
self.push_option(v.clone()); // TODO(edd): perf here.
}
}
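/// Appends `additional` copies of `value` as defined rows.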
pub fn fill_with(&mut self, value: T, additional: usize) {
self.values.extend(iter::repeat(value).take(additional));
self.def_levels.extend(iter::repeat(1).take(additional));
self.rep_levels.extend(iter::repeat(1).take(additional));
}
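/// Appends `additional` null rows: the backing values are padded with
/// `T::default()` and the definition level is set to 0.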
pub fn pad_with_null(&mut self, additional: usize) {
self.values
.extend(iter::repeat(T::default()).take(additional));
self.def_levels.extend(iter::repeat(0).take(additional));
self.rep_levels.extend(iter::repeat(1).take(additional));
}
/// Return true if the row for index is null. Returns true if there is no
/// row for index.
pub fn is_null(&self, index: usize) -> bool {
@@ -282,7 +321,10 @@ impl<T: Default> Packer<T> {
// Convert `Vec<T>`, e.g., `Vec<f64>` into the appropriate `Packer<T>` value,
// e.g., `Packer<f64>`.
impl<T: Default> std::convert::From<Vec<T>> for Packer<T> {
impl<T> std::convert::From<Vec<T>> for Packer<T>
where
T: Default + Clone,
{
fn from(v: Vec<T>) -> Self {
Self {
def_levels: vec![1; v.len()],
@@ -294,7 +336,10 @@ impl<T: Default> std::convert::From<Vec<T>> for Packer<T> {
// Convert `Vec<Option<T>>`, e.g., `Vec<Option<f64>>` into the appropriate
// `Packer<T>` value, e.g., `Packer<f64>`.
impl<T: Default> std::convert::From<Vec<Option<T>>> for Packer<T> {
impl<T> std::convert::From<Vec<Option<T>>> for Packer<T>
where
T: Default + Clone,
{
fn from(values: Vec<Option<T>>) -> Self {
let mut packer = Self::new();
for v in values {
@@ -316,6 +361,50 @@ mod test {
assert_eq!(packer.rep_levels.capacity(), 42);
}
#[test]
fn extend_from_slice() {
let mut packer: Packer<i64> = Packer::new();
packer.push(100);
packer.push(22);
packer.extend_from_slice(&[2, 3, 4]);
assert_eq!(packer.values, &[100, 22, 2, 3, 4]);
assert_eq!(packer.def_levels, &[1; 5]);
assert_eq!(packer.rep_levels, &[1; 5]);
}
#[test]
fn extend_from_packer() {
let mut packer_a: Packer<i64> = Packer::new();
packer_a.push(100);
packer_a.push(22);
let mut packer_b = Packer::new();
packer_b.push(3);
packer_a.extend_from_packer(&packer_b);
assert_eq!(packer_a.values, &[100, 22, 3]);
assert_eq!(packer_a.def_levels, &[1; 3]);
assert_eq!(packer_a.rep_levels, &[1; 3]);
}
#[test]
fn pad_with_null() {
let mut packer: Packer<i64> = Packer::new();
packer.push(100);
packer.push(22);
packer.pad_with_null(3);
assert_eq!(
packer.values,
&[100, 22, i64::default(), i64::default(), i64::default()]
);
assert_eq!(packer.def_levels, &[1, 1, 0, 0, 0]);
assert_eq!(packer.rep_levels, &[1; 5]);
}
#[test]
fn is_null() {
let mut packer: Packer<f64> = Packer::new();


@@ -242,6 +242,15 @@ where
}
}
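// `process_measurement_table` holds a `&mut TSMBlockReader<R>` and passes
// `&mut block_reader` to `map_field_columns`, so the decoder also needs to
// be usable through a doubly-mutable reference.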
impl<R> BlockDecoder for &mut &mut TSMBlockReader<R>
where
R: BufRead + Seek,
{
fn block_data(&mut self, block: &Block) -> Result<BlockData, TSMError> {
self.decode_block(block)
}
}
// Maps multiple columnar field blocks to a single tabular representation.
//
// Given a set of field keys and a set of blocks for each key,