diff --git a/Cargo.lock b/Cargo.lock index 91b42c3532..a5881d04f2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -407,6 +407,16 @@ dependencies = [ "tonic-build 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "delorean_ingest" +version = "0.1.0" +dependencies = [ + "delorean_line_parser 0.1.0", + "line_protocol_schema 0.1.0", + "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", + "snafu 0.6.8 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "delorean_line_parser" version = "0.1.0" @@ -424,6 +434,7 @@ version = "0.1.0" dependencies = [ "assert_cmd 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)", "clap 2.33.1 (registry+https://github.com/rust-lang/crates.io-index)", + "delorean_ingest 0.1.0", "delorean_line_parser 0.1.0", "env_logger 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", @@ -894,6 +905,9 @@ dependencies = [ [[package]] name = "line_protocol_schema" version = "0.1.0" +dependencies = [ + "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", +] [[package]] name = "log" diff --git a/Cargo.toml b/Cargo.toml index beae4c610e..9317823b62 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,6 +7,7 @@ default-run = "delorean" [workspace] members = [ + "delorean_ingest", "delorean_line_parser", "delorean_storage_tool", "delorean_test_helpers", diff --git a/delorean_ingest/Cargo.toml b/delorean_ingest/Cargo.toml new file mode 100644 index 0000000000..e61be397b0 --- /dev/null +++ b/delorean_ingest/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "delorean_ingest" +version = "0.1.0" +authors = ["Andrew Lamb "] +edition = "2018" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +snafu = "0.6.2" +log = "0.4.8" +delorean_line_parser = { path = "../delorean_line_parser" } +line_protocol_schema = { path = 
"../line_protocol_schema" } \ No newline at end of file diff --git a/delorean_ingest/src/lib.rs b/delorean_ingest/src/lib.rs new file mode 100644 index 0000000000..316ffdb324 --- /dev/null +++ b/delorean_ingest/src/lib.rs @@ -0,0 +1,271 @@ +//! Library with code for (aspirationally) ingesting various data formats into Delorean +use log::debug; +use snafu::{OptionExt, Snafu}; + +use delorean_line_parser::{FieldValue, ParsedLine}; + +use line_protocol_schema::{DataType, Schema, SchemaBuilder}; + +/// Handles converting raw line protocol `ParsedLine` structures into Delorean format. +pub struct LineProtocolConverter { + // Schema is used in tests and will be used to actually convert data shortly + schema: Schema, +} + +#[derive(Snafu, Debug)] +pub enum Error { + /// Conversion needs at least one line of data + NeedsAtLeastOneLine, + + // Only a single line protocol measurement field is currently supported + #[snafu(display(r#"More than one measurement not yet supported: {}"#, message))] + OnlyOneMeasurementSupported { message: String }, +} + +/// `LineProtocolConverter` are used to +/// converting an iterator of `ParsedLines` into the delorean +/// internal columnar data format (exactly what this is is TBD). +/// +impl LineProtocolConverter { + pub fn schema(&self) -> &Schema { + &self.schema + } + + /// Create a new `LineProtocolConverter` by extracting the implied + /// schema from an iterator of ParsedLines. + /// + /// The converter can subsequently be used for any `ParsedLine`'s + /// that have the same schema (e.g. tag names, field names, + /// measurements). 
+ /// + pub fn new<'a>( + lines: impl Iterator>, + ) -> Result { + let mut peekable_iter = lines.peekable(); + let first_line = peekable_iter.peek().context(NeedsAtLeastOneLine)?; + + let mut builder = SchemaBuilder::new(&first_line.series.measurement); + + for line in peekable_iter { + let series = &line.series; + if &series.measurement != builder.get_measurement_name() { + return Err(Error::OnlyOneMeasurementSupported { + message: format!( + "Saw new measurement {}, had been using measurement {}", + builder.get_measurement_name(), + series.measurement + ), + }); + } + if let Some(tag_set) = &series.tag_set { + for (tag_name, _) in tag_set { + // FIXME avoid the copy / creation of a string! + builder = builder.tag(&tag_name.to_string()); + } + } + for (field_name, field_value) in &line.field_set { + let field_type = match field_value { + FieldValue::F64(_) => DataType::Float, + FieldValue::I64(_) => DataType::Integer, + }; + // FIXME: avoid the copy! + builder = builder.field(&field_name.to_string(), field_type); + } + } + + let schema = builder.build(); + debug!("Deduced line protocol schema: {:#?}", schema); + Ok(LineProtocolConverter { schema }) + } +} + +#[cfg(test)] +mod delorean_ingest_tests { + use super::*; + use line_protocol_schema::ColumnDefinition; + + fn only_good_lines(data: &str) -> impl Iterator> { + delorean_line_parser::parse_lines(data).filter_map(|r| { + assert!(r.is_ok()); + r.ok() + }) + } + + #[test] + fn no_lines() { + let parsed_lines = only_good_lines(""); + let converter_result = LineProtocolConverter::new(parsed_lines); + + assert!(matches!(converter_result, Err(Error::NeedsAtLeastOneLine))); + } + + #[test] + fn one_line() { + let parsed_lines = + only_good_lines("cpu,host=A,region=west usage_system=64i 1590488773254420000"); + + let converter = LineProtocolConverter::new(parsed_lines).expect("conversion successful"); + assert_eq!(converter.schema.measurement(), "cpu"); + + let cols = converter.schema.get_col_defs(); + 
// Two lines with identical tag and field names produce one column per name
// plus the trailing timestamp column.
#[test]
fn multi_line_same_schema() {
    let input = r#"
        cpu,host=A,region=west usage_system=64i 1590488773254420000
        cpu,host=A,region=east usage_system=67i 1590488773254430000"#;

    let converter =
        LineProtocolConverter::new(only_good_lines(input)).expect("conversion successful");
    assert_eq!(converter.schema.measurement(), "cpu");

    let cols = converter.schema.get_col_defs();
    println!("Converted to {:#?}", cols);

    let expected = [
        ColumnDefinition::new("host", 0, DataType::String),
        ColumnDefinition::new("region", 1, DataType::String),
        ColumnDefinition::new("usage_system", 2, DataType::Integer),
        ColumnDefinition::new("timestamp", 3, DataType::Timestamp),
    ];
    assert_eq!(cols.len(), 4);
    for (idx, wanted) in expected.iter().enumerate() {
        assert_eq!(cols[idx], *wanted);
    }
}

// A field name that first appears on a later line is still added to the schema.
#[test]
fn multi_line_new_field() {
    // given two lines of protocol data that have different field names
    let input = r#"
        cpu,host=A,region=west usage_system=64i 1590488773254420000
        cpu,host=A,region=east usage_user=61.32 1590488773254430000"#;

    // when we extract the schema
    let converter =
        LineProtocolConverter::new(only_good_lines(input)).expect("conversion successful");
    assert_eq!(converter.schema.measurement(), "cpu");

    // then both field names appear in the resulting schema
    let cols = converter.schema.get_col_defs();
    println!("Converted to {:#?}", cols);

    let expected = [
        ColumnDefinition::new("host", 0, DataType::String),
        ColumnDefinition::new("region", 1, DataType::String),
        ColumnDefinition::new("usage_system", 2, DataType::Integer),
        ColumnDefinition::new("usage_user", 3, DataType::Float),
        ColumnDefinition::new("timestamp", 4, DataType::Timestamp),
    ];
    assert_eq!(cols.len(), 5);
    for (idx, wanted) in expected.iter().enumerate() {
        assert_eq!(cols[idx], *wanted);
    }
}

// A tag name that first appears on a later line is still added to the schema.
#[test]
fn multi_line_new_tags() {
    // given two lines of protocol data that have different tags
    let input = r#"
        cpu,host=A usage_system=64i 1590488773254420000
        cpu,host=A,fail_group=Z usage_system=61i 1590488773254430000"#;

    // when we extract the schema
    let converter =
        LineProtocolConverter::new(only_good_lines(input)).expect("conversion successful");
    assert_eq!(converter.schema.measurement(), "cpu");

    // Then both tag names appear in the resulting schema
    let cols = converter.schema.get_col_defs();
    println!("Converted to {:#?}", cols);

    let expected = [
        ColumnDefinition::new("host", 0, DataType::String),
        ColumnDefinition::new("fail_group", 1, DataType::String),
        ColumnDefinition::new("usage_system", 2, DataType::Integer),
        ColumnDefinition::new("timestamp", 3, DataType::Timestamp),
    ];
    assert_eq!(cols.len(), 4);
    for (idx, wanted) in expected.iter().enumerate() {
        assert_eq!(cols[idx], *wanted);
    }
}
+ let cols = converter.schema.get_col_defs(); + println!("Converted to {:#?}", cols); + assert_eq!(cols.len(), 3); + assert_eq!(cols[0], ColumnDefinition::new("host", 0, DataType::String)); + assert_eq!( + cols[1], + ColumnDefinition::new("usage_system", 1, DataType::Integer) + ); + assert_eq!( + cols[2], + ColumnDefinition::new("timestamp", 2, DataType::Timestamp) + ); + } + + #[test] + fn multi_line_measurement_changed() { + // given two lines of protocol data for two different measurements + let parsed_lines = only_good_lines( + r#" + cpu,host=A usage_system=64i 1590488773254420000 + vcpu,host=A usage_system=61i 1590488773254430000"#, + ); + + // when we extract the schema + let converter_result = LineProtocolConverter::new(parsed_lines); + + // Then the converter does not support it + assert!(matches!( + converter_result, + Err(Error::OnlyOneMeasurementSupported { message: _ }) + )); + } +} diff --git a/delorean_line_parser/src/lib.rs b/delorean_line_parser/src/lib.rs index 47dc91463e..57ada42853 100644 --- a/delorean_line_parser/src/lib.rs +++ b/delorean_line_parser/src/lib.rs @@ -241,12 +241,31 @@ impl From> for String { } } +impl From<&EscapedStr<'_>> for String { + fn from(other: &EscapedStr<'_>) -> Self { + other.to_string() + } +} + impl<'a> From<&'a str> for EscapedStr<'a> { fn from(other: &'a str) -> Self { Self(smallvec![other]) } } +impl PartialEq for EscapedStr<'_> { + fn eq(&self, other: &String) -> bool { + let s: &str = other; + *self == s + } +} + +impl PartialEq> for String { + fn eq(&self, other: &EscapedStr<'_>) -> bool { + other == self + } +} + impl PartialEq<&str> for EscapedStr<'_> { fn eq(&self, other: &&str) -> bool { let mut head = *other; @@ -261,6 +280,12 @@ impl PartialEq<&str> for EscapedStr<'_> { } } +impl PartialEq> for &str { + fn eq(&self, other: &EscapedStr<'_>) -> bool { + other == self + } +} + pub fn parse_lines(mut i: &str) -> impl Iterator>> { std::iter::from_fn(move || { let (remaining, _) = 
// `clippy::op_ref` is silenced on purpose: clippy complains about
// needlessly taking references on both sides of `==`, but the borrowed
// form invokes a different implementation of `PartialEq` than the
// by-value form, and this test wants to exercise both.
#[test]
#[allow(clippy::op_ref)]
fn string_comparison() {
    let escaped = EscapedStr::from("foobar");
    let owned = String::from("foobar");

    // by-value operands, in both orders
    assert!(escaped == owned);
    assert!(owned == escaped);

    // borrowed operands, in both orders
    assert!(&escaped == &owned);
    assert!(&owned == &escaped);
}
InternalError = 1, ConversionFailed = 2, @@ -28,16 +40,42 @@ fn convert(input_filename: &str, output_filename: &str) -> Result<()> { debug!("Writing to output file {}", output_filename); // TODO: make a streaming parser that you can stream data through in blocks. - // for now, just read the whole thing into RAM... + // for now, just read the whole input file into RAM... let buf = fs::read_to_string(input_filename).map_err(|e| { - let msg = format!("Error reading {}", input_filename); + let message = format!("Error reading {}", input_filename); Error::IOError { - message: msg, + message, source: Arc::new(e), } })?; info!("Read {} bytes from {}", buf.len(), input_filename); + // FIXME: Design something sensible to do with lines that don't + // parse rather than just dropping them on the floor + let only_good_lines = parse_lines(&buf).filter_map(|r| match r { + Ok(line) => Some(line), + Err(e) => { + warn!("Ignorning line with parse error: {}", e); + None + } + }); + + // The idea here is to use the first few parsed lines to deduce the schema + let converter = match LineProtocolConverter::new(only_good_lines) { + Ok(converter) => converter, + Err(e) => { + let message = String::from("Error creating line protocol converter"); + return Err(Error::IOError { + message, + source: Arc::new(e), + }); + } + }; + + debug!("Extracted schema: {:?}", converter.schema()); + + // TODO: convert the sample and remaining good lines + unimplemented!("The actual conversion"); } diff --git a/line_protocol_schema/Cargo.toml b/line_protocol_schema/Cargo.toml index 03eb7825bc..44628c2103 100644 --- a/line_protocol_schema/Cargo.toml +++ b/line_protocol_schema/Cargo.toml @@ -7,3 +7,4 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +log = "0.4.8" diff --git a/line_protocol_schema/src/lib.rs b/line_protocol_schema/src/lib.rs index a7963f8f41..0efc811c15 100644 --- a/line_protocol_schema/src/lib.rs +++ 
// Registering the same field name twice with different types keeps the
// first type; the second registration does not add a column.
#[test]
fn duplicate_field_name_different_type() {
    let builder = SchemaBuilder::new(String::from("my_measurement"));
    let builder = builder.field("field1", DataType::Float);
    let builder = builder.field("field1", DataType::Integer);
    let schema = builder.build();

    // second Integer definition should be ignored, and type remains float
    let cols = schema.get_col_defs();
    let expected = [
        ColumnDefinition::new("field1", 0, DataType::Float),
        ColumnDefinition::new("timestamp", 1, DataType::Timestamp),
    ];
    assert_eq!(cols.len(), 2);
    for (idx, wanted) in expected.iter().enumerate() {
        assert_eq!(cols[idx], *wanted);
    }
}