feat: Line Protocol Schema extractor (#108)

* feat: schema inference from iterator of parsed lines

* fix: Clean up error handling even more

* fix: fmt

* fix: make a sacrifice to the clippy gods
Andrew Lamb 2020-06-03 18:29:57 -04:00 committed by GitHub
parent f00f408f34
commit 234b2f5752
9 changed files with 407 additions and 12 deletions

Cargo.lock (generated)

@@ -407,6 +407,16 @@ dependencies = [
"tonic-build 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "delorean_ingest"
version = "0.1.0"
dependencies = [
"delorean_line_parser 0.1.0",
"line_protocol_schema 0.1.0",
"log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
"snafu 0.6.8 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "delorean_line_parser"
version = "0.1.0"
@@ -424,6 +434,7 @@ version = "0.1.0"
dependencies = [
"assert_cmd 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)",
"clap 2.33.1 (registry+https://github.com/rust-lang/crates.io-index)",
"delorean_ingest 0.1.0",
"delorean_line_parser 0.1.0",
"env_logger 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -894,6 +905,9 @@ dependencies = [
[[package]]
name = "line_protocol_schema"
version = "0.1.0"
dependencies = [
"log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "log"

Cargo.toml

@@ -7,6 +7,7 @@ default-run = "delorean"
[workspace]
members = [
"delorean_ingest",
"delorean_line_parser",
"delorean_storage_tool",
"delorean_test_helpers",

delorean_ingest/Cargo.toml (new file)

@@ -0,0 +1,13 @@
[package]
name = "delorean_ingest"
version = "0.1.0"
authors = ["Andrew Lamb <alamb@influxdata.com>"]
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
snafu = "0.6.2"
log = "0.4.8"
delorean_line_parser = { path = "../delorean_line_parser" }
line_protocol_schema = { path = "../line_protocol_schema" }

delorean_ingest/src/lib.rs (new file)

@@ -0,0 +1,271 @@
//! Library with code for (aspirationally) ingesting various data formats into Delorean
use log::debug;
use snafu::{OptionExt, Snafu};
use delorean_line_parser::{FieldValue, ParsedLine};
use line_protocol_schema::{DataType, Schema, SchemaBuilder};
/// Handles converting raw line protocol `ParsedLine` structures into Delorean format.
pub struct LineProtocolConverter {
// Schema is used in tests and will be used to actually convert data shortly
schema: Schema,
}
#[derive(Snafu, Debug)]
pub enum Error {
/// Conversion needs at least one line of data
NeedsAtLeastOneLine,
// Only a single line protocol measurement is currently supported
#[snafu(display(r#"More than one measurement not yet supported: {}"#, message))]
OnlyOneMeasurementSupported { message: String },
}
/// `LineProtocolConverter`s are used to convert an iterator of
/// `ParsedLine`s into the delorean internal columnar data format
/// (exactly what that format is is TBD).
///
impl LineProtocolConverter {
pub fn schema(&self) -> &Schema {
&self.schema
}
/// Create a new `LineProtocolConverter` by extracting the implied
/// schema from an iterator of ParsedLines.
///
/// The converter can subsequently be used for any `ParsedLine`'s
/// that have the same schema (e.g. tag names, field names,
/// measurements).
///
pub fn new<'a>(
lines: impl Iterator<Item = ParsedLine<'a>>,
) -> Result<LineProtocolConverter, Error> {
let mut peekable_iter = lines.peekable();
let first_line = peekable_iter.peek().context(NeedsAtLeastOneLine)?;
let mut builder = SchemaBuilder::new(&first_line.series.measurement);
for line in peekable_iter {
let series = &line.series;
if &series.measurement != builder.get_measurement_name() {
return Err(Error::OnlyOneMeasurementSupported {
message: format!(
"Saw new measurement {}, had been using measurement {}",
series.measurement,
builder.get_measurement_name()
),
});
}
if let Some(tag_set) = &series.tag_set {
for (tag_name, _) in tag_set {
// FIXME avoid the copy / creation of a string!
builder = builder.tag(&tag_name.to_string());
}
}
for (field_name, field_value) in &line.field_set {
let field_type = match field_value {
FieldValue::F64(_) => DataType::Float,
FieldValue::I64(_) => DataType::Integer,
};
// FIXME: avoid the copy!
builder = builder.field(&field_name.to_string(), field_type);
}
}
let schema = builder.build();
debug!("Deduced line protocol schema: {:#?}", schema);
Ok(LineProtocolConverter { schema })
}
}
#[cfg(test)]
mod delorean_ingest_tests {
use super::*;
use line_protocol_schema::ColumnDefinition;
fn only_good_lines(data: &str) -> impl Iterator<Item = ParsedLine<'_>> {
delorean_line_parser::parse_lines(data).filter_map(|r| {
assert!(r.is_ok());
r.ok()
})
}
#[test]
fn no_lines() {
let parsed_lines = only_good_lines("");
let converter_result = LineProtocolConverter::new(parsed_lines);
assert!(matches!(converter_result, Err(Error::NeedsAtLeastOneLine)));
}
#[test]
fn one_line() {
let parsed_lines =
only_good_lines("cpu,host=A,region=west usage_system=64i 1590488773254420000");
let converter = LineProtocolConverter::new(parsed_lines).expect("conversion successful");
assert_eq!(converter.schema.measurement(), "cpu");
let cols = converter.schema.get_col_defs();
println!("Converted to {:#?}", cols);
assert_eq!(cols.len(), 4);
assert_eq!(cols[0], ColumnDefinition::new("host", 0, DataType::String));
assert_eq!(
cols[1],
ColumnDefinition::new("region", 1, DataType::String)
);
assert_eq!(
cols[2],
ColumnDefinition::new("usage_system", 2, DataType::Integer)
);
assert_eq!(
cols[3],
ColumnDefinition::new("timestamp", 3, DataType::Timestamp)
);
}
#[test]
fn multi_line_same_schema() {
let parsed_lines = only_good_lines(
r#"
cpu,host=A,region=west usage_system=64i 1590488773254420000
cpu,host=A,region=east usage_system=67i 1590488773254430000"#,
);
let converter = LineProtocolConverter::new(parsed_lines).expect("conversion successful");
assert_eq!(converter.schema.measurement(), "cpu");
let cols = converter.schema.get_col_defs();
println!("Converted to {:#?}", cols);
assert_eq!(cols.len(), 4);
assert_eq!(cols[0], ColumnDefinition::new("host", 0, DataType::String));
assert_eq!(
cols[1],
ColumnDefinition::new("region", 1, DataType::String)
);
assert_eq!(
cols[2],
ColumnDefinition::new("usage_system", 2, DataType::Integer)
);
assert_eq!(
cols[3],
ColumnDefinition::new("timestamp", 3, DataType::Timestamp)
);
}
#[test]
fn multi_line_new_field() {
// given two lines of protocol data that have different field names
let parsed_lines = only_good_lines(
r#"
cpu,host=A,region=west usage_system=64i 1590488773254420000
cpu,host=A,region=east usage_user=61.32 1590488773254430000"#,
);
// when we extract the schema
let converter = LineProtocolConverter::new(parsed_lines).expect("conversion successful");
assert_eq!(converter.schema.measurement(), "cpu");
// then both field names appear in the resulting schema
let cols = converter.schema.get_col_defs();
println!("Converted to {:#?}", cols);
assert_eq!(cols.len(), 5);
assert_eq!(cols[0], ColumnDefinition::new("host", 0, DataType::String));
assert_eq!(
cols[1],
ColumnDefinition::new("region", 1, DataType::String)
);
assert_eq!(
cols[2],
ColumnDefinition::new("usage_system", 2, DataType::Integer)
);
assert_eq!(
cols[3],
ColumnDefinition::new("usage_user", 3, DataType::Float)
);
assert_eq!(
cols[4],
ColumnDefinition::new("timestamp", 4, DataType::Timestamp)
);
}
#[test]
fn multi_line_new_tags() {
// given two lines of protocol data that have different tags
let parsed_lines = only_good_lines(
r#"
cpu,host=A usage_system=64i 1590488773254420000
cpu,host=A,fail_group=Z usage_system=61i 1590488773254430000"#,
);
// when we extract the schema
let converter = LineProtocolConverter::new(parsed_lines).expect("conversion successful");
assert_eq!(converter.schema.measurement(), "cpu");
// Then both tag names appear in the resulting schema
let cols = converter.schema.get_col_defs();
println!("Converted to {:#?}", cols);
assert_eq!(cols.len(), 4);
assert_eq!(cols[0], ColumnDefinition::new("host", 0, DataType::String));
assert_eq!(
cols[1],
ColumnDefinition::new("fail_group", 1, DataType::String)
);
assert_eq!(
cols[2],
ColumnDefinition::new("usage_system", 2, DataType::Integer)
);
assert_eq!(
cols[3],
ColumnDefinition::new("timestamp", 3, DataType::Timestamp)
);
}
#[test]
fn multi_line_field_changed() {
// given two lines of protocol data that have apparently different data types for the field:
let parsed_lines = only_good_lines(
r#"
cpu,host=A usage_system=64i 1590488773254420000
cpu,host=A usage_system=61.1 1590488773254430000"#,
);
// when we extract the schema
let converter = LineProtocolConverter::new(parsed_lines).expect("conversion successful");
assert_eq!(converter.schema.measurement(), "cpu");
// Then the first field type appears in the resulting schema (TBD is this what we want??)
let cols = converter.schema.get_col_defs();
println!("Converted to {:#?}", cols);
assert_eq!(cols.len(), 3);
assert_eq!(cols[0], ColumnDefinition::new("host", 0, DataType::String));
assert_eq!(
cols[1],
ColumnDefinition::new("usage_system", 1, DataType::Integer)
);
assert_eq!(
cols[2],
ColumnDefinition::new("timestamp", 2, DataType::Timestamp)
);
}
#[test]
fn multi_line_measurement_changed() {
// given two lines of protocol data for two different measurements
let parsed_lines = only_good_lines(
r#"
cpu,host=A usage_system=64i 1590488773254420000
vcpu,host=A usage_system=61i 1590488773254430000"#,
);
// when we extract the schema
let converter_result = LineProtocolConverter::new(parsed_lines);
// Then the converter does not support it
assert!(matches!(
converter_result,
Err(Error::OnlyOneMeasurementSupported { message: _ })
));
}
}
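Taken together, this file gives a small two-stage pipeline: `parse_lines` produces `ParsedLine`s, and `LineProtocolConverter::new` peeks at them to deduce a `Schema`. A minimal sketch of a caller wiring the two crates together, using only APIs added in this commit (the input literal and function name are illustrative):

use delorean_ingest::{Error, LineProtocolConverter};
use delorean_line_parser::parse_lines;

fn deduce_schema() -> Result<(), Error> {
    let data = "cpu,host=A,region=west usage_system=64i 1590488773254420000\n\
                cpu,host=B,region=east usage_system=67i 1590488773254430000";
    // Drop unparseable lines, mirroring only_good_lines in the tests above
    let good_lines = parse_lines(data).filter_map(|r| r.ok());
    let converter = LineProtocolConverter::new(good_lines)?;
    // Columns come back in index order: host, region, usage_system, timestamp
    for col in converter.schema().get_col_defs() {
        println!("{:?}", col);
    }
    Ok(())
}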

delorean_line_parser/src/lib.rs

@@ -241,12 +241,31 @@ impl From<EscapedStr<'_>> for String {
}
}
impl From<&EscapedStr<'_>> for String {
fn from(other: &EscapedStr<'_>) -> Self {
other.to_string()
}
}
impl<'a> From<&'a str> for EscapedStr<'a> {
fn from(other: &'a str) -> Self {
Self(smallvec![other])
}
}
impl PartialEq<String> for EscapedStr<'_> {
fn eq(&self, other: &String) -> bool {
let s: &str = other;
*self == s
}
}
impl PartialEq<EscapedStr<'_>> for String {
fn eq(&self, other: &EscapedStr<'_>) -> bool {
other == self
}
}
impl PartialEq<&str> for EscapedStr<'_> {
fn eq(&self, other: &&str) -> bool {
let mut head = *other;
@@ -261,6 +280,12 @@ impl PartialEq<&str> for EscapedStr<'_> {
}
}
impl PartialEq<EscapedStr<'_>> for &str {
fn eq(&self, other: &EscapedStr<'_>) -> bool {
other == self
}
}
pub fn parse_lines(mut i: &str) -> impl Iterator<Item = Result<ParsedLine<'_>>> {
std::iter::from_fn(move || {
let (remaining, _) = line_whitespace(i).expect("Cannot fail to parse whitespace");
@@ -1297,4 +1322,21 @@ her"#,
Ok(())
}
#[test]
#[allow(clippy::op_ref)]
// Clippy disabled because it was complaining about uselessly
// taking references on both sides of the eq op, but that actually
// invokes a different implementation of PartialEq, which I wanted
// to test.
fn string_comparison() {
let es = EscapedStr::from("foobar");
let s = String::from("foobar");
assert!(es == s);
assert!(s == es);
assert!(&es == &s);
assert!(&s == &es);
}
}
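These symmetric `PartialEq` impls let the new schema extractor compare borrowed parser output against owned strings without allocating: `LineProtocolConverter::new` checks `&series.measurement != builder.get_measurement_name()`, an `EscapedStr` against a `&String`. A small sketch of the pattern, assuming `EscapedStr` is exported from this crate (the helper name is illustrative):

use delorean_line_parser::EscapedStr;

// Compares without building a temporary String; this resolves through the
// PartialEq<String> impl for EscapedStr via the blanket impl for references.
// The &String parameter mirrors SchemaBuilder::get_measurement_name().
fn measurement_matches(measurement: &EscapedStr<'_>, current: &String) -> bool {
    measurement == current
}

fn main() {
    let es = EscapedStr::from("cpu");
    assert!(measurement_matches(&es, &String::from("cpu")));
}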

delorean_storage_tool/Cargo.toml

@@ -15,6 +15,7 @@ edition = "2018"
# https://doc.rust-lang.org/stable/cargo/reference/specifying-dependencies.html#specifying-dependencies-from-git-repositories
#parquet = { path = "/Users/alamb/Software/arrow/rust/parquet", version = "0.18.0-SNAPSHOT" }
delorean_line_parser = { path = "../delorean_line_parser" }
delorean_ingest = { path = "../delorean_ingest" }
clap = "2.33.1"
env_logger = "0.7.1"

delorean_storage_tool/src/main.rs

@@ -1,11 +1,14 @@
use std::fs;
use std::sync::Arc;
-use log::{debug, info};
+use log::{debug, info, warn};
use clap::{crate_authors, crate_version, App, Arg, SubCommand};
use snafu::Snafu;
+use delorean_ingest::LineProtocolConverter;
use delorean_line_parser::parse_lines;
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display(r#"IO Error: {} ({})"#, message, source))]
@@ -17,6 +20,15 @@ pub enum Error {
type Result<T, E = Error> = std::result::Result<T, E>;
+impl From<delorean_line_parser::Error> for Error {
+fn from(other: delorean_line_parser::Error) -> Self {
+Error::IOError {
+message: String::from("Error from parser"),
+source: Arc::new(other),
+}
+}
+}
enum ReturnCode {
InternalError = 1,
ConversionFailed = 2,
@@ -28,16 +40,42 @@ fn convert(input_filename: &str, output_filename: &str) -> Result<()> {
debug!("Writing to output file {}", output_filename);
// TODO: make a streaming parser that you can stream data through in blocks.
-// for now, just read the whole thing into RAM...
+// for now, just read the whole input file into RAM...
let buf = fs::read_to_string(input_filename).map_err(|e| {
-let msg = format!("Error reading {}", input_filename);
+let message = format!("Error reading {}", input_filename);
Error::IOError {
-message: msg,
+message,
source: Arc::new(e),
}
})?;
info!("Read {} bytes from {}", buf.len(), input_filename);
+// FIXME: Design something sensible to do with lines that don't
+// parse rather than just dropping them on the floor
+let only_good_lines = parse_lines(&buf).filter_map(|r| match r {
+Ok(line) => Some(line),
+Err(e) => {
+warn!("Ignoring line with parse error: {}", e);
+None
+}
+});
+// The idea here is to use the first few parsed lines to deduce the schema
+let converter = match LineProtocolConverter::new(only_good_lines) {
+Ok(converter) => converter,
+Err(e) => {
+let message = String::from("Error creating line protocol converter");
+return Err(Error::IOError {
+message,
+source: Arc::new(e),
+});
+}
+};
+debug!("Extracted schema: {:?}", converter.schema());
+// TODO: convert the sample and remaining good lines
+unimplemented!("The actual conversion");
}
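The `From<delorean_line_parser::Error>` impl added above is what makes `?` usable on parser results inside functions that return this crate's `Result`. A hypothetical helper, not part of this commit, showing the pattern (it assumes `ParsedLine` is also imported from delorean_line_parser):

// Hypothetical: fetch the first parsed line, if any, letting `?`
// convert delorean_line_parser::Error into Error via the From impl.
fn first_line(buf: &str) -> Result<Option<ParsedLine<'_>>> {
    match parse_lines(buf).next() {
        Some(parse_result) => Ok(Some(parse_result?)),
        None => Ok(None),
    }
}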

line_protocol_schema/Cargo.toml

@@ -7,3 +7,4 @@
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
log = "0.4.8"

line_protocol_schema/src/lib.rs

@@ -29,6 +29,8 @@
//! ```
use std::collections::BTreeMap;
+use log::warn;
/// Represents a specific Line Protocol Tag name
#[derive(Debug, PartialEq)]
pub struct Tag {
@@ -112,6 +114,10 @@ pub struct Schema {
}
impl Schema {
+pub fn measurement(&self) -> &str {
+&self.measurement
+}
// Return a Vec of `ColumnDefinition`s such that
// `v[idx].index == idx` for all columns
// (i.e. the vec is in the same order as the columns of the schema)
@@ -156,7 +162,11 @@ impl SchemaBuilder {
}
}
-/// Add a new tag name to the schema
+pub fn get_measurement_name(&self) -> &String {
+&self.measurement_name
+}
+/// Add a new tag name to the schema.
pub fn tag(mut self, name: &str) -> Self {
// check for existing tag (FIXME make this faster)
if self.tag_names.iter().find(|&s| s == name).is_none() {
@@ -175,8 +185,7 @@
{
Some((_, existing_type)) => {
if *existing_type != data_type {
-// FIXME: return Result rather than panic here.
-panic!("Field '{}' type changed. Previously it had type {:?} but attempted to set type {:?}",
+warn!("Ignoring new type for field '{}': Previously it had type {:?}, attempted to set type {:?}.",
name, existing_type, data_type);
}
}
@@ -288,14 +297,19 @@
}
#[test]
-#[should_panic]
fn duplicate_field_name_different_type() {
-SchemaBuilder::new(String::from("my_measurement"))
+let schema = SchemaBuilder::new(String::from("my_measurement"))
.field("field1", DataType::Float)
.field("field1", DataType::Integer)
.build();
-// TBD better error handling -- what should happen if there is
-// a new type seen for an existing field?
+// second Integer definition should be ignored, and type remains float
+let cols = schema.get_col_defs();
+assert_eq!(cols.len(), 2);
+assert_eq!(cols[0], ColumnDefinition::new("field1", 0, DataType::Float));
+assert_eq!(
+cols[1],
+ColumnDefinition::new("timestamp", 1, DataType::Timestamp)
+);
}
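With the panic gone, `SchemaBuilder`'s conflict handling is first-writer-wins: a repeated tag name is deduplicated by the `find` in `tag()`, and a field seen again with a different type keeps its original type and only logs a warning. A short sketch of that behavior, using only APIs from this file (the measurement and column names are illustrative):

use line_protocol_schema::{ColumnDefinition, DataType, SchemaBuilder};

fn main() {
    let schema = SchemaBuilder::new(String::from("cpu"))
        .tag("host")
        .tag("host") // duplicate tag: ignored
        .field("usage_system", DataType::Integer)
        .field("usage_system", DataType::Float) // type change: warned about, Integer kept
        .build();
    // Resulting columns: host (String), usage_system (Integer), timestamp (Timestamp)
    let cols = schema.get_col_defs();
    assert_eq!(cols.len(), 3);
    assert_eq!(
        cols[1],
        ColumnDefinition::new("usage_system", 1, DataType::Integer)
    );
}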