refactor: make Packer generic

pull/24376/head
Edd Robinson 2020-06-18 09:27:29 +01:00
parent ac153fa3d2
commit ac7bb6bf68
7 changed files with 377 additions and 571 deletions

1
Cargo.lock generated
View File

@ -685,6 +685,7 @@ dependencies = [
"delorean_test_helpers",
"env_logger",
"log",
"parquet",
"snafu",
]

View File

@ -7,6 +7,14 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
# I get a build error when I use this one:
# failed to run custom build command for `arrow-flight v0.17.0`
#parquet = "0.17.0"
# So instead of this, we are living on the edge and pulling directly from the arrow repo:
# https://github.com/apache/arrow/commit/04a1867eeb58f0c515e7ee5a6300a8f61045a6cd
parquet = { git = "https://github.com/apache/arrow.git", rev="04a1867eeb58f0c515e7ee5a6300a8f61045a6cd", version = "1.0.0-SNAPSHOT" }
snafu = "0.6.2"
env_logger = "0.7.1"
log = "0.4.8"

View File

@ -5,12 +5,14 @@
//! Currently supports converting LineProtocol
//! TODO move this to delorean/src/ingest/line_protocol.rs?
use log::debug;
use parquet::data_type::ByteArray;
use snafu::{ResultExt, Snafu};
use std::collections::BTreeMap;
use delorean_line_parser::{FieldValue, ParsedLine};
use delorean_table::{DeloreanTableWriter, DeloreanTableWriterSource, Error as TableError, Packer};
use delorean_table::packers::Packers;
use delorean_table::{DeloreanTableWriter, DeloreanTableWriterSource, Error as TableError};
use delorean_table_schema::{DataType, Schema, SchemaBuilder};
#[derive(Debug, Clone)]
@ -348,14 +350,17 @@ impl<'a> MeasurementWriter<'a> {
///
/// TODO: improve performance by reusing the Vec<Packer> rather
/// than always making new ones
fn pack_lines<'a>(schema: &Schema, lines: &[ParsedLine<'a>]) -> Vec<Packer> {
fn pack_lines<'a>(schema: &Schema, lines: &[ParsedLine<'a>]) -> Vec<Packers> {
let col_defs = schema.get_col_defs();
let mut packers: Vec<_> = col_defs
let mut packers: Vec<Packers> = col_defs
.iter()
.enumerate()
.map(|(idx, col_def)| {
debug!(" Column definition [{}] = {:?}", idx, col_def);
Packer::with_capacity(col_def.data_type, lines.len())
// Initialise a Packer<T> for the matching data type wrapped in a
// Packers enum variant to allow it to live in a vector.
Packers::from(col_def.data_type)
})
.collect();
@ -393,7 +398,9 @@ fn pack_lines<'a>(schema: &Schema, lines: &[ParsedLine<'a>]) -> Vec<Packer> {
for (tag_name, tag_value) in tag_set {
let tag_name_str = tag_name.to_string();
if let Some(packer) = packer_map.get_mut(&tag_name_str) {
packer.pack_str(Some(&tag_value.to_string()));
packer
.str_packer_mut()
.push(ByteArray::from(tag_value.to_string().as_str()));
} else {
panic!(
"tag {} seen in input that has no matching column in schema",
@ -407,9 +414,17 @@ fn pack_lines<'a>(schema: &Schema, lines: &[ParsedLine<'a>]) -> Vec<Packer> {
let field_name_str = field_name.to_string();
if let Some(packer) = packer_map.get_mut(&field_name_str) {
match *field_value {
FieldValue::F64(f) => packer.pack_f64(Some(f)),
FieldValue::I64(i) => packer.pack_i64(Some(i)),
FieldValue::String(ref s) => packer.pack_str(Some(&s.to_string())),
FieldValue::F64(f) => {
packer.f64_packer_mut().push(f);
}
FieldValue::I64(i) => {
packer.i64_packer_mut().push(i);
}
FieldValue::String(ref s) => {
packer
.str_packer_mut()
.push(ByteArray::from(s.to_string().as_str()));
}
}
} else {
panic!(
@ -425,7 +440,8 @@ fn pack_lines<'a>(schema: &Schema, lines: &[ParsedLine<'a>]) -> Vec<Packer> {
// to microseconds
let timestamp_micros = line.timestamp.map(|timestamp_nanos| timestamp_nanos / 1000);
packer.pack_i64(timestamp_micros);
// TODO(edd) why would line _not_ have a timestamp??? We should always have them
packer.i64_packer_mut().push_option(timestamp_micros)
} else {
panic!("No {} field present in schema...", timestamp_col_name);
}
@ -434,7 +450,7 @@ fn pack_lines<'a>(schema: &Schema, lines: &[ParsedLine<'a>]) -> Vec<Packer> {
for packer in packer_map.values_mut() {
if packer.len() < starting_len + 1 {
assert_eq!(packer.len(), starting_len, "packer should be unchanged");
packer.pack_none();
packer.push_none();
} else {
assert_eq!(
starting_len + 1,
@ -446,7 +462,7 @@ fn pack_lines<'a>(schema: &Schema, lines: &[ParsedLine<'a>]) -> Vec<Packer> {
}
}
// Should have added one value to all packers. Asser that invariant here
// Should have added one value to all packers. Assert that invariant here
assert!(
packer_map.values().all(|x| x.len() == starting_len + 1),
"Should have added 1 row to all packers"
@ -461,6 +477,7 @@ mod delorean_ingest_tests {
use delorean_table::{DeloreanTableWriter, DeloreanTableWriterSource, Error as TableError};
use delorean_table_schema::ColumnDefinition;
use delorean_test_helpers::approximately_equal;
use parquet::data_type::ByteArray;
use std::sync::{Arc, Mutex};
@ -505,7 +522,7 @@ mod delorean_ingest_tests {
}
impl DeloreanTableWriter for NoOpWriter {
fn write_batch(&mut self, packers: &[Packer]) -> Result<(), TableError> {
fn write_batch(&mut self, packers: &[Packers]) -> Result<(), TableError> {
if packers.is_empty() {
log_event(
&self.log,
@ -923,21 +940,6 @@ mod delorean_ingest_tests {
Ok(())
}
// gets the packer's value as a string.
fn get_string_val(packer: &Packer, idx: usize) -> &str {
packer.as_string_packer().values[idx].as_utf8().unwrap()
}
// gets the packer's value as an int
fn get_int_val(packer: &Packer, idx: usize) -> i64 {
packer.as_int_packer().values[idx]
}
// gets the packer's value as an int
fn get_float_val(packer: &Packer, idx: usize) -> f64 {
packer.as_float_packer().values[idx]
}
#[test]
fn pack_data_value() -> Result<(), Error> {
let mut sampler = parse_data_into_sampler()?;
@ -954,81 +956,80 @@ mod delorean_ingest_tests {
}
// Tag values
let tag_packer = &packers[0];
assert_eq!(get_string_val(tag_packer, 0), "A");
assert_eq!(get_string_val(tag_packer, 1), "B");
let tag_packer = packers[0].str_packer();
assert_eq!(tag_packer.get(0).unwrap(), &ByteArray::from("A"));
assert_eq!(tag_packer.get(1).unwrap(), &ByteArray::from("B"));
assert!(packers[0].is_null(2));
assert_eq!(get_string_val(tag_packer, 3), "C");
assert_eq!(get_string_val(tag_packer, 4), "D");
assert_eq!(get_string_val(tag_packer, 5), "E");
assert_eq!(get_string_val(tag_packer, 6), "F");
assert_eq!(get_string_val(tag_packer, 7), "G");
assert_eq!(tag_packer.get(3).unwrap(), &ByteArray::from("C"));
assert_eq!(tag_packer.get(4).unwrap(), &ByteArray::from("D"));
assert_eq!(tag_packer.get(5).unwrap(), &ByteArray::from("E"));
assert_eq!(tag_packer.get(6).unwrap(), &ByteArray::from("F"));
assert_eq!(tag_packer.get(7).unwrap(), &ByteArray::from("G"));
// int_field values
let int_field_packer = &packers[1];
assert_eq!(get_int_val(int_field_packer, 0), 64);
assert_eq!(get_int_val(int_field_packer, 1), 65);
assert_eq!(get_int_val(int_field_packer, 2), 66);
let int_field_packer = &packers[1].i64_packer();
assert_eq!(int_field_packer.get(0).unwrap(), &64);
assert_eq!(int_field_packer.get(1).unwrap(), &65);
assert_eq!(int_field_packer.get(2).unwrap(), &66);
assert!(int_field_packer.is_null(3));
assert_eq!(get_int_val(int_field_packer, 4), 67);
assert_eq!(get_int_val(int_field_packer, 5), 68);
assert_eq!(get_int_val(int_field_packer, 6), 69);
assert_eq!(get_int_val(int_field_packer, 7), 70);
assert_eq!(int_field_packer.get(4).unwrap(), &67);
assert_eq!(int_field_packer.get(5).unwrap(), &68);
assert_eq!(int_field_packer.get(6).unwrap(), &69);
assert_eq!(int_field_packer.get(7).unwrap(), &70);
// float_field values
let float_field_packer = &packers[2];
let float_field_packer = &packers[2].f64_packer();
assert!(approximately_equal(
get_float_val(float_field_packer, 0),
*float_field_packer.get(0).unwrap(),
100.0
));
assert!(approximately_equal(
get_float_val(float_field_packer, 1),
*float_field_packer.get(1).unwrap(),
101.0
));
assert!(approximately_equal(
get_float_val(float_field_packer, 2),
*float_field_packer.get(2).unwrap(),
102.0
));
assert!(approximately_equal(
get_float_val(float_field_packer, 3),
*float_field_packer.get(3).unwrap(),
103.0
));
assert!(float_field_packer.is_null(4));
assert!(approximately_equal(
get_float_val(float_field_packer, 5),
*float_field_packer.get(5).unwrap(),
104.0
));
assert!(approximately_equal(
get_float_val(float_field_packer, 6),
*float_field_packer.get(6).unwrap(),
105.0
));
assert!(approximately_equal(
get_float_val(float_field_packer, 7),
*float_field_packer.get(7).unwrap(),
106.0
));
// str_field values
let str_field_packer = &packers[3];
assert_eq!(get_string_val(str_field_packer, 0), "foo1");
assert_eq!(get_string_val(str_field_packer, 1), "foo2");
assert_eq!(get_string_val(str_field_packer, 2), "foo3");
assert_eq!(get_string_val(str_field_packer, 3), "foo4");
assert_eq!(get_string_val(str_field_packer, 4), "foo5");
let str_field_packer = &packers[3].str_packer();
assert_eq!(str_field_packer.get(0).unwrap(), &ByteArray::from("foo1"));
assert_eq!(str_field_packer.get(1).unwrap(), &ByteArray::from("foo2"));
assert_eq!(str_field_packer.get(2).unwrap(), &ByteArray::from("foo3"));
assert_eq!(str_field_packer.get(3).unwrap(), &ByteArray::from("foo4"));
assert_eq!(str_field_packer.get(4).unwrap(), &ByteArray::from("foo5"));
assert!(str_field_packer.is_null(5));
assert_eq!(get_string_val(str_field_packer, 6), "foo6");
assert_eq!(get_string_val(str_field_packer, 7), "foo7");
assert_eq!(str_field_packer.get(6).unwrap(), &ByteArray::from("foo6"));
assert_eq!(str_field_packer.get(7).unwrap(), &ByteArray::from("foo7"));
// timestamp values (NB The timestamps are truncated to Microseconds)
let timestamp_packer = &packers[4];
assert_eq!(get_int_val(timestamp_packer, 0), 1_590_488_773_254_420);
assert_eq!(get_int_val(timestamp_packer, 1), 1_590_488_773_254_430);
assert_eq!(get_int_val(timestamp_packer, 2), 1_590_488_773_254_440);
assert_eq!(get_int_val(timestamp_packer, 3), 1_590_488_773_254_450);
assert_eq!(get_int_val(timestamp_packer, 4), 1_590_488_773_254_460);
assert_eq!(get_int_val(timestamp_packer, 5), 1_590_488_773_254_470);
// timestamp values
let timestamp_packer = &packers[4].i64_packer();
assert_eq!(timestamp_packer.get(0).unwrap(), &1_590_488_773_254_420);
assert_eq!(timestamp_packer.get(1).unwrap(), &1_590_488_773_254_430);
assert_eq!(timestamp_packer.get(2).unwrap(), &1_590_488_773_254_440);
assert_eq!(timestamp_packer.get(3).unwrap(), &1_590_488_773_254_450);
assert_eq!(timestamp_packer.get(4).unwrap(), &1_590_488_773_254_460);
assert_eq!(timestamp_packer.get(5).unwrap(), &1_590_488_773_254_470);
assert!(timestamp_packer.is_null(6));
assert_eq!(get_int_val(timestamp_packer, 7), 1_590_488_773_254_480);
assert_eq!(timestamp_packer.get(7).unwrap(), &1_590_488_773_254_480);
Ok(())
}

View File

@ -19,7 +19,7 @@ use parquet::{
use snafu::{ResultExt, Snafu};
use crate::metadata::parquet_schema_as_string;
use delorean_table::{packers::Packer, DeloreanTableWriter, Error as TableError};
use delorean_table::{DeloreanTableWriter, Error as TableError, Packers};
#[derive(Debug, Snafu)]
pub enum Error {
@ -62,23 +62,24 @@ where
/// # use delorean_table_schema;
/// # use delorean_table_schema::DataType;
/// # use delorean_table::DeloreanTableWriter;
/// # use delorean_table::packers::Packer;
/// # use delorean_table::packers::{Packer, Packers};
/// # use delorean_parquet::writer::DeloreanParquetTableWriter;
/// # use parquet::data_type::ByteArray;
///
/// let schema = delorean_table_schema::SchemaBuilder::new("measurement_name")
/// .tag("tag1")
/// .field("field1", delorean_table_schema::DataType::Integer)
/// .build();
///
/// let mut packers = vec![
/// Packer::new(DataType::String), // 0: tag1
/// Packer::new(DataType::Integer), // 1: field1
/// Packer::new(DataType::Integer), // 2: timestamp
/// let mut packers: Vec<Packers> = vec![
/// Packers::String(Packer::new()), // 0: tag1
/// Packers::Integer(Packer::new()), // 1: field1
/// Packers::Integer(Packer::new()), // 2: timestamp
/// ];
///
/// packers[0].pack_str(Some("tag1")); // tag1 val
/// packers[1].pack_i64(Some(100)); // field1 val
/// packers[2].pack_none(); // no timestamp
/// packers[0].str_packer_mut().push(ByteArray::from("tag1")); // tag1 val
/// packers[1].i64_packer_mut().push(100); // field1 val
/// packers[2].push_none(); // no timestamp
///
/// // Write to '/tmp/example.parquet'
/// let mut output_file_name = std::env::temp_dir();
@ -126,7 +127,7 @@ where
/// column chunk
///
/// TODO: better control of column chunks
fn write_batch(&mut self, packers: &[Packer]) -> Result<(), TableError> {
fn write_batch(&mut self, packers: &[Packers]) -> Result<(), TableError> {
// now write out the data
let mut row_group_writer =
self.file_writer
@ -154,15 +155,18 @@ where
});
}
};
// TODO(edd) This seems super awkward and not the right way to do it...
// We know we have a direct mapping between a col_writer (ColumnWriter)
// type and a Packers variant. We also know that we do exactly the same
// work for each variant (we just dispatch to the write_batch method
// on the column writer).
//
// I think this match could be so much shorter but not sure how yet.
match col_writer {
BoolColumnWriter(ref mut w) => {
let bool_packer = packer.as_bool_packer();
let p = packer.bool_packer();
let n = w
.write_batch(
&bool_packer.values,
Some(&bool_packer.def_levels),
Some(&bool_packer.rep_levels),
)
.write_batch(p.values(), Some(p.def_levels()), Some(p.rep_levels()))
.context(ParquetLibraryError {
message: String::from("Can't write_batch with bool values"),
})?;
@ -170,13 +174,9 @@ where
}
Int32ColumnWriter(_) => unreachable!("ParquetWriter does not support INT32 data"),
Int64ColumnWriter(ref mut w) => {
let int_packer = packer.as_int_packer();
let p = packer.i64_packer();
let n = w
.write_batch(
&int_packer.values,
Some(&int_packer.def_levels),
Some(&int_packer.rep_levels),
)
.write_batch(p.values(), Some(p.def_levels()), Some(p.rep_levels()))
.context(ParquetLibraryError {
message: String::from("Can't write_batch with int64 values"),
})?;
@ -187,26 +187,18 @@ where
unreachable!("ParquetWriter does not support FLOAT (32-bit float) data")
}
DoubleColumnWriter(ref mut w) => {
let float_packer = packer.as_float_packer();
let p = packer.f64_packer();
let n = w
.write_batch(
&float_packer.values,
Some(&float_packer.def_levels),
Some(&float_packer.rep_levels),
)
.write_batch(p.values(), Some(p.def_levels()), Some(p.rep_levels()))
.context(ParquetLibraryError {
message: String::from("Can't write_batch with f64 values"),
})?;
debug!("Wrote {} rows of f64 data", n);
}
ByteArrayColumnWriter(ref mut w) => {
let string_packer = packer.as_string_packer();
let p = packer.str_packer();
let n = w
.write_batch(
&string_packer.values,
Some(&string_packer.def_levels),
Some(&string_packer.rep_levels),
)
.write_batch(p.values(), Some(p.def_levels()), Some(p.rep_levels()))
.context(ParquetLibraryError {
message: String::from("Can't write_batch with byte array values"),
})?;

View File

@ -1,7 +1,7 @@
use delorean_parquet::writer::DeloreanParquetTableWriter;
use delorean_table::{packers::Packer, DeloreanTableWriter, Packers};
use delorean_table::{packers::Packer, DeloreanTableWriter};
use delorean_table_schema::DataType;
use parquet::data_type::ByteArray;
use std::fs;
#[test]
@ -15,13 +15,13 @@ fn test_write_parquet_data() {
.build();
assert_eq!(schema.get_col_defs().len(), 6);
let mut packers = vec![
Packer::new(DataType::String), // 0: tag1
Packer::new(DataType::String), // 1: string_field
Packer::new(DataType::Float), // 2: float_field
Packer::new(DataType::Integer), // 3: int_field
Packer::new(DataType::Boolean), // 4: bool_field
Packer::new(DataType::Integer), // 5: timstamp
let mut packers: Vec<Packers> = vec![
Packers::String(Packer::new()), // 0: tag1
Packers::String(Packer::new()), // 1: string_field
Packers::Float(Packer::new()), // 2: float_field
Packers::Integer(Packer::new()), // 3: int_field
Packers::Boolean(Packer::new()), // 4: bool_field
Packers::Integer(Packer::new()), // 5: timestamp
];
// create this data:
@ -29,26 +29,34 @@ fn test_write_parquet_data() {
// row 0: "tag1_val0", "str_val0", 1.0, 100, true, 900000000000
// row 1: null, null , null, null, null, null
// row 2: "tag1_val2", "str_val2", 2.0, 200, true, 910000000000
packers[0].pack_str(Some("tag1_val0"));
packers[1].pack_str(Some("str_val0"));
packers[2].pack_f64(Some(1.0));
packers[3].pack_i64(Some(100));
packers[4].pack_bool(Some(true));
packers[5].pack_i64(Some(900000000000));
packers[0]
.str_packer_mut()
.push(ByteArray::from("tag1_val0"));
packers[1]
.str_packer_mut()
.push(ByteArray::from("str_val0"));
packers[2].f64_packer_mut().push(1.0);
packers[3].i64_packer_mut().push(100);
packers[4].bool_packer_mut().push(true);
packers[5].i64_packer_mut().push(900000000000);
packers[0].pack_none();
packers[1].pack_none();
packers[2].pack_none();
packers[3].pack_none();
packers[4].pack_none();
packers[5].pack_none();
packers[0].push_none();
packers[1].push_none();
packers[2].push_none();
packers[3].push_none();
packers[4].push_none();
packers[5].push_none();
packers[0].pack_str(Some("tag1_val2"));
packers[1].pack_str(Some("str_val2"));
packers[2].pack_f64(Some(2.0));
packers[3].pack_i64(Some(200));
packers[4].pack_bool(Some(true));
packers[5].pack_i64(Some(910000000000));
packers[0]
.str_packer_mut()
.push(ByteArray::from("tag1_val2"));
packers[1]
.str_packer_mut()
.push(ByteArray::from("str_val2"));
packers[2].f64_packer_mut().push(2.0);
packers[3].i64_packer_mut().push(200);
packers[4].bool_packer_mut().push(true);
packers[5].i64_packer_mut().push(910000000000);
// write the data out to the parquet file
let output_path = delorean_test_helpers::tempfile::Builder::new()

View File

@ -3,7 +3,7 @@ pub mod packers;
use snafu::Snafu;
use delorean_table_schema::Schema;
pub use packers::Packer;
pub use packers::{Packer, Packers};
#[derive(Snafu, Debug)]
pub enum Error {
@ -23,7 +23,7 @@ pub enum Error {
/// Something that knows how to write a set of columns somewhere
pub trait DeloreanTableWriter {
/// Writes a batch of packed data to the underlying output
fn write_batch(&mut self, packers: &[Packer]) -> Result<(), Error>;
fn write_batch(&mut self, packers: &[Packers]) -> Result<(), Error>;
/// Closes the underlying writer and finalizes the work to write the file.
fn close(&mut self) -> Result<(), Error>;

View File

@ -5,492 +5,288 @@
// Note the maintainability of this code is not likely high (it came
// from the copy pasta factory) but the plan is to replace it
// soon... We'll see how long that actually takes...
use parquet::data_type::ByteArray;
// NOTE: See https://blog.twitter.com/engineering/en_us/a/2013/dremel-made-simple-with-parquet.html
// for an explination of nesting levels
// for an explanation of nesting levels
/// Packs data for a column of strings
#[derive(Debug)]
pub struct StringPacker {
pub values: Vec<ByteArray>,
pub def_levels: Vec<i16>,
pub rep_levels: Vec<i16>,
}
impl StringPacker {
fn new() -> StringPacker {
StringPacker {
values: Vec::new(),
def_levels: Vec::new(),
rep_levels: Vec::new(),
}
}
fn with_capacity(capacity: usize) -> StringPacker {
StringPacker {
values: Vec::with_capacity(capacity),
def_levels: Vec::with_capacity(capacity),
rep_levels: Vec::with_capacity(capacity),
}
}
fn len(&self) -> usize {
self.values.len()
}
// Adds (copies) the data to be encoded
fn pack(&mut self, s: Option<&str>) {
match s {
Some(s) => {
self.values.push(ByteArray::from(s));
self.def_levels.push(1);
self.rep_levels.push(1);
}
None => {
self.values.push(ByteArray::from(""));
self.def_levels.push(0);
self.rep_levels.push(1);
}
}
}
/// A column packer for any one of the supported value types.
///
/// Each variant wraps the `Packer<T>` for a single value type, which allows
/// packers for differently-typed columns to be stored together (for example
/// in a `Vec<Packers>`).
pub enum Packers {
/// Packer for `f64` values.
Float(Packer<f64>),
/// Packer for `i64` values (also used for timestamp columns).
Integer(Packer<i64>),
/// Packer for string values, held as `ByteArray`s.
String(Packer<ByteArray>),
/// Packer for `bool` values.
Boolean(Packer<bool>),
}
// Packs data for a column of floats
#[derive(Debug)]
pub struct FloatPacker {
pub values: Vec<f64>,
pub def_levels: Vec<i16>,
pub rep_levels: Vec<i16>,
}
impl FloatPacker {
fn new() -> FloatPacker {
FloatPacker {
values: Vec::new(),
def_levels: Vec::new(),
rep_levels: Vec::new(),
}
}
fn with_capacity(capacity: usize) -> FloatPacker {
FloatPacker {
values: Vec::with_capacity(capacity),
def_levels: Vec::with_capacity(capacity),
rep_levels: Vec::with_capacity(capacity),
}
}
fn len(&self) -> usize {
self.values.len()
}
// Adds (copies) the data to be encoded
fn pack(&mut self, f: Option<f64>) {
match f {
Some(f) => {
self.values.push(f);
self.def_levels.push(1);
self.rep_levels.push(1);
}
None => {
self.values.push(std::f64::NAN); // doesn't matter as def level == 0
self.def_levels.push(0);
self.rep_levels.push(1);
}
}
}
}
// Packs data for a column of ints
#[derive(Debug)]
pub struct IntPacker {
pub values: Vec<i64>,
pub def_levels: Vec<i16>,
pub rep_levels: Vec<i16>,
}
impl IntPacker {
fn new() -> IntPacker {
IntPacker {
values: Vec::new(),
def_levels: Vec::new(),
rep_levels: Vec::new(),
}
}
fn with_capacity(capacity: usize) -> IntPacker {
IntPacker {
values: Vec::with_capacity(capacity),
def_levels: Vec::with_capacity(capacity),
rep_levels: Vec::with_capacity(capacity),
}
}
fn len(&self) -> usize {
self.values.len()
}
// Adds (copies) the data to be encoded
fn pack(&mut self, i: Option<i64>) {
match i {
Some(i) => {
self.values.push(i);
self.def_levels.push(1);
self.rep_levels.push(1);
}
None => {
self.values.push(0); // doesn't matter as def level == 0
self.def_levels.push(0);
self.rep_levels.push(1);
}
}
}
}
// Packs data for a column of bool
#[derive(Debug)]
pub struct BoolPacker {
pub values: Vec<bool>,
pub def_levels: Vec<i16>,
pub rep_levels: Vec<i16>,
}
impl BoolPacker {
fn new() -> BoolPacker {
BoolPacker {
values: Vec::new(),
def_levels: Vec::new(),
rep_levels: Vec::new(),
}
}
fn with_capacity(capacity: usize) -> BoolPacker {
BoolPacker {
values: Vec::with_capacity(capacity),
def_levels: Vec::with_capacity(capacity),
rep_levels: Vec::with_capacity(capacity),
}
}
fn len(&self) -> usize {
self.values.len()
}
// Adds (copies) the data to be encoded
fn pack(&mut self, b: Option<bool>) {
match b {
Some(b) => {
self.values.push(b);
self.def_levels.push(1);
self.rep_levels.push(1);
}
None => {
self.values.push(false); // doesn't matter as def level == 0
self.def_levels.push(0);
self.rep_levels.push(1);
}
}
}
}
#[derive(Debug)]
pub enum Packer {
StringPackerType(StringPacker),
FloatPackerType(FloatPacker),
IntPackerType(IntPacker),
BoolPackerType(BoolPacker),
}
impl Packer {
/// Create a new packer that can pack values of the specified protocol type
pub fn new(t: delorean_table_schema::DataType) -> Packer {
match t {
delorean_table_schema::DataType::String => {
Packer::StringPackerType(StringPacker::new())
}
delorean_table_schema::DataType::Float => Packer::FloatPackerType(FloatPacker::new()),
delorean_table_schema::DataType::Integer => Packer::IntPackerType(IntPacker::new()),
delorean_table_schema::DataType::Boolean => Packer::BoolPackerType(BoolPacker::new()),
delorean_table_schema::DataType::Timestamp => Packer::IntPackerType(IntPacker::new()),
}
}
/// Create a new packer that can pack values of the specified type, with the specified capacity
pub fn with_capacity(t: delorean_table_schema::DataType, capacity: usize) -> Packer {
match t {
delorean_table_schema::DataType::String => {
Packer::StringPackerType(StringPacker::with_capacity(capacity))
}
delorean_table_schema::DataType::Float => {
Packer::FloatPackerType(FloatPacker::with_capacity(capacity))
}
delorean_table_schema::DataType::Integer => {
Packer::IntPackerType(IntPacker::with_capacity(capacity))
}
delorean_table_schema::DataType::Boolean => {
Packer::BoolPackerType(BoolPacker::with_capacity(capacity))
}
delorean_table_schema::DataType::Timestamp => {
Packer::IntPackerType(IntPacker::with_capacity(capacity))
}
impl Packers {
pub fn is_empty(&self) -> bool {
match self {
Self::Float(p) => p.is_empty(),
Self::Integer(p) => p.is_empty(),
Self::String(p) => p.is_empty(),
Self::Boolean(p) => p.is_empty(),
}
}
pub fn len(&self) -> usize {
match self {
Packer::StringPackerType(string_packer) => string_packer.len(),
Packer::FloatPackerType(float_packer) => float_packer.len(),
Packer::IntPackerType(int_packer) => int_packer.len(),
Packer::BoolPackerType(bool_packer) => bool_packer.len(),
Self::Float(p) => p.len(),
Self::Integer(p) => p.len(),
Self::String(p) => p.len(),
Self::Boolean(p) => p.len(),
}
}
pub fn push_none(&mut self) {
match self {
Self::Float(p) => p.push_option(None),
Self::Integer(p) => p.push_option(None),
Self::String(p) => p.push_option(None),
Self::Boolean(p) => p.push_option(None),
}
}
/// Returns `true` if the row at `index` is null.
///
/// Also returns `true` when there is no row at `index` (out of bounds).
pub fn is_null(&self, index: usize) -> bool {
    // Delegate to `Packer::is_null` so the definition-level check lives in
    // exactly one place instead of being repeated for every variant.
    match self {
        Self::Float(p) => p.is_null(index),
        Self::Integer(p) => p.is_null(index),
        Self::String(p) => p.is_null(index),
        Self::Boolean(p) => p.is_null(index),
    }
}
// TODO(edd): YUK! Seem unable to avoid runtime checking of these packer
// types. Need to figure this out.

/// Returns a mutable reference to the wrapped `Packer<f64>`.
///
/// # Panics
///
/// Panics if this is not the `Float` variant.
pub fn f64_packer_mut(&mut self) -> &mut Packer<f64> {
    match self {
        Self::Float(inner) => inner,
        _ => panic!("packer is not a Float"),
    }
}
/// Returns a shared reference to the wrapped `Packer<f64>`.
///
/// # Panics
///
/// Panics if this is not the `Float` variant.
pub fn f64_packer(&self) -> &Packer<f64> {
    match self {
        Self::Float(inner) => inner,
        _ => panic!("packer is not a Float"),
    }
}
/// Returns a mutable reference to the wrapped `Packer<i64>`.
///
/// # Panics
///
/// Panics if this is not the `Integer` variant.
pub fn i64_packer_mut(&mut self) -> &mut Packer<i64> {
    match self {
        Self::Integer(inner) => inner,
        _ => panic!("packer is not an Integer"),
    }
}
/// Returns a shared reference to the wrapped `Packer<i64>`.
///
/// # Panics
///
/// Panics if this is not the `Integer` variant.
pub fn i64_packer(&self) -> &Packer<i64> {
    match self {
        Self::Integer(inner) => inner,
        _ => panic!("packer is not an Integer"),
    }
}
/// Returns a mutable reference to the wrapped `Packer<ByteArray>`.
///
/// # Panics
///
/// Panics if this is not the `String` variant.
pub fn str_packer_mut(&mut self) -> &mut Packer<ByteArray> {
    match self {
        Self::String(inner) => inner,
        _ => panic!("packer is not a String"),
    }
}
/// Returns a shared reference to the wrapped `Packer<ByteArray>`.
///
/// # Panics
///
/// Panics if this is not the `String` variant.
pub fn str_packer(&self) -> &Packer<ByteArray> {
    match self {
        Self::String(inner) => inner,
        _ => panic!("packer is not a String"),
    }
}
/// Returns a mutable reference to the wrapped `Packer<bool>`.
///
/// # Panics
///
/// Panics if this is not the `Boolean` variant.
pub fn bool_packer_mut(&mut self) -> &mut Packer<bool> {
    match self {
        Self::Boolean(inner) => inner,
        _ => panic!("packer is not a Boolean"),
    }
}
/// Returns a shared reference to the wrapped `Packer<bool>`.
///
/// # Panics
///
/// Panics if this is not the `Boolean` variant.
pub fn bool_packer(&self) -> &Packer<bool> {
    match self {
        Self::Boolean(inner) => inner,
        _ => panic!("packer is not a Boolean"),
    }
}
}
impl std::convert::From<delorean_table_schema::DataType> for Packers {
fn from(t: delorean_table_schema::DataType) -> Self {
match t {
delorean_table_schema::DataType::Float => Packers::Float(Packer::<f64>::new()),
delorean_table_schema::DataType::Integer => Packers::Integer(Packer::<i64>::new()),
delorean_table_schema::DataType::String => Packers::String(Packer::<ByteArray>::new()),
delorean_table_schema::DataType::Boolean => Packers::Boolean(Packer::<bool>::new()),
delorean_table_schema::DataType::Timestamp => Packers::Integer(Packer::<i64>::new()),
}
}
}
/// Accumulates the values of a single column together with the definition
/// and repetition level recorded for each row.
#[derive(Debug, Default)]
pub struct Packer<T: PackerDefault> {
// One entry per row; null rows hold a `T::default()` placeholder.
values: Vec<T>,
// One entry per row: 1 when the row has a value, 0 when it is null.
def_levels: Vec<i16>,
// One entry per row; always 1.
rep_levels: Vec<i16>,
}
impl<T: PackerDefault> Packer<T> {
pub fn new() -> Self {
Self {
values: Vec::new(),
def_levels: Vec::new(),
rep_levels: Vec::new(),
}
}
/// Creates an empty packer whose value, definition level and repetition
/// level buffers each have room for at least `capacity` rows.
pub fn with_capacity(capacity: usize) -> Self {
    let values = Vec::with_capacity(capacity);
    let def_levels = Vec::with_capacity(capacity);
    let rep_levels = Vec::with_capacity(capacity);

    Self {
        values,
        def_levels,
        rep_levels,
    }
}
/// Returns the number of rows packed so far. Null rows are counted too,
/// since they store a placeholder in `values`.
pub fn len(&self) -> usize {
self.values.len()
}
pub fn is_empty(&self) -> bool {
// Clippy made me put this
self.len() == 0
self.values.is_empty()
}
pub fn pack_str(&mut self, s: Option<&str>) {
if let Packer::StringPackerType(string_packer) = self {
string_packer.pack(s)
} else {
panic!("Packer {:?} does not know how to pack strings", self);
pub fn get(&self, index: usize) -> Option<&T> {
self.values.get(index)
}
// TODO(edd): I don't like these getters. They're only needed so we can
// write the data into a parquet writer. We should have a method on Packer
// that accepts some implementation of a trait that a parquet writer satisfies
// and then pass the data through in here.

/// Returns the packed values, one entry per row. Null rows hold a
/// placeholder value.
pub fn values(&self) -> &Vec<T> {
&self.values
}
/// Returns the definition levels, one entry per row: 1 for a row with a
/// value, 0 for a null row.
pub fn def_levels(&self) -> &Vec<i16> {
&self.def_levels
}
/// Returns the repetition levels, one entry per row. Every row is recorded
/// with a level of 1.
pub fn rep_levels(&self) -> &Vec<i16> {
&self.rep_levels
}
/// Appends one row: `Some(v)` is packed as a regular value, `None` is
/// packed as a null row.
pub fn push_option(&mut self, value: Option<T>) {
    if let Some(present) = value {
        self.push(present);
        return;
    }

    // Null row: the stored value is only a placeholder because the
    // definition level of 0 marks the row as absent.
    self.values.push(T::default());
    self.def_levels.push(0);
    self.rep_levels.push(1);
}
pub fn pack_f64(&mut self, f: Option<f64>) {
if let Packer::FloatPackerType(float_packer) = self {
float_packer.pack(f)
} else {
panic!("Packer {:?} does not know how to pack floats", self);
}
pub fn push(&mut self, value: T) {
self.values.push(value);
self.def_levels.push(1);
self.rep_levels.push(1);
}
pub fn pack_i64(&mut self, i: Option<i64>) {
if let Packer::IntPackerType(int_packer) = self {
int_packer.pack(i)
} else {
panic!("Packer {:?} does not know how to pack ints", self);
/// Return true if the row for index is null. Returns true if there is no
/// row for index.
pub fn is_null(&self, index: usize) -> bool {
match self.def_levels.get(index) {
Some(x) => *x == 0,
None => true,
}
}
}
pub fn pack_bool(&mut self, b: Option<bool>) {
if let Packer::BoolPackerType(bool_packer) = self {
bool_packer.pack(b)
} else {
panic!("Packer {:?} does not know how to pack bools", self);
}
/// Supplies the placeholder value that a `Packer` stores in its `values`
/// buffer for a null row.
pub trait PackerDefault {
/// Returns the placeholder value for this type.
fn default() -> Self;
}
/// Null `f64` rows are stored with a placeholder of `0.0`.
impl PackerDefault for f64 {
fn default() -> Self {
0.0
}
}
pub fn as_string_packer(&self) -> &StringPacker {
if let Packer::StringPackerType(string_packer) = self {
string_packer
} else {
panic!("Packer {:?} is not a string packer", self);
}
/// Null `i64` rows are stored with a placeholder of `0`.
impl PackerDefault for i64 {
fn default() -> Self {
0
}
}
pub fn as_float_packer(&self) -> &FloatPacker {
if let Packer::FloatPackerType(float_packer) = self {
float_packer
} else {
panic!("Packer {:?} is not a float packer", self);
}
/// Null string rows are stored with a placeholder `ByteArray` built from the
/// empty string.
impl PackerDefault for ByteArray {
fn default() -> Self {
ByteArray::from("")
}
}
pub fn as_int_packer(&self) -> &IntPacker {
if let Packer::IntPackerType(int_packer) = self {
int_packer
} else {
panic!("Packer {:?} is not an int packer", self);
}
}
pub fn as_bool_packer(&self) -> &BoolPacker {
if let Packer::BoolPackerType(bool_packer) = self {
bool_packer
} else {
panic!("Packer {:?} is not an bool packer", self);
}
}
pub fn pack_none(&mut self) {
match self {
Packer::StringPackerType(string_packer) => string_packer.pack(None),
Packer::FloatPackerType(float_packer) => float_packer.pack(None),
Packer::IntPackerType(int_packer) => int_packer.pack(None),
Packer::BoolPackerType(bool_packer) => bool_packer.pack(None),
}
}
/// Return true if the idx'th row is null
pub fn is_null(&self, idx: usize) -> bool {
match self {
Packer::StringPackerType(string_packer) => string_packer.def_levels[idx] == 0,
Packer::FloatPackerType(float_packer) => float_packer.def_levels[idx] == 0,
Packer::IntPackerType(int_packer) => int_packer.def_levels[idx] == 0,
Packer::BoolPackerType(bool_packer) => bool_packer.def_levels[idx] == 0,
}
/// Null `bool` rows are stored with a placeholder of `false`.
impl PackerDefault for bool {
fn default() -> Self {
false
}
}
#[cfg(test)]
mod test {
use super::*;
use delorean_table_schema::DataType;
use delorean_test_helpers::approximately_equal;
#[test]
fn with_capacity() {
let string_packer = StringPacker::with_capacity(42);
assert_eq!(string_packer.values.capacity(), 42);
assert_eq!(string_packer.def_levels.capacity(), 42);
assert_eq!(string_packer.rep_levels.capacity(), 42);
let float_packer = FloatPacker::with_capacity(43);
assert_eq!(float_packer.values.capacity(), 43);
assert_eq!(float_packer.def_levels.capacity(), 43);
assert_eq!(float_packer.rep_levels.capacity(), 43);
let int_packer = IntPacker::with_capacity(44);
assert_eq!(int_packer.values.capacity(), 44);
assert_eq!(int_packer.def_levels.capacity(), 44);
assert_eq!(int_packer.rep_levels.capacity(), 44);
let bool_packer = BoolPacker::with_capacity(45);
assert_eq!(bool_packer.values.capacity(), 45);
assert_eq!(bool_packer.def_levels.capacity(), 45);
assert_eq!(bool_packer.rep_levels.capacity(), 45);
let packer: Packer<bool> = Packer::with_capacity(42);
assert_eq!(packer.values.capacity(), 42);
assert_eq!(packer.def_levels.capacity(), 42);
assert_eq!(packer.rep_levels.capacity(), 42);
}
#[test]
fn string_packer() {
let mut packer = Packer::new(DataType::String);
assert_eq!(packer.len(), 0);
packer.pack_str(Some("foo"));
packer.pack_str(Some(""));
packer.pack_str(None);
packer.pack_none();
packer.pack_str(Some("bar"));
assert_eq!(packer.len(), 5);
assert_eq!(
packer.as_string_packer().values[0].as_utf8().unwrap(),
"foo"
);
assert_eq!(packer.as_string_packer().values[1].as_utf8().unwrap(), "");
assert_eq!(
packer.as_string_packer().values[4].as_utf8().unwrap(),
"bar"
);
assert_eq!(packer.as_string_packer().def_levels, vec![1, 1, 0, 0, 1]);
assert_eq!(packer.as_string_packer().rep_levels, vec![1, 1, 1, 1, 1]);
fn is_null() {
let mut packer: Packer<f64> = Packer::new();
packer.push(22.3);
packer.push_option(Some(100.3));
packer.push_option(None);
packer.push(33.3);
assert!(!packer.is_null(1));
assert!(packer.is_null(2));
assert_eq!(packer.is_null(0), false);
assert_eq!(packer.is_null(1), false);
assert_eq!(packer.is_null(2), true);
assert_eq!(packer.is_null(3), false);
assert_eq!(packer.is_null(4), true); // out of bounds
}
#[test]
#[should_panic]
fn string_packer_pack_float() {
let mut packer = Packer::new(DataType::String);
packer.pack_f64(Some(5.3));
}
fn packers() {
let mut packers: Vec<Packers> = Vec::new();
packers.push(Packers::Float(Packer::new()));
packers.push(Packers::Integer(Packer::new()));
packers.push(Packers::Boolean(Packer::new()));
#[test]
fn float_packer() {
let mut packer = Packer::new(DataType::Float);
assert_eq!(packer.len(), 0);
packer.pack_f64(Some(1.23));
packer.pack_f64(None);
packer.pack_none();
packer.pack_f64(Some(4.56));
assert_eq!(packer.len(), 4);
assert!(approximately_equal(
packer.as_float_packer().values[0],
1.23
));
assert!(approximately_equal(
packer.as_float_packer().values[3],
4.56
));
assert_eq!(packer.as_float_packer().def_levels, vec![1, 0, 0, 1]);
assert_eq!(packer.as_float_packer().rep_levels, vec![1, 1, 1, 1]);
assert!(!packer.is_null(0));
assert!(packer.is_null(1));
}
#[test]
#[should_panic]
fn float_packer_pack_string() {
let mut packer = Packer::new(DataType::Float);
packer.pack_str(Some("foo"));
}
fn test_int_packer(mut packer: Packer) {
assert_eq!(packer.len(), 0);
packer.pack_i64(Some(1));
packer.pack_i64(None);
packer.pack_none();
packer.pack_i64(Some(-1));
assert_eq!(packer.len(), 4);
assert_eq!(packer.as_int_packer().values[0], 1);
assert_eq!(packer.as_int_packer().values[3], -1);
assert_eq!(packer.as_int_packer().def_levels, vec![1, 0, 0, 1]);
assert_eq!(packer.as_int_packer().rep_levels, vec![1, 1, 1, 1]);
assert!(!packer.is_null(0));
assert!(packer.is_null(1));
}
#[test]
fn int_packer() {
let packer = Packer::new(DataType::Integer);
test_int_packer(packer);
}
#[test]
#[should_panic]
fn int_packer_pack_string() {
let mut packer = Packer::new(DataType::Integer);
packer.pack_str(Some("foo"));
}
#[test]
fn bool_packer() {
let mut packer = Packer::new(DataType::Boolean);
assert_eq!(packer.len(), 0);
packer.pack_bool(Some(true));
packer.pack_bool(Some(false));
packer.pack_bool(None);
packer.pack_none();
packer.pack_bool(Some(true));
assert_eq!(packer.len(), 5);
assert_eq!(packer.as_bool_packer().values[0], true);
assert_eq!(packer.as_bool_packer().values[1], false);
assert_eq!(packer.as_bool_packer().values[4], true);
assert_eq!(packer.as_bool_packer().def_levels, vec![1, 1, 0, 0, 1]);
assert_eq!(packer.as_bool_packer().rep_levels, vec![1, 1, 1, 1, 1]);
assert!(!packer.is_null(1));
assert!(packer.is_null(2));
}
#[test]
#[should_panic]
fn bool_packer_pack_string() {
let mut packer = Packer::new(DataType::Boolean);
packer.pack_str(Some("foo"));
}
#[test]
fn timstamp_packer() {
let packer = Packer::new(DataType::Timestamp);
test_int_packer(packer);
}
#[test]
#[should_panic]
fn timstamp_packer_pack_string() {
let mut packer = Packer::new(DataType::Timestamp);
packer.pack_str(Some("foo"));
packers.get_mut(0).unwrap().f64_packer_mut().push(22.033);
}
}