diff --git a/Cargo.lock b/Cargo.lock index 7bb88bb2c0..8d4f04c2a4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -686,6 +686,7 @@ dependencies = [ "delorean_test_helpers", "delorean_tsm", "env_logger", + "libflate", "log", "snafu", ] diff --git a/delorean_ingest/Cargo.toml b/delorean_ingest/Cargo.toml index bf797454f5..b1a26ea873 100644 --- a/delorean_ingest/Cargo.toml +++ b/delorean_ingest/Cargo.toml @@ -18,3 +18,4 @@ delorean_tsm = { path = "../delorean_tsm" } [dev-dependencies] delorean_test_helpers ={ path = "../delorean_test_helpers" } +libflate = "1.0.0" diff --git a/delorean_ingest/src/lib.rs b/delorean_ingest/src/lib.rs index ea194da633..0cc6026b16 100644 --- a/delorean_ingest/src/lib.rs +++ b/delorean_ingest/src/lib.rs @@ -21,7 +21,9 @@ use delorean_table::{ ByteArray, DeloreanTableWriter, DeloreanTableWriterSource, Error as TableError, }; use delorean_table_schema::{DataType, Schema, SchemaBuilder}; -use delorean_tsm::mapper::{map_field_columns, ColumnData, TSMMeasurementMapper}; +use delorean_tsm::mapper::{ + map_field_columns, BlockDecoder, ColumnData, MeasurementTable, TSMMeasurementMapper, +}; use delorean_tsm::reader::{TSMBlockReader, TSMIndexReader}; use delorean_tsm::{BlockType, TSMError}; @@ -530,155 +532,8 @@ impl TSMFileConverter { for measurement in mapper { match measurement { Ok(mut m) => { - let mut builder = SchemaBuilder::new(&m.name); - let mut packed_columns: Vec = Vec::new(); - - let mut tks = Vec::new(); - for tag in m.tag_columns() { - builder = builder.tag(tag); - tks.push(tag.clone()); - packed_columns.push(Packers::String(Packer::new())); - } - - let mut fks = Vec::new(); - for (field_key, block_type) in m.field_columns().to_owned() { - builder = builder.field(&field_key, DataType::from(&block_type)); - fks.push((field_key.clone(), block_type)); - packed_columns.push(Packers::String(Packer::new())); // FIXME - will change - } - - // Account for timestamp - packed_columns.push(Packers::Integer(Packer::new())); - - let schema = builder.build(); - - // get mapping between named columns and packer indexes. - let name_packer = schema - .get_col_defs() - .iter() - .map(|c| (c.name.clone(), c.index as usize)) - .collect::>(); - - // For each tagset combination in the measurement I need - // to build out the table. Then for each column in the - // table I need to convert to a Packer and append it - // to the packer_column. - - for (tag_set_pair, blocks) in m.tag_set_fields_blocks() { - let (ts, field_cols) = map_field_columns(&mut block_reader, blocks) - .map_err(|e| Error::TSMProcessing { source: e })?; - - // Start with the timestamp column. - let col_len = ts.len(); - let ts_idx = - name_packer - .get(schema.timestamp()) - .ok_or(Error::TSMProcessing { - // TODO clean this error up - source: TSMError { - description: "could not find ts column".to_string(), - }, - })?; - - packed_columns[*ts_idx] = Packers::from(ts); - - // Next let's pad out all of the tag columns we know have - // repeated values. - for (tag_key, tag_value) in tag_set_pair { - let idx = name_packer.get(tag_key).ok_or(Error::TSMProcessing { - // TODO clean this error up - source: TSMError { - description: "could not find column".to_string(), - }, - })?; - - // this will create a column of repeated values. - packed_columns[*idx] = Packers::from_elem_str(tag_value, col_len); - } - - // Next let's write out NULL values for any tag columns - // on the measurement that we don't have values for - // because they're not part of this tagset. - let tag_keys = tag_set_pair - .iter() - .map(|pair| pair.0.clone()) - .collect::>(); - for key in &tks { - if tag_keys.contains(key) { - continue; - } - - let idx = name_packer.get(key).ok_or(Error::TSMProcessing { - // TODO clean this error up - source: TSMError { - description: "could not find column".to_string(), - }, - })?; - - // this will create a column of repeated None values. - let col: Vec>> = vec![None; col_len]; - packed_columns[*idx] = Packers::from(col); - } - - // Next let's write out all of the field column data. - let mut got_field_cols = Vec::new(); - for (field_key, field_values) in field_cols { - let idx = name_packer.get(&field_key).ok_or(Error::TSMProcessing { - // TODO clean this error up - source: TSMError { - description: "could not find column".to_string(), - }, - })?; - - match field_values { - ColumnData::Float(v) => packed_columns[*idx] = Packers::from(v), - ColumnData::Integer(v) => packed_columns[*idx] = Packers::from(v), - ColumnData::Str(v) => packed_columns[*idx] = Packers::from(v), - ColumnData::Bool(v) => packed_columns[*idx] = Packers::from(v), - ColumnData::Unsigned(v) => packed_columns[*idx] = Packers::from(v), - } - got_field_cols.push(field_key); - } - - // Finally let's write out all of the field columns that - // we don't have values for here. - for (key, field_type) in &fks { - if got_field_cols.contains(key) { - continue; - } - - let idx = name_packer.get(key).ok_or(Error::TSMProcessing { - // TODO clean this error up - source: TSMError { - description: "could not find column".to_string(), - }, - })?; - - // this will create a column of repeated None values. - match field_type { - BlockType::Float => { - let col: Vec> = vec![None; col_len]; - packed_columns[*idx] = Packers::from(col); - } - BlockType::Integer => { - let col: Vec> = vec![None; col_len]; - packed_columns[*idx] = Packers::from(col); - } - BlockType::Bool => { - let col: Vec> = vec![None; col_len]; - packed_columns[*idx] = Packers::from(col); - } - BlockType::Str => { - let col: Vec>> = vec![None; col_len]; - packed_columns[*idx] = Packers::from(col); - } - BlockType::Unsigned => { - let col: Vec> = vec![None; col_len]; - packed_columns[*idx] = Packers::from(col); - } - } - } - } + let (schema, packed_columns) = + Self::process_measurement_table(&mut block_reader, &mut m)?; let mut table_writer = self .table_writer_source @@ -697,6 +552,238 @@ impl TSMFileConverter { } Ok(()) } + + // Given a measurement table `process_measurement_table` produces an + // appropriate schema and set of Packers. + fn process_measurement_table( + // block_reader: impl BlockDecoder, + mut block_reader: &mut TSMBlockReader, + m: &mut MeasurementTable, + ) -> Result<(Schema, Vec), Error> { + let mut builder = SchemaBuilder::new(&m.name); + let mut packed_columns: Vec = Vec::new(); + + let mut tks = Vec::new(); + for tag in m.tag_columns() { + builder = builder.tag(tag); + tks.push(tag.clone()); + packed_columns.push(Packers::String(Packer::new())); + } + + let mut fks = Vec::new(); + for (field_key, block_type) in m.field_columns().to_owned() { + builder = builder.field(&field_key, DataType::from(&block_type)); + fks.push((field_key.clone(), block_type)); + packed_columns.push(Packers::String(Packer::new())); // FIXME - will change + } + + // Account for timestamp + packed_columns.push(Packers::Integer(Packer::new())); + + let schema = builder.build(); + + // get mapping between named columns and packer indexes. + let name_packer = schema + .get_col_defs() + .iter() + .map(|c| (c.name.clone(), c.index as usize)) + .collect::>(); + + // For each tagset combination in the measurement I need + // to build out the table. Then for each column in the + // table I need to convert to a Packer and append it + // to the packer_column. + + for (i, (tag_set_pair, blocks)) in m.tag_set_fields_blocks().iter_mut().enumerate() { + let (ts, field_cols) = map_field_columns(&mut block_reader, blocks) + .map_err(|e| Error::TSMProcessing { source: e })?; + + // Start with the timestamp column. + let col_len = ts.len(); + let ts_idx = name_packer + .get(schema.timestamp()) + .ok_or(Error::TSMProcessing { + // TODO clean this error up + source: TSMError { + description: "could not find ts column".to_string(), + }, + })?; + + if i == 0 { + packed_columns[*ts_idx] = Packers::from(ts); + } else { + packed_columns[*ts_idx] + .i64_packer_mut() + .extend_from_slice(&ts); + } + + // Next let's pad out all of the tag columns we know have + // repeated values. + for (tag_key, tag_value) in tag_set_pair { + let idx = name_packer.get(tag_key).ok_or(Error::TSMProcessing { + // TODO clean this error up + source: TSMError { + description: "could not find column".to_string(), + }, + })?; + + // this will create a column of repeated values. + if i == 0 { + packed_columns[*idx] = Packers::from_elem_str(tag_value, col_len); + } else { + packed_columns[*idx] + .str_packer_mut() + .extend_from_slice(&vec![ByteArray::from(tag_value.as_ref()); col_len]); + } + } + + // Next let's write out NULL values for any tag columns + // on the measurement that we don't have values for + // because they're not part of this tagset. + let tag_keys = tag_set_pair + .iter() + .map(|pair| pair.0.clone()) + .collect::>(); + for key in &tks { + if tag_keys.contains(key) { + continue; + } + + let idx = name_packer.get(key).ok_or(Error::TSMProcessing { + // TODO clean this error up + source: TSMError { + description: "could not find column".to_string(), + }, + })?; + + if i == 0 { + // creates a column of repeated None values. + let col: Vec>> = vec![None; col_len]; + packed_columns[*idx] = Packers::from(col); + } else { + // pad out column with None values because we don't have a + // value for it. + packed_columns[*idx].str_packer_mut().pad_with_null(col_len); + } + } + + // Next let's write out all of the field column data. + let mut got_field_cols = Vec::new(); + for (field_key, field_values) in field_cols { + let idx = name_packer.get(&field_key).ok_or(Error::TSMProcessing { + // TODO clean this error up + source: TSMError { + description: "could not find column".to_string(), + }, + })?; + + if i == 0 { + match field_values { + ColumnData::Float(v) => packed_columns[*idx] = Packers::from(v), + ColumnData::Integer(v) => packed_columns[*idx] = Packers::from(v), + ColumnData::Str(v) => packed_columns[*idx] = Packers::from(v), + ColumnData::Bool(v) => packed_columns[*idx] = Packers::from(v), + ColumnData::Unsigned(v) => packed_columns[*idx] = Packers::from(v), + } + } else { + match field_values { + ColumnData::Float(v) => packed_columns[*idx] + .f64_packer_mut() + .extend_from_option_slice(&v), + ColumnData::Integer(v) => packed_columns[*idx] + .i64_packer_mut() + .extend_from_option_slice(&v), + ColumnData::Str(values) => { + // TODO fix this up.... + let col = packed_columns[*idx].str_packer_mut(); + for value in values { + match value { + Some(v) => col.push(ByteArray::from(v)), + None => col.push_option(None), + } + } + } + ColumnData::Bool(v) => packed_columns[*idx] + .bool_packer_mut() + .extend_from_option_slice(&v), + ColumnData::Unsigned(values) => { + // TODO fix this up.... + let col = packed_columns[*idx].i64_packer_mut(); + for value in values { + match value { + Some(v) => col.push(v as i64), + None => col.push_option(None), + } + } + } + } + } + got_field_cols.push(field_key); + } + + // Finally let's write out all of the field columns that + // we don't have values for here. + for (key, field_type) in &fks { + if got_field_cols.contains(key) { + continue; + } + + let idx = name_packer.get(key).ok_or(Error::TSMProcessing { + // TODO clean this error up + source: TSMError { + description: "could not find column".to_string(), + }, + })?; + + // this will create a column of repeated None values. + if i == 0 { + match field_type { + BlockType::Float => { + let col: Vec> = vec![None; col_len]; + packed_columns[*idx] = Packers::from(col); + } + BlockType::Integer => { + let col: Vec> = vec![None; col_len]; + packed_columns[*idx] = Packers::from(col); + } + BlockType::Bool => { + let col: Vec> = vec![None; col_len]; + packed_columns[*idx] = Packers::from(col); + } + BlockType::Str => { + let col: Vec>> = vec![None; col_len]; + packed_columns[*idx] = Packers::from(col); + } + BlockType::Unsigned => { + let col: Vec> = vec![None; col_len]; + packed_columns[*idx] = Packers::from(col); + } + } + } else { + match field_type { + BlockType::Float => { + packed_columns[*idx].f64_packer_mut().pad_with_null(col_len); + } + BlockType::Integer => { + packed_columns[*idx].i64_packer_mut().pad_with_null(col_len); + } + BlockType::Bool => { + packed_columns[*idx] + .bool_packer_mut() + .pad_with_null(col_len); + } + BlockType::Str => { + packed_columns[*idx].str_packer_mut().pad_with_null(col_len); + } + BlockType::Unsigned => { + packed_columns[*idx].i64_packer_mut().pad_with_null(col_len); + } + } + } + } + } + Ok((schema, packed_columns)) + } } impl std::fmt::Debug for TSMFileConverter { @@ -716,6 +803,12 @@ mod delorean_ingest_tests { use delorean_table_schema::ColumnDefinition; use delorean_test_helpers::approximately_equal; + use libflate::gzip; + use std::fs::File; + use std::io::BufReader; + use std::io::Cursor; + use std::io::Read; + use std::sync::{Arc, Mutex}; /// Record what happens when the writer is created so we can @@ -724,6 +817,7 @@ mod delorean_ingest_tests { struct WriterLog { events: Vec, } + impl WriterLog { fn new() -> Self { Self { events: Vec::new() } @@ -1366,36 +1460,4 @@ mod delorean_ingest_tests { Ok(()) } - - // ----- Tests for TSM Data ----- - - // #[test] - // fn conversion_tsm_files() -> Result<(), Error> { - // let log = Arc::new(Mutex::new(WriterLog::new())); - - // // let mut converter = - // // LineProtocolConverter::new(settings, NoOpWriterSource::new(log.clone())); - - // // converter - // // .convert(parsed_lines) - // // .expect("conversion ok") - // // .finalize() - // // .expect("finalize"); - - // assert_eq!( - // get_events(&log), - // vec![ - // "Created writer for measurement h2o_temperature", - // "Created writer for measurement air_temperature", - // "[air_temperature] Wrote batch of 4 cols, 3 rows", - // "[h2o_temperature] Wrote batch of 4 cols, 3 rows", - // "[air_temperature] Wrote batch of 4 cols, 1 rows", - // "[air_temperature] Closed", - // "[h2o_temperature] Wrote batch of 4 cols, 2 rows", - // "[h2o_temperature] Closed", - // ] - // ); - - // Ok(()) - // } } diff --git a/delorean_table/src/packers.rs b/delorean_table/src/packers.rs index a7795dd494..24f0148e79 100644 --- a/delorean_table/src/packers.rs +++ b/delorean_table/src/packers.rs @@ -5,6 +5,8 @@ // Note the maintainability of this code is not likely high (it came // from the copy pasta factory) but the plan is to replace it // soon... We'll see how long that actually takes... +use std::iter; + use parquet::data_type::ByteArray; use std::default::Default; @@ -58,6 +60,15 @@ impl Packers { } } + pub fn push_none(&mut self) { + match self { + Self::Float(p) => p.push_option(None), + Self::Integer(p) => p.push_option(None), + Self::String(p) => p.push_option(None), + Self::Boolean(p) => p.push_option(None), + } + } + /// See description on `Packer::num_rows` pub fn num_rows(&self) -> usize { match self { @@ -68,15 +79,6 @@ impl Packers { } } - pub fn push_none(&mut self) { - match self { - Self::Float(p) => p.push_option(None), - Self::Integer(p) => p.push_option(None), - Self::String(p) => p.push_option(None), - Self::Boolean(p) => p.push_option(None), - } - } - /// Determines if the value for `row` is null is null. /// /// If there is no row then `is_null` returns `true`. @@ -181,13 +183,19 @@ impl std::convert::From>> for Packers { } #[derive(Debug, Default)] -pub struct Packer { +pub struct Packer +where + T: Default + Clone, +{ values: Vec, def_levels: Vec, rep_levels: Vec, } -impl Packer { +impl Packer +where + T: Default + Clone, +{ pub fn new() -> Self { Self { values: Vec::new(), @@ -273,6 +281,37 @@ impl Packer { self.rep_levels.push(1); } + pub fn extend_from_packer(&mut self, other: &Packer) { + self.values.extend_from_slice(&other.values); + self.def_levels.extend_from_slice(&other.def_levels); + self.rep_levels.extend_from_slice(&other.rep_levels); + } + + pub fn extend_from_slice(&mut self, other: &[T]) { + self.values.extend_from_slice(other); + self.def_levels.extend(iter::repeat(1).take(other.len())); + self.rep_levels.extend(iter::repeat(1).take(other.len())); + } + + pub fn extend_from_option_slice(&mut self, other: &[Option]) { + for v in other { + self.push_option(v.clone()); // TODO(edd): perf here. + } + } + + pub fn fill_with(&mut self, value: T, additional: usize) { + self.values.extend(iter::repeat(value).take(additional)); + self.def_levels.extend(iter::repeat(1).take(additional)); + self.rep_levels.extend(iter::repeat(1).take(additional)); + } + + pub fn pad_with_null(&mut self, additional: usize) { + self.values + .extend(iter::repeat(T::default()).take(additional)); + self.def_levels.extend(iter::repeat(0).take(additional)); + self.rep_levels.extend(iter::repeat(1).take(additional)); + } + /// Return true if the row for index is null. Returns true if there is no /// row for index. pub fn is_null(&self, index: usize) -> bool { @@ -282,7 +321,10 @@ impl Packer { // Convert `Vec`, e.g., `Vec` into the appropriate `Packer` value, // e.g., `Packer`. -impl std::convert::From> for Packer { +impl std::convert::From> for Packer +where + T: Default + Clone, +{ fn from(v: Vec) -> Self { Self { def_levels: vec![1; v.len()], @@ -294,7 +336,10 @@ impl std::convert::From> for Packer { // Convert `Vec>`, e.g., `Vec>` into the appropriate // `Packer` value, e.g., `Packer`. -impl std::convert::From>> for Packer { +impl std::convert::From>> for Packer +where + T: Default + Clone, +{ fn from(values: Vec>) -> Self { let mut packer = Self::new(); for v in values { @@ -316,6 +361,50 @@ mod test { assert_eq!(packer.rep_levels.capacity(), 42); } + #[test] + fn extend_from_slice() { + let mut packer: Packer = Packer::new(); + packer.push(100); + packer.push(22); + + packer.extend_from_slice(&[2, 3, 4]); + + assert_eq!(packer.values, &[100, 22, 2, 3, 4]); + assert_eq!(packer.def_levels, &[1; 5]); + assert_eq!(packer.rep_levels, &[1; 5]); + } + + #[test] + fn extend_from_packer() { + let mut packer_a: Packer = Packer::new(); + packer_a.push(100); + packer_a.push(22); + + let mut packer_b = Packer::new(); + packer_b.push(3); + + packer_a.extend_from_packer(&packer_b); + assert_eq!(packer_a.values, &[100, 22, 3]); + assert_eq!(packer_a.def_levels, &[1; 3]); + assert_eq!(packer_a.rep_levels, &[1; 3]); + } + + #[test] + fn pad_with_null() { + let mut packer: Packer = Packer::new(); + packer.push(100); + packer.push(22); + + packer.pad_with_null(3); + + assert_eq!( + packer.values, + &[100, 22, i64::default(), i64::default(), i64::default()] + ); + assert_eq!(packer.def_levels, &[1, 1, 0, 0, 0]); + assert_eq!(packer.rep_levels, &[1; 5]); + } + #[test] fn is_null() { let mut packer: Packer = Packer::new(); diff --git a/delorean_tsm/src/mapper.rs b/delorean_tsm/src/mapper.rs index 5362b1fed9..7df4684f98 100644 --- a/delorean_tsm/src/mapper.rs +++ b/delorean_tsm/src/mapper.rs @@ -242,6 +242,15 @@ where } } +impl BlockDecoder for &mut &mut TSMBlockReader +where + R: BufRead + Seek, +{ + fn block_data(&mut self, block: &Block) -> Result { + self.decode_block(block) + } +} + // Maps multiple columnar field blocks to a single tablular representation. // // Given a set of field keys and a set of blocks for each key,