diff --git a/Cargo.lock b/Cargo.lock index 3137baf846..78a26eed9c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -685,6 +685,7 @@ dependencies = [ "delorean_test_helpers", "env_logger", "log", + "parquet", "snafu", ] diff --git a/delorean_ingest/Cargo.toml b/delorean_ingest/Cargo.toml index cbc02f4980..384962fe02 100644 --- a/delorean_ingest/Cargo.toml +++ b/delorean_ingest/Cargo.toml @@ -7,6 +7,14 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +# I get a build error when I use this one: +# failed to run custom build command for `arrow-flight v0.17.0` +#parquet = "0.17.0" +# this, we are living on the edge and pull directly from the arrow repo. +# https://github.com/apache/arrow/commit/04a1867eeb58f0c515e7ee5a6300a8f61045a6cd +parquet = { git = "https://github.com/apache/arrow.git", rev="04a1867eeb58f0c515e7ee5a6300a8f61045a6cd", version = "1.0.0-SNAPSHOT" } + + snafu = "0.6.2" env_logger = "0.7.1" log = "0.4.8" diff --git a/delorean_ingest/src/lib.rs b/delorean_ingest/src/lib.rs index 102d0711f2..50f8f8861a 100644 --- a/delorean_ingest/src/lib.rs +++ b/delorean_ingest/src/lib.rs @@ -5,12 +5,14 @@ //! Currently supports converting LineProtocol //! TODO move this to delorean/src/ingest/line_protocol.rs? use log::debug; +use parquet::data_type::ByteArray; use snafu::{ResultExt, Snafu}; use std::collections::BTreeMap; use delorean_line_parser::{FieldValue, ParsedLine}; -use delorean_table::{DeloreanTableWriter, DeloreanTableWriterSource, Error as TableError, Packer}; +use delorean_table::packers::Packers; +use delorean_table::{DeloreanTableWriter, DeloreanTableWriterSource, Error as TableError}; use delorean_table_schema::{DataType, Schema, SchemaBuilder}; #[derive(Debug, Clone)] @@ -348,14 +350,17 @@ impl<'a> MeasurementWriter<'a> { /// /// TODO: improve performance by reusing the the Vec rather /// than always making new ones -fn pack_lines<'a>(schema: &Schema, lines: &[ParsedLine<'a>]) -> Vec { +fn pack_lines<'a>(schema: &Schema, lines: &[ParsedLine<'a>]) -> Vec { let col_defs = schema.get_col_defs(); - let mut packers: Vec<_> = col_defs + let mut packers: Vec = col_defs .iter() .enumerate() .map(|(idx, col_def)| { debug!(" Column definition [{}] = {:?}", idx, col_def); - Packer::with_capacity(col_def.data_type, lines.len()) + + // Initialise a Packer for the matching data type wrapped in a + // Packers enum variant to allow it to live in a vector. + Packers::from(col_def.data_type) }) .collect(); @@ -393,7 +398,9 @@ fn pack_lines<'a>(schema: &Schema, lines: &[ParsedLine<'a>]) -> Vec { for (tag_name, tag_value) in tag_set { let tag_name_str = tag_name.to_string(); if let Some(packer) = packer_map.get_mut(&tag_name_str) { - packer.pack_str(Some(&tag_value.to_string())); + packer + .str_packer_mut() + .push(ByteArray::from(tag_value.to_string().as_str())); } else { panic!( "tag {} seen in input that has no matching column in schema", @@ -407,9 +414,17 @@ fn pack_lines<'a>(schema: &Schema, lines: &[ParsedLine<'a>]) -> Vec { let field_name_str = field_name.to_string(); if let Some(packer) = packer_map.get_mut(&field_name_str) { match *field_value { - FieldValue::F64(f) => packer.pack_f64(Some(f)), - FieldValue::I64(i) => packer.pack_i64(Some(i)), - FieldValue::String(ref s) => packer.pack_str(Some(&s.to_string())), + FieldValue::F64(f) => { + packer.f64_packer_mut().push(f); + } + FieldValue::I64(i) => { + packer.i64_packer_mut().push(i); + } + FieldValue::String(ref s) => { + packer + .str_packer_mut() + .push(ByteArray::from(s.to_string().as_str())); + } } } else { panic!( @@ -425,7 +440,8 @@ fn pack_lines<'a>(schema: &Schema, lines: &[ParsedLine<'a>]) -> Vec { // to microseconds let timestamp_micros = line.timestamp.map(|timestamp_nanos| timestamp_nanos / 1000); - packer.pack_i64(timestamp_micros); + // TODO(edd) why would line _not_ have a timestamp??? We should always have them + packer.i64_packer_mut().push_option(timestamp_micros) } else { panic!("No {} field present in schema...", timestamp_col_name); } @@ -434,7 +450,7 @@ fn pack_lines<'a>(schema: &Schema, lines: &[ParsedLine<'a>]) -> Vec { for packer in packer_map.values_mut() { if packer.len() < starting_len + 1 { assert_eq!(packer.len(), starting_len, "packer should be unchanged"); - packer.pack_none(); + packer.push_none(); } else { assert_eq!( starting_len + 1, @@ -446,7 +462,7 @@ fn pack_lines<'a>(schema: &Schema, lines: &[ParsedLine<'a>]) -> Vec { } } - // Should have added one value to all packers. Asser that invariant here + // Should have added one value to all packers. Assert that invariant here assert!( packer_map.values().all(|x| x.len() == starting_len + 1), "Should have added 1 row to all packers" @@ -461,6 +477,7 @@ mod delorean_ingest_tests { use delorean_table::{DeloreanTableWriter, DeloreanTableWriterSource, Error as TableError}; use delorean_table_schema::ColumnDefinition; use delorean_test_helpers::approximately_equal; + use parquet::data_type::ByteArray; use std::sync::{Arc, Mutex}; @@ -505,7 +522,7 @@ mod delorean_ingest_tests { } impl DeloreanTableWriter for NoOpWriter { - fn write_batch(&mut self, packers: &[Packer]) -> Result<(), TableError> { + fn write_batch(&mut self, packers: &[Packers]) -> Result<(), TableError> { if packers.is_empty() { log_event( &self.log, @@ -923,21 +940,6 @@ mod delorean_ingest_tests { Ok(()) } - // gets the packer's value as a string. - fn get_string_val(packer: &Packer, idx: usize) -> &str { - packer.as_string_packer().values[idx].as_utf8().unwrap() - } - - // gets the packer's value as an int - fn get_int_val(packer: &Packer, idx: usize) -> i64 { - packer.as_int_packer().values[idx] - } - - // gets the packer's value as an int - fn get_float_val(packer: &Packer, idx: usize) -> f64 { - packer.as_float_packer().values[idx] - } - #[test] fn pack_data_value() -> Result<(), Error> { let mut sampler = parse_data_into_sampler()?; @@ -954,81 +956,80 @@ mod delorean_ingest_tests { } // Tag values - let tag_packer = &packers[0]; - assert_eq!(get_string_val(tag_packer, 0), "A"); - assert_eq!(get_string_val(tag_packer, 1), "B"); + let tag_packer = packers[0].str_packer(); + assert_eq!(tag_packer.get(0).unwrap(), &ByteArray::from("A")); + assert_eq!(tag_packer.get(1).unwrap(), &ByteArray::from("B")); assert!(packers[0].is_null(2)); - assert_eq!(get_string_val(tag_packer, 3), "C"); - assert_eq!(get_string_val(tag_packer, 4), "D"); - assert_eq!(get_string_val(tag_packer, 5), "E"); - assert_eq!(get_string_val(tag_packer, 6), "F"); - assert_eq!(get_string_val(tag_packer, 7), "G"); + assert_eq!(tag_packer.get(3).unwrap(), &ByteArray::from("C")); + assert_eq!(tag_packer.get(4).unwrap(), &ByteArray::from("D")); + assert_eq!(tag_packer.get(5).unwrap(), &ByteArray::from("E")); + assert_eq!(tag_packer.get(6).unwrap(), &ByteArray::from("F")); + assert_eq!(tag_packer.get(7).unwrap(), &ByteArray::from("G")); // int_field values - let int_field_packer = &packers[1]; - assert_eq!(get_int_val(int_field_packer, 0), 64); - assert_eq!(get_int_val(int_field_packer, 1), 65); - assert_eq!(get_int_val(int_field_packer, 2), 66); + let int_field_packer = &packers[1].i64_packer(); + assert_eq!(int_field_packer.get(0).unwrap(), &64); + assert_eq!(int_field_packer.get(1).unwrap(), &65); + assert_eq!(int_field_packer.get(2).unwrap(), &66); assert!(int_field_packer.is_null(3)); - assert_eq!(get_int_val(int_field_packer, 4), 67); - assert_eq!(get_int_val(int_field_packer, 5), 68); - assert_eq!(get_int_val(int_field_packer, 6), 69); - assert_eq!(get_int_val(int_field_packer, 7), 70); + assert_eq!(int_field_packer.get(4).unwrap(), &67); + assert_eq!(int_field_packer.get(5).unwrap(), &68); + assert_eq!(int_field_packer.get(6).unwrap(), &69); + assert_eq!(int_field_packer.get(7).unwrap(), &70); // float_field values - let float_field_packer = &packers[2]; + let float_field_packer = &packers[2].f64_packer(); assert!(approximately_equal( - get_float_val(float_field_packer, 0), + *float_field_packer.get(0).unwrap(), 100.0 )); assert!(approximately_equal( - get_float_val(float_field_packer, 1), + *float_field_packer.get(1).unwrap(), 101.0 )); assert!(approximately_equal( - get_float_val(float_field_packer, 2), + *float_field_packer.get(2).unwrap(), 102.0 )); assert!(approximately_equal( - get_float_val(float_field_packer, 3), + *float_field_packer.get(3).unwrap(), 103.0 )); assert!(float_field_packer.is_null(4)); assert!(approximately_equal( - get_float_val(float_field_packer, 5), + *float_field_packer.get(5).unwrap(), 104.0 )); assert!(approximately_equal( - get_float_val(float_field_packer, 6), + *float_field_packer.get(6).unwrap(), 105.0 )); assert!(approximately_equal( - get_float_val(float_field_packer, 7), + *float_field_packer.get(7).unwrap(), 106.0 )); // str_field values - let str_field_packer = &packers[3]; - assert_eq!(get_string_val(str_field_packer, 0), "foo1"); - assert_eq!(get_string_val(str_field_packer, 1), "foo2"); - assert_eq!(get_string_val(str_field_packer, 2), "foo3"); - assert_eq!(get_string_val(str_field_packer, 3), "foo4"); - assert_eq!(get_string_val(str_field_packer, 4), "foo5"); + let str_field_packer = &packers[3].str_packer(); + assert_eq!(str_field_packer.get(0).unwrap(), &ByteArray::from("foo1")); + assert_eq!(str_field_packer.get(1).unwrap(), &ByteArray::from("foo2")); + assert_eq!(str_field_packer.get(2).unwrap(), &ByteArray::from("foo3")); + assert_eq!(str_field_packer.get(3).unwrap(), &ByteArray::from("foo4")); + assert_eq!(str_field_packer.get(4).unwrap(), &ByteArray::from("foo5")); assert!(str_field_packer.is_null(5)); - assert_eq!(get_string_val(str_field_packer, 6), "foo6"); - assert_eq!(get_string_val(str_field_packer, 7), "foo7"); + assert_eq!(str_field_packer.get(6).unwrap(), &ByteArray::from("foo6")); + assert_eq!(str_field_packer.get(7).unwrap(), &ByteArray::from("foo7")); - // timestamp values (NB The timestamps are truncated to Microseconds) - let timestamp_packer = &packers[4]; - assert_eq!(get_int_val(timestamp_packer, 0), 1_590_488_773_254_420); - assert_eq!(get_int_val(timestamp_packer, 1), 1_590_488_773_254_430); - assert_eq!(get_int_val(timestamp_packer, 2), 1_590_488_773_254_440); - assert_eq!(get_int_val(timestamp_packer, 3), 1_590_488_773_254_450); - assert_eq!(get_int_val(timestamp_packer, 4), 1_590_488_773_254_460); - assert_eq!(get_int_val(timestamp_packer, 5), 1_590_488_773_254_470); + // timestamp values + let timestamp_packer = &packers[4].i64_packer(); + assert_eq!(timestamp_packer.get(0).unwrap(), &1_590_488_773_254_420); + assert_eq!(timestamp_packer.get(1).unwrap(), &1_590_488_773_254_430); + assert_eq!(timestamp_packer.get(2).unwrap(), &1_590_488_773_254_440); + assert_eq!(timestamp_packer.get(3).unwrap(), &1_590_488_773_254_450); + assert_eq!(timestamp_packer.get(4).unwrap(), &1_590_488_773_254_460); + assert_eq!(timestamp_packer.get(5).unwrap(), &1_590_488_773_254_470); assert!(timestamp_packer.is_null(6)); - assert_eq!(get_int_val(timestamp_packer, 7), 1_590_488_773_254_480); - + assert_eq!(timestamp_packer.get(7).unwrap(), &1_590_488_773_254_480); Ok(()) } diff --git a/delorean_parquet/src/writer.rs b/delorean_parquet/src/writer.rs index 82b02bf1c1..0a8c69850a 100644 --- a/delorean_parquet/src/writer.rs +++ b/delorean_parquet/src/writer.rs @@ -19,7 +19,7 @@ use parquet::{ use snafu::{ResultExt, Snafu}; use crate::metadata::parquet_schema_as_string; -use delorean_table::{packers::Packer, DeloreanTableWriter, Error as TableError}; +use delorean_table::{DeloreanTableWriter, Error as TableError, Packers}; #[derive(Debug, Snafu)] pub enum Error { @@ -62,23 +62,24 @@ where /// # use delorean_table_schema; /// # use delorean_table_schema::DataType; /// # use delorean_table::DeloreanTableWriter; - /// # use delorean_table::packers::Packer; + /// # use delorean_table::packers::{Packer, Packers}; /// # use delorean_parquet::writer::DeloreanParquetTableWriter; + /// # use parquet::data_type::ByteArray; /// /// let schema = delorean_table_schema::SchemaBuilder::new("measurement_name") /// .tag("tag1") /// .field("field1", delorean_table_schema::DataType::Integer) /// .build(); /// - /// let mut packers = vec![ - /// Packer::new(DataType::String), // 0: tag1 - /// Packer::new(DataType::Integer), // 1: field1 - /// Packer::new(DataType::Integer), // 2: timestamp + /// let mut packers: Vec = vec![ + /// Packers::String(Packer::new()), // 0: tag1 + /// Packers::Integer(Packer::new()), // 1: field1 + /// Packers::Integer(Packer::new()), // 2: timestamp /// ]; /// - /// packers[0].pack_str(Some("tag1")); // tag1 val - /// packers[1].pack_i64(Some(100)); // field1 val - /// packers[2].pack_none(); // no timestamp + /// packers[0].str_packer_mut().push(ByteArray::from("tag1")); // tag1 val + /// packers[1].i64_packer_mut().push(100); // field1 val + /// packers[2].push_none(); // no timestamp /// /// // Write to '/tmp/example.parquet' /// let mut output_file_name = std::env::temp_dir(); @@ -126,7 +127,7 @@ where /// column chunk /// /// TODO: better control of column chunks - fn write_batch(&mut self, packers: &[Packer]) -> Result<(), TableError> { + fn write_batch(&mut self, packers: &[Packers]) -> Result<(), TableError> { // now write out the data let mut row_group_writer = self.file_writer @@ -154,15 +155,18 @@ where }); } }; + // TODO(edd) This seems super awkward and not the right way to do it... + // We know we have a direct mapping between a col_writer (ColumnWriter) + // type and a Packers variant. We also know that we do exactly the same + // work for each variant (we just dispatch to the writ_batch method) + // on the column write. + // + // I think this match could be so much shorter but not sure how yet. match col_writer { BoolColumnWriter(ref mut w) => { - let bool_packer = packer.as_bool_packer(); + let p = packer.bool_packer(); let n = w - .write_batch( - &bool_packer.values, - Some(&bool_packer.def_levels), - Some(&bool_packer.rep_levels), - ) + .write_batch(p.values(), Some(p.def_levels()), Some(p.rep_levels())) .context(ParquetLibraryError { message: String::from("Can't write_batch with bool values"), })?; @@ -170,13 +174,9 @@ where } Int32ColumnWriter(_) => unreachable!("ParquetWriter does not support INT32 data"), Int64ColumnWriter(ref mut w) => { - let int_packer = packer.as_int_packer(); + let p = packer.i64_packer(); let n = w - .write_batch( - &int_packer.values, - Some(&int_packer.def_levels), - Some(&int_packer.rep_levels), - ) + .write_batch(p.values(), Some(p.def_levels()), Some(p.rep_levels())) .context(ParquetLibraryError { message: String::from("Can't write_batch with int64 values"), })?; @@ -187,26 +187,18 @@ where unreachable!("ParquetWriter does not support FLOAT (32-bit float) data") } DoubleColumnWriter(ref mut w) => { - let float_packer = packer.as_float_packer(); + let p = packer.f64_packer(); let n = w - .write_batch( - &float_packer.values, - Some(&float_packer.def_levels), - Some(&float_packer.rep_levels), - ) + .write_batch(p.values(), Some(p.def_levels()), Some(p.rep_levels())) .context(ParquetLibraryError { message: String::from("Can't write_batch with f64 values"), })?; debug!("Wrote {} rows of f64 data", n); } ByteArrayColumnWriter(ref mut w) => { - let string_packer = packer.as_string_packer(); + let p = packer.str_packer(); let n = w - .write_batch( - &string_packer.values, - Some(&string_packer.def_levels), - Some(&string_packer.rep_levels), - ) + .write_batch(p.values(), Some(p.def_levels()), Some(p.rep_levels())) .context(ParquetLibraryError { message: String::from("Can't write_batch with byte array values"), })?; diff --git a/delorean_parquet/tests/read_write.rs b/delorean_parquet/tests/read_write.rs index 31857b3332..e5845ecdcd 100644 --- a/delorean_parquet/tests/read_write.rs +++ b/delorean_parquet/tests/read_write.rs @@ -1,7 +1,7 @@ use delorean_parquet::writer::DeloreanParquetTableWriter; +use delorean_table::{packers::Packer, DeloreanTableWriter, Packers}; -use delorean_table::{packers::Packer, DeloreanTableWriter}; -use delorean_table_schema::DataType; +use parquet::data_type::ByteArray; use std::fs; #[test] @@ -15,13 +15,13 @@ fn test_write_parquet_data() { .build(); assert_eq!(schema.get_col_defs().len(), 6); - let mut packers = vec![ - Packer::new(DataType::String), // 0: tag1 - Packer::new(DataType::String), // 1: string_field - Packer::new(DataType::Float), // 2: float_field - Packer::new(DataType::Integer), // 3: int_field - Packer::new(DataType::Boolean), // 4: bool_field - Packer::new(DataType::Integer), // 5: timstamp + let mut packers: Vec = vec![ + Packers::String(Packer::new()), // 0: tag1 + Packers::String(Packer::new()), // 1: string_field + Packers::Float(Packer::new()), // 2: float_field + Packers::Integer(Packer::new()), // 3: int_field + Packers::Boolean(Packer::new()), // 4: bool_field + Packers::Integer(Packer::new()), // 5: timestamp ]; // create this data: @@ -29,26 +29,34 @@ fn test_write_parquet_data() { // row 0: "tag1_val0", "str_val0", 1.0, 100, true, 900000000000 // row 1: null, null , null, null, null, null // row 2: "tag1_val2", "str_val2", 2.0, 200, false, 9100000000000 - packers[0].pack_str(Some("tag1_val0")); - packers[1].pack_str(Some("str_val0")); - packers[2].pack_f64(Some(1.0)); - packers[3].pack_i64(Some(100)); - packers[4].pack_bool(Some(true)); - packers[5].pack_i64(Some(900000000000)); + packers[0] + .str_packer_mut() + .push(ByteArray::from("tag1_val0")); + packers[1] + .str_packer_mut() + .push(ByteArray::from("str_val0")); + packers[2].f64_packer_mut().push(1.0); + packers[3].i64_packer_mut().push(100); + packers[4].bool_packer_mut().push(true); + packers[5].i64_packer_mut().push(900000000000); - packers[0].pack_none(); - packers[1].pack_none(); - packers[2].pack_none(); - packers[3].pack_none(); - packers[4].pack_none(); - packers[5].pack_none(); + packers[0].push_none(); + packers[1].push_none(); + packers[2].push_none(); + packers[3].push_none(); + packers[4].push_none(); + packers[5].push_none(); - packers[0].pack_str(Some("tag1_val2")); - packers[1].pack_str(Some("str_val2")); - packers[2].pack_f64(Some(2.0)); - packers[3].pack_i64(Some(200)); - packers[4].pack_bool(Some(true)); - packers[5].pack_i64(Some(910000000000)); + packers[0] + .str_packer_mut() + .push(ByteArray::from("tag1_val2")); + packers[1] + .str_packer_mut() + .push(ByteArray::from("str_val2")); + packers[2].f64_packer_mut().push(2.0); + packers[3].i64_packer_mut().push(200); + packers[4].bool_packer_mut().push(true); + packers[5].i64_packer_mut().push(910000000000); // write the data out to the parquet file let output_path = delorean_test_helpers::tempfile::Builder::new() diff --git a/delorean_table/src/lib.rs b/delorean_table/src/lib.rs index 210e519f4e..a1c6d3a518 100644 --- a/delorean_table/src/lib.rs +++ b/delorean_table/src/lib.rs @@ -3,7 +3,7 @@ pub mod packers; use snafu::Snafu; use delorean_table_schema::Schema; -pub use packers::Packer; +pub use packers::{Packer, Packers}; #[derive(Snafu, Debug)] pub enum Error { @@ -23,7 +23,7 @@ pub enum Error { /// Something that knows how to write a set of columns somewhere pub trait DeloreanTableWriter { /// Writes a batch of packed data to the underlying output - fn write_batch(&mut self, packers: &[Packer]) -> Result<(), Error>; + fn write_batch(&mut self, packers: &[Packers]) -> Result<(), Error>; /// Closes the underlying writer and finalizes the work to write the file. fn close(&mut self) -> Result<(), Error>; diff --git a/delorean_table/src/packers.rs b/delorean_table/src/packers.rs index 751a2faa92..9c79e6d82d 100644 --- a/delorean_table/src/packers.rs +++ b/delorean_table/src/packers.rs @@ -5,492 +5,288 @@ // Note the maintainability of this code is not likely high (it came // from the copy pasta factory) but the plan is to replace it // soon... We'll see how long that actually takes... - use parquet::data_type::ByteArray; // NOTE: See https://blog.twitter.com/engineering/en_us/a/2013/dremel-made-simple-with-parquet.html -// for an explination of nesting levels +// for an explanation of nesting levels -/// Packs data for a column of strings -#[derive(Debug)] -pub struct StringPacker { - pub values: Vec, - pub def_levels: Vec, - pub rep_levels: Vec, -} -impl StringPacker { - fn new() -> StringPacker { - StringPacker { - values: Vec::new(), - def_levels: Vec::new(), - rep_levels: Vec::new(), - } - } - - fn with_capacity(capacity: usize) -> StringPacker { - StringPacker { - values: Vec::with_capacity(capacity), - def_levels: Vec::with_capacity(capacity), - rep_levels: Vec::with_capacity(capacity), - } - } - - fn len(&self) -> usize { - self.values.len() - } - - // Adds (copies) the data to be encoded - fn pack(&mut self, s: Option<&str>) { - match s { - Some(s) => { - self.values.push(ByteArray::from(s)); - self.def_levels.push(1); - self.rep_levels.push(1); - } - None => { - self.values.push(ByteArray::from("")); - self.def_levels.push(0); - self.rep_levels.push(1); - } - } - } +pub enum Packers { + Float(Packer), + Integer(Packer), + String(Packer), + Boolean(Packer), } -// Packs data for a column of floats -#[derive(Debug)] -pub struct FloatPacker { - pub values: Vec, - pub def_levels: Vec, - pub rep_levels: Vec, -} -impl FloatPacker { - fn new() -> FloatPacker { - FloatPacker { - values: Vec::new(), - def_levels: Vec::new(), - rep_levels: Vec::new(), - } - } - - fn with_capacity(capacity: usize) -> FloatPacker { - FloatPacker { - values: Vec::with_capacity(capacity), - def_levels: Vec::with_capacity(capacity), - rep_levels: Vec::with_capacity(capacity), - } - } - - fn len(&self) -> usize { - self.values.len() - } - - // Adds (copies) the data to be encoded - fn pack(&mut self, f: Option) { - match f { - Some(f) => { - self.values.push(f); - self.def_levels.push(1); - self.rep_levels.push(1); - } - None => { - self.values.push(std::f64::NAN); // doesn't matter as def level == 0 - self.def_levels.push(0); - self.rep_levels.push(1); - } - } - } -} - -// Packs data for a column of ints -#[derive(Debug)] -pub struct IntPacker { - pub values: Vec, - pub def_levels: Vec, - pub rep_levels: Vec, -} -impl IntPacker { - fn new() -> IntPacker { - IntPacker { - values: Vec::new(), - def_levels: Vec::new(), - rep_levels: Vec::new(), - } - } - - fn with_capacity(capacity: usize) -> IntPacker { - IntPacker { - values: Vec::with_capacity(capacity), - def_levels: Vec::with_capacity(capacity), - rep_levels: Vec::with_capacity(capacity), - } - } - - fn len(&self) -> usize { - self.values.len() - } - - // Adds (copies) the data to be encoded - fn pack(&mut self, i: Option) { - match i { - Some(i) => { - self.values.push(i); - self.def_levels.push(1); - self.rep_levels.push(1); - } - None => { - self.values.push(0); // doesn't matter as def level == 0 - self.def_levels.push(0); - self.rep_levels.push(1); - } - } - } -} - -// Packs data for a column of bool -#[derive(Debug)] -pub struct BoolPacker { - pub values: Vec, - pub def_levels: Vec, - pub rep_levels: Vec, -} -impl BoolPacker { - fn new() -> BoolPacker { - BoolPacker { - values: Vec::new(), - def_levels: Vec::new(), - rep_levels: Vec::new(), - } - } - - fn with_capacity(capacity: usize) -> BoolPacker { - BoolPacker { - values: Vec::with_capacity(capacity), - def_levels: Vec::with_capacity(capacity), - rep_levels: Vec::with_capacity(capacity), - } - } - - fn len(&self) -> usize { - self.values.len() - } - - // Adds (copies) the data to be encoded - fn pack(&mut self, b: Option) { - match b { - Some(b) => { - self.values.push(b); - self.def_levels.push(1); - self.rep_levels.push(1); - } - None => { - self.values.push(false); // doesn't matter as def level == 0 - self.def_levels.push(0); - self.rep_levels.push(1); - } - } - } -} - -#[derive(Debug)] -pub enum Packer { - StringPackerType(StringPacker), - FloatPackerType(FloatPacker), - IntPackerType(IntPacker), - BoolPackerType(BoolPacker), -} - -impl Packer { - /// Create a new packer that can pack values of the specified protocol type - pub fn new(t: delorean_table_schema::DataType) -> Packer { - match t { - delorean_table_schema::DataType::String => { - Packer::StringPackerType(StringPacker::new()) - } - delorean_table_schema::DataType::Float => Packer::FloatPackerType(FloatPacker::new()), - delorean_table_schema::DataType::Integer => Packer::IntPackerType(IntPacker::new()), - delorean_table_schema::DataType::Boolean => Packer::BoolPackerType(BoolPacker::new()), - delorean_table_schema::DataType::Timestamp => Packer::IntPackerType(IntPacker::new()), - } - } - - /// Create a new packer that can pack values of the specified type, with the specified capacity - pub fn with_capacity(t: delorean_table_schema::DataType, capacity: usize) -> Packer { - match t { - delorean_table_schema::DataType::String => { - Packer::StringPackerType(StringPacker::with_capacity(capacity)) - } - delorean_table_schema::DataType::Float => { - Packer::FloatPackerType(FloatPacker::with_capacity(capacity)) - } - delorean_table_schema::DataType::Integer => { - Packer::IntPackerType(IntPacker::with_capacity(capacity)) - } - delorean_table_schema::DataType::Boolean => { - Packer::BoolPackerType(BoolPacker::with_capacity(capacity)) - } - delorean_table_schema::DataType::Timestamp => { - Packer::IntPackerType(IntPacker::with_capacity(capacity)) - } +impl Packers { + pub fn is_empty(&self) -> bool { + match self { + Self::Float(p) => p.is_empty(), + Self::Integer(p) => p.is_empty(), + Self::String(p) => p.is_empty(), + Self::Boolean(p) => p.is_empty(), } } pub fn len(&self) -> usize { match self { - Packer::StringPackerType(string_packer) => string_packer.len(), - Packer::FloatPackerType(float_packer) => float_packer.len(), - Packer::IntPackerType(int_packer) => int_packer.len(), - Packer::BoolPackerType(bool_packer) => bool_packer.len(), + Self::Float(p) => p.len(), + Self::Integer(p) => p.len(), + Self::String(p) => p.len(), + Self::Boolean(p) => p.len(), } } + pub fn push_none(&mut self) { + match self { + Self::Float(p) => p.push_option(None), + Self::Integer(p) => p.push_option(None), + Self::String(p) => p.push_option(None), + Self::Boolean(p) => p.push_option(None), + } + } + + pub fn is_null(&self, index: usize) -> bool { + match self { + Self::Float(p) => match p.def_levels.get(index) { + Some(x) => *x == 0, + None => true, + }, + Self::Integer(p) => match p.def_levels.get(index) { + Some(x) => *x == 0, + None => true, + }, + Self::String(p) => match p.def_levels.get(index) { + Some(x) => *x == 0, + None => true, + }, + Self::Boolean(p) => match p.def_levels.get(index) { + Some(x) => *x == 0, + None => true, + }, + } + } + + // TODO(edd): YUK! Seem unable to avoid runtime checking of these packer + // types. Need to figure this out. + pub fn f64_packer_mut(&mut self) -> &mut Packer { + if let Self::Float(p) = self { + p + } else { + panic!("packer is not a Float"); + } + } + + pub fn f64_packer(&self) -> &Packer { + if let Self::Float(p) = self { + p + } else { + panic!("packer is not a Float"); + } + } + + pub fn i64_packer_mut(&mut self) -> &mut Packer { + if let Self::Integer(p) = self { + p + } else { + panic!("packer is not an Integer"); + } + } + + pub fn i64_packer(&self) -> &Packer { + if let Self::Integer(p) = self { + p + } else { + panic!("packer is not an Integer"); + } + } + + pub fn str_packer_mut(&mut self) -> &mut Packer { + if let Self::String(p) = self { + p + } else { + panic!("packer is not a String"); + } + } + + pub fn str_packer(&self) -> &Packer { + if let Self::String(p) = self { + p + } else { + panic!("packer is not a String"); + } + } + + pub fn bool_packer_mut(&mut self) -> &mut Packer { + if let Self::Boolean(p) = self { + p + } else { + panic!("packer is not a Boolean"); + } + } + + pub fn bool_packer(&self) -> &Packer { + if let Self::Boolean(p) = self { + p + } else { + panic!("packer is not a Boolean"); + } + } +} + +impl std::convert::From for Packers { + fn from(t: delorean_table_schema::DataType) -> Self { + match t { + delorean_table_schema::DataType::Float => Packers::Float(Packer::::new()), + delorean_table_schema::DataType::Integer => Packers::Integer(Packer::::new()), + delorean_table_schema::DataType::String => Packers::String(Packer::::new()), + delorean_table_schema::DataType::Boolean => Packers::Boolean(Packer::::new()), + delorean_table_schema::DataType::Timestamp => Packers::Integer(Packer::::new()), + } + } +} + +#[derive(Debug, Default)] +pub struct Packer { + values: Vec, + def_levels: Vec, + rep_levels: Vec, +} + +impl Packer { + pub fn new() -> Self { + Self { + values: Vec::new(), + def_levels: Vec::new(), + rep_levels: Vec::new(), + } + } + + /// Create a new packer with the specified capacity + pub fn with_capacity(capacity: usize) -> Self { + Self { + values: Vec::with_capacity(capacity), + def_levels: Vec::with_capacity(capacity), + rep_levels: Vec::with_capacity(capacity), + } + } + + pub fn len(&self) -> usize { + self.values.len() + } + pub fn is_empty(&self) -> bool { - // Clippy made me put this - self.len() == 0 + self.values.is_empty() } - pub fn pack_str(&mut self, s: Option<&str>) { - if let Packer::StringPackerType(string_packer) = self { - string_packer.pack(s) - } else { - panic!("Packer {:?} does not know how to pack strings", self); + pub fn get(&self, index: usize) -> Option<&T> { + self.values.get(index) + } + + // TODO(edd): I don't like these getters. They're only needed so we can + // write the data into a parquet writer. We should have a method on Packer + // that accepts some implementation of a trait that a parquet writer satisfies + // and then pass the data through in here. + pub fn values(&self) -> &Vec { + &self.values + } + + pub fn def_levels(&self) -> &Vec { + &self.def_levels + } + + pub fn rep_levels(&self) -> &Vec { + &self.rep_levels + } + + pub fn push_option(&mut self, value: Option) { + match value { + Some(v) => self.push(v), + None => { + self.values.push(T::default()); // doesn't matter as def level == 0 + self.def_levels.push(0); + self.rep_levels.push(1); + } } } - pub fn pack_f64(&mut self, f: Option) { - if let Packer::FloatPackerType(float_packer) = self { - float_packer.pack(f) - } else { - panic!("Packer {:?} does not know how to pack floats", self); - } + pub fn push(&mut self, value: T) { + self.values.push(value); + self.def_levels.push(1); + self.rep_levels.push(1); } - pub fn pack_i64(&mut self, i: Option) { - if let Packer::IntPackerType(int_packer) = self { - int_packer.pack(i) - } else { - panic!("Packer {:?} does not know how to pack ints", self); + /// Return true if the row for index is null. Returns true if there is no + /// row for index. + pub fn is_null(&self, index: usize) -> bool { + match self.def_levels.get(index) { + Some(x) => *x == 0, + None => true, } } +} - pub fn pack_bool(&mut self, b: Option) { - if let Packer::BoolPackerType(bool_packer) = self { - bool_packer.pack(b) - } else { - panic!("Packer {:?} does not know how to pack bools", self); - } +pub trait PackerDefault { + fn default() -> Self; +} + +impl PackerDefault for f64 { + fn default() -> Self { + 0.0 } +} - pub fn as_string_packer(&self) -> &StringPacker { - if let Packer::StringPackerType(string_packer) = self { - string_packer - } else { - panic!("Packer {:?} is not a string packer", self); - } +impl PackerDefault for i64 { + fn default() -> Self { + 0 } +} - pub fn as_float_packer(&self) -> &FloatPacker { - if let Packer::FloatPackerType(float_packer) = self { - float_packer - } else { - panic!("Packer {:?} is not a float packer", self); - } +impl PackerDefault for ByteArray { + fn default() -> Self { + ByteArray::from("") } +} - pub fn as_int_packer(&self) -> &IntPacker { - if let Packer::IntPackerType(int_packer) = self { - int_packer - } else { - panic!("Packer {:?} is not an int packer", self); - } - } - - pub fn as_bool_packer(&self) -> &BoolPacker { - if let Packer::BoolPackerType(bool_packer) = self { - bool_packer - } else { - panic!("Packer {:?} is not an bool packer", self); - } - } - - pub fn pack_none(&mut self) { - match self { - Packer::StringPackerType(string_packer) => string_packer.pack(None), - Packer::FloatPackerType(float_packer) => float_packer.pack(None), - Packer::IntPackerType(int_packer) => int_packer.pack(None), - Packer::BoolPackerType(bool_packer) => bool_packer.pack(None), - } - } - - /// Return true if the idx'th row is null - pub fn is_null(&self, idx: usize) -> bool { - match self { - Packer::StringPackerType(string_packer) => string_packer.def_levels[idx] == 0, - Packer::FloatPackerType(float_packer) => float_packer.def_levels[idx] == 0, - Packer::IntPackerType(int_packer) => int_packer.def_levels[idx] == 0, - Packer::BoolPackerType(bool_packer) => bool_packer.def_levels[idx] == 0, - } +impl PackerDefault for bool { + fn default() -> Self { + false } } #[cfg(test)] mod test { use super::*; - use delorean_table_schema::DataType; - use delorean_test_helpers::approximately_equal; #[test] fn with_capacity() { - let string_packer = StringPacker::with_capacity(42); - assert_eq!(string_packer.values.capacity(), 42); - assert_eq!(string_packer.def_levels.capacity(), 42); - assert_eq!(string_packer.rep_levels.capacity(), 42); - - let float_packer = FloatPacker::with_capacity(43); - assert_eq!(float_packer.values.capacity(), 43); - assert_eq!(float_packer.def_levels.capacity(), 43); - assert_eq!(float_packer.rep_levels.capacity(), 43); - - let int_packer = IntPacker::with_capacity(44); - assert_eq!(int_packer.values.capacity(), 44); - assert_eq!(int_packer.def_levels.capacity(), 44); - assert_eq!(int_packer.rep_levels.capacity(), 44); - - let bool_packer = BoolPacker::with_capacity(45); - assert_eq!(bool_packer.values.capacity(), 45); - assert_eq!(bool_packer.def_levels.capacity(), 45); - assert_eq!(bool_packer.rep_levels.capacity(), 45); + let packer: Packer = Packer::with_capacity(42); + assert_eq!(packer.values.capacity(), 42); + assert_eq!(packer.def_levels.capacity(), 42); + assert_eq!(packer.rep_levels.capacity(), 42); } #[test] - fn string_packer() { - let mut packer = Packer::new(DataType::String); - assert_eq!(packer.len(), 0); - packer.pack_str(Some("foo")); - packer.pack_str(Some("")); - packer.pack_str(None); - packer.pack_none(); - packer.pack_str(Some("bar")); - assert_eq!(packer.len(), 5); - assert_eq!( - packer.as_string_packer().values[0].as_utf8().unwrap(), - "foo" - ); - assert_eq!(packer.as_string_packer().values[1].as_utf8().unwrap(), ""); - assert_eq!( - packer.as_string_packer().values[4].as_utf8().unwrap(), - "bar" - ); - assert_eq!(packer.as_string_packer().def_levels, vec![1, 1, 0, 0, 1]); - assert_eq!(packer.as_string_packer().rep_levels, vec![1, 1, 1, 1, 1]); + fn is_null() { + let mut packer: Packer = Packer::new(); + packer.push(22.3); + packer.push_option(Some(100.3)); + packer.push_option(None); + packer.push(33.3); - assert!(!packer.is_null(1)); - assert!(packer.is_null(2)); + assert_eq!(packer.is_null(0), false); + assert_eq!(packer.is_null(1), false); + assert_eq!(packer.is_null(2), true); + assert_eq!(packer.is_null(3), false); + assert_eq!(packer.is_null(4), true); // out of bounds } #[test] - #[should_panic] - fn string_packer_pack_float() { - let mut packer = Packer::new(DataType::String); - packer.pack_f64(Some(5.3)); - } + fn packers() { + let mut packers: Vec = Vec::new(); + packers.push(Packers::Float(Packer::new())); + packers.push(Packers::Integer(Packer::new())); + packers.push(Packers::Boolean(Packer::new())); - #[test] - fn float_packer() { - let mut packer = Packer::new(DataType::Float); - assert_eq!(packer.len(), 0); - packer.pack_f64(Some(1.23)); - packer.pack_f64(None); - packer.pack_none(); - packer.pack_f64(Some(4.56)); - assert_eq!(packer.len(), 4); - assert!(approximately_equal( - packer.as_float_packer().values[0], - 1.23 - )); - assert!(approximately_equal( - packer.as_float_packer().values[3], - 4.56 - )); - assert_eq!(packer.as_float_packer().def_levels, vec![1, 0, 0, 1]); - assert_eq!(packer.as_float_packer().rep_levels, vec![1, 1, 1, 1]); - - assert!(!packer.is_null(0)); - assert!(packer.is_null(1)); - } - - #[test] - #[should_panic] - fn float_packer_pack_string() { - let mut packer = Packer::new(DataType::Float); - packer.pack_str(Some("foo")); - } - - fn test_int_packer(mut packer: Packer) { - assert_eq!(packer.len(), 0); - packer.pack_i64(Some(1)); - packer.pack_i64(None); - packer.pack_none(); - packer.pack_i64(Some(-1)); - assert_eq!(packer.len(), 4); - assert_eq!(packer.as_int_packer().values[0], 1); - assert_eq!(packer.as_int_packer().values[3], -1); - assert_eq!(packer.as_int_packer().def_levels, vec![1, 0, 0, 1]); - assert_eq!(packer.as_int_packer().rep_levels, vec![1, 1, 1, 1]); - - assert!(!packer.is_null(0)); - assert!(packer.is_null(1)); - } - - #[test] - fn int_packer() { - let packer = Packer::new(DataType::Integer); - test_int_packer(packer); - } - - #[test] - #[should_panic] - fn int_packer_pack_string() { - let mut packer = Packer::new(DataType::Integer); - packer.pack_str(Some("foo")); - } - - #[test] - fn bool_packer() { - let mut packer = Packer::new(DataType::Boolean); - assert_eq!(packer.len(), 0); - packer.pack_bool(Some(true)); - packer.pack_bool(Some(false)); - packer.pack_bool(None); - packer.pack_none(); - packer.pack_bool(Some(true)); - assert_eq!(packer.len(), 5); - assert_eq!(packer.as_bool_packer().values[0], true); - assert_eq!(packer.as_bool_packer().values[1], false); - assert_eq!(packer.as_bool_packer().values[4], true); - assert_eq!(packer.as_bool_packer().def_levels, vec![1, 1, 0, 0, 1]); - assert_eq!(packer.as_bool_packer().rep_levels, vec![1, 1, 1, 1, 1]); - - assert!(!packer.is_null(1)); - assert!(packer.is_null(2)); - } - - #[test] - #[should_panic] - fn bool_packer_pack_string() { - let mut packer = Packer::new(DataType::Boolean); - packer.pack_str(Some("foo")); - } - - #[test] - fn timstamp_packer() { - let packer = Packer::new(DataType::Timestamp); - test_int_packer(packer); - } - - #[test] - #[should_panic] - fn timstamp_packer_pack_string() { - let mut packer = Packer::new(DataType::Timestamp); - packer.pack_str(Some("foo")); + packers.get_mut(0).unwrap().f64_packer_mut().push(22.033); } }