Merge pull request #173 from influxdata/idiomatic-snafu

pull/24376/head
Jake Goulding 2020-06-26 14:22:42 -04:00 committed by GitHub
commit f5d73ee580
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 320 additions and 261 deletions

View File

@ -9,21 +9,23 @@
clippy::use_self clippy::use_self
)] )]
use std::collections::{BTreeMap, BTreeSet};
use std::io::{BufRead, Seek};
use log::debug;
use snafu::{ResultExt, Snafu};
use delorean_line_parser::{FieldValue, ParsedLine}; use delorean_line_parser::{FieldValue, ParsedLine};
use delorean_table::{ use delorean_table::{
packers::{Packer, Packers}, packers::{Packer, Packers},
ByteArray, DeloreanTableWriter, DeloreanTableWriterSource, Error as TableError, ByteArray, DeloreanTableWriter, DeloreanTableWriterSource, Error as TableError,
}; };
use delorean_table_schema::{DataType, Schema, SchemaBuilder}; use delorean_table_schema::{DataType, Schema, SchemaBuilder};
use delorean_tsm::mapper::{map_field_columns, ColumnData, MeasurementTable, TSMMeasurementMapper}; use delorean_tsm::{
use delorean_tsm::reader::{TSMBlockReader, TSMIndexReader}; mapper::{map_field_columns, ColumnData, MeasurementTable, TSMMeasurementMapper},
use delorean_tsm::{BlockType, TSMError}; reader::{TSMBlockReader, TSMIndexReader},
BlockType, TSMError,
};
use log::debug;
use snafu::{ensure, OptionExt, ResultExt, Snafu};
use std::{
collections::{BTreeMap, BTreeSet},
io::{BufRead, Seek},
};
#[derive(Debug, Clone, Copy)] #[derive(Debug, Clone, Copy)]
pub struct ConversionSettings { pub struct ConversionSettings {
@ -66,8 +68,8 @@ pub enum Error {
NeedsAtLeastOneLine, NeedsAtLeastOneLine,
// Only a single line protocol measurement field is currently supported // Only a single line protocol measurement field is currently supported
#[snafu(display(r#"More than one measurement not yet supported: {}"#, message))] #[snafu(display(r#"More than one measurement not yet supported: Saw new measurement {}, had been using measurement {}"#, actual, expected))]
OnlyOneMeasurementSupported { message: String }, OnlyOneMeasurementSupported { expected: String, actual: String },
#[snafu(display(r#"Error writing to TableWriter: {}"#, source))] #[snafu(display(r#"Error writing to TableWriter: {}"#, source))]
Writing { source: TableError }, Writing { source: TableError },
@ -77,6 +79,14 @@ pub enum Error {
#[snafu(display(r#"Error processing TSM File: {}"#, source))] #[snafu(display(r#"Error processing TSM File: {}"#, source))]
TSMProcessing { source: TSMError }, TSMProcessing { source: TSMError },
// TODO clean this error up
#[snafu(display(r#"could not find ts column"#))]
CouldNotFindTsColumn,
// TODO clean this error up
#[snafu(display(r#"could not find column"#))]
CouldNotFindColumn,
} }
/// Handles buffering `ParsedLine` objects and deducing a schema from that sample /// Handles buffering `ParsedLine` objects and deducing a schema from that sample
@ -269,23 +279,22 @@ impl<'a> MeasurementSampler<'a> {
/// Use the contents of self.schema_sample to deduce the Schema of /// Use the contents of self.schema_sample to deduce the Schema of
/// `ParsedLine`s and return the deduced schema /// `ParsedLine`s and return the deduced schema
fn deduce_schema_from_sample(&mut self) -> Result<Schema, Error> { fn deduce_schema_from_sample(&mut self) -> Result<Schema, Error> {
if self.schema_sample.is_empty() { ensure!(!self.schema_sample.is_empty(), NeedsAtLeastOneLine);
return Err(Error::NeedsAtLeastOneLine {});
}
let mut builder = SchemaBuilder::new(self.schema_sample[0].series.measurement.as_str()); let mut builder = SchemaBuilder::new(self.schema_sample[0].series.measurement.as_str());
for line in &self.schema_sample { for line in &self.schema_sample {
let series = &line.series; let series = &line.series;
if &series.measurement != builder.get_measurement_name() {
return Err(Error::OnlyOneMeasurementSupported { let measurement_name = builder.get_measurement_name();
message: format!( ensure!(
"Saw new measurement {}, had been using measurement {}", series.measurement == measurement_name,
builder.get_measurement_name(), OnlyOneMeasurementSupported {
series.measurement actual: measurement_name,
), expected: &series.measurement,
});
} }
);
if let Some(tag_set) = &series.tag_set { if let Some(tag_set) = &series.tag_set {
for (tag_name, _) in tag_set { for (tag_name, _) in tag_set {
builder = builder.tag(tag_name.as_str()); builder = builder.tag(tag_name.as_str());
@ -521,15 +530,14 @@ impl TSMFileConverter {
index_stream_size: usize, index_stream_size: usize,
block_stream: impl BufRead + Seek, block_stream: impl BufRead + Seek,
) -> Result<(), Error> { ) -> Result<(), Error> {
let index_reader = TSMIndexReader::try_new(index_stream, index_stream_size) let index_reader =
.map_err(|e| Error::TSMProcessing { source: e })?; TSMIndexReader::try_new(index_stream, index_stream_size).context(TSMProcessing)?;
let mut block_reader = TSMBlockReader::new(block_stream); let mut block_reader = TSMBlockReader::new(block_stream);
let mapper = TSMMeasurementMapper::new(index_reader.peekable()); let mapper = TSMMeasurementMapper::new(index_reader.peekable());
for measurement in mapper { for measurement in mapper {
match measurement { let mut m = measurement.context(TSMProcessing)?;
Ok(mut m) => {
let (schema, packed_columns) = let (schema, packed_columns) =
Self::process_measurement_table(&mut block_reader, &mut m)?; Self::process_measurement_table(&mut block_reader, &mut m)?;
@ -540,13 +548,8 @@ impl TSMFileConverter {
table_writer table_writer
.write_batch(&packed_columns) .write_batch(&packed_columns)
.map_err(|e| Error::WriterCreation { source: e })?; .context(WriterCreation)?;
table_writer table_writer.close().context(WriterCreation)?;
.close()
.map_err(|e| Error::WriterCreation { source: e })?;
}
Err(e) => return Err(Error::TSMProcessing { source: e }),
}
} }
Ok(()) Ok(())
} }
@ -592,19 +595,14 @@ impl TSMFileConverter {
// to the packer_column. // to the packer_column.
for (i, (tag_set_pair, blocks)) in m.tag_set_fields_blocks().iter_mut().enumerate() { for (i, (tag_set_pair, blocks)) in m.tag_set_fields_blocks().iter_mut().enumerate() {
let (ts, field_cols) = map_field_columns(&mut block_reader, blocks) let (ts, field_cols) =
.map_err(|e| Error::TSMProcessing { source: e })?; map_field_columns(&mut block_reader, blocks).context(TSMProcessing)?;
// Start with the timestamp column. // Start with the timestamp column.
let col_len = ts.len(); let col_len = ts.len();
let ts_idx = name_packer let ts_idx = name_packer
.get(schema.timestamp()) .get(schema.timestamp())
.ok_or(Error::TSMProcessing { .context(CouldNotFindTsColumn)?;
// TODO clean this error up
source: TSMError {
description: "could not find ts column".to_string(),
},
})?;
if i == 0 { if i == 0 {
packed_columns[*ts_idx] = Packers::from(ts); packed_columns[*ts_idx] = Packers::from(ts);
@ -617,12 +615,7 @@ impl TSMFileConverter {
// Next let's pad out all of the tag columns we know have // Next let's pad out all of the tag columns we know have
// repeated values. // repeated values.
for (tag_key, tag_value) in tag_set_pair { for (tag_key, tag_value) in tag_set_pair {
let idx = name_packer.get(tag_key).ok_or(Error::TSMProcessing { let idx = name_packer.get(tag_key).context(CouldNotFindColumn)?;
// TODO clean this error up
source: TSMError {
description: "could not find column".to_string(),
},
})?;
// this will create a column of repeated values. // this will create a column of repeated values.
if i == 0 { if i == 0 {
@ -646,12 +639,7 @@ impl TSMFileConverter {
continue; continue;
} }
let idx = name_packer.get(key).ok_or(Error::TSMProcessing { let idx = name_packer.get(key).context(CouldNotFindColumn)?;
// TODO clean this error up
source: TSMError {
description: "could not find column".to_string(),
},
})?;
if i == 0 { if i == 0 {
// creates a column of repeated None values. // creates a column of repeated None values.
@ -669,12 +657,7 @@ impl TSMFileConverter {
// Next let's write out all of the field column data. // Next let's write out all of the field column data.
let mut got_field_cols = Vec::new(); let mut got_field_cols = Vec::new();
for (field_key, field_values) in field_cols { for (field_key, field_values) in field_cols {
let idx = name_packer.get(&field_key).ok_or(Error::TSMProcessing { let idx = name_packer.get(&field_key).context(CouldNotFindColumn)?;
// TODO clean this error up
source: TSMError {
description: "could not find column".to_string(),
},
})?;
if i == 0 { if i == 0 {
match field_values { match field_values {
@ -725,12 +708,7 @@ impl TSMFileConverter {
continue; continue;
} }
let idx = name_packer.get(key).ok_or(Error::TSMProcessing { let idx = name_packer.get(key).context(CouldNotFindColumn)?;
// TODO clean this error up
source: TSMError {
description: "could not find column".to_string(),
},
})?;
// this will create a column of repeated None values. // this will create a column of repeated None values.
if i == 0 { if i == 0 {
@ -1160,7 +1138,7 @@ mod delorean_ingest_tests {
// Then the converter does not support it // Then the converter does not support it
assert!(matches!( assert!(matches!(
schema_result, schema_result,
Err(Error::OnlyOneMeasurementSupported { message: _ }) Err(Error::OnlyOneMeasurementSupported { .. })
)); ));
} }

View File

@ -326,6 +326,21 @@ impl<'a> From<&'a str> for EscapedStr<'a> {
} }
} }
impl From<EscapedStr<'_>> for String {
fn from(other: EscapedStr<'_>) -> Self {
match other {
EscapedStr::SingleSlice(s) => s.into(),
EscapedStr::CopiedValue(s) => s,
}
}
}
impl From<&EscapedStr<'_>> for String {
fn from(other: &EscapedStr<'_>) -> Self {
other.to_string()
}
}
impl PartialEq<&str> for EscapedStr<'_> { impl PartialEq<&str> for EscapedStr<'_> {
fn eq(&self, other: &&str) -> bool { fn eq(&self, other: &&str) -> bool {
self.as_str() == *other self.as_str() == *other

View File

@ -5,6 +5,7 @@ use parquet::errors::ParquetError;
#[derive(Debug, Snafu)] #[derive(Debug, Snafu)]
pub enum Error { pub enum Error {
#[snafu(display(r#"{}, underlying parquet error {}"#, message, source))] #[snafu(display(r#"{}, underlying parquet error {}"#, message, source))]
#[snafu(visibility(pub(crate)))]
ParquetLibraryError { ParquetLibraryError {
message: String, message: String,
source: ParquetError, source: ParquetError,

View File

@ -1,17 +1,13 @@
//! Provide storage statistics for parquet files //! Provide storage statistics for parquet files
use std::io::{Read, Seek}; use delorean_table_schema::DataType;
use parquet::{ use parquet::{
file::reader::{FileReader, SerializedFileReader}, file::reader::{FileReader, SerializedFileReader},
schema, schema,
}; };
use snafu::ResultExt;
use std::io::{Read, Seek};
use delorean_table_schema::DataType; use crate::{error::Result, Length, TryClone};
use crate::{
error::{Error, Result},
Length, TryClone,
};
pub fn parquet_schema_as_string(parquet_schema: &schema::types::Type) -> String { pub fn parquet_schema_as_string(parquet_schema: &schema::types::Type) -> String {
let mut parquet_schema_string = Vec::new(); let mut parquet_schema_string = Vec::new();
@ -42,9 +38,8 @@ where
{ {
let input_len = input.len(); let input_len = input.len();
let reader = SerializedFileReader::new(input).map_err(|e| Error::ParquetLibraryError { let reader = SerializedFileReader::new(input).context(crate::error::ParquetLibraryError {
message: String::from("Creating parquet reader"), message: "Creating parquet reader",
source: e,
})?; })?;
let parquet_metadata = reader.metadata(); let parquet_metadata = reader.metadata();

View File

@ -1,19 +1,18 @@
//! Provide storage statistics for parquet files //! Provide storage statistics for parquet files
use std::collections::BTreeMap;
use std::convert::TryInto;
use std::io::{Read, Seek};
use log::debug;
use parquet::basic::{Compression, Encoding};
use parquet::file::reader::{FileReader, SerializedFileReader};
use delorean_table::stats::{ColumnStats, ColumnStatsBuilder}; use delorean_table::stats::{ColumnStats, ColumnStatsBuilder};
use log::debug;
use crate::{ use parquet::{
error::{Error, Result}, basic::{Compression, Encoding},
metadata::data_type_from_parquet_type, file::reader::{FileReader, SerializedFileReader},
Length, TryClone,
}; };
use snafu::ResultExt;
use std::{
collections::BTreeMap,
convert::TryInto,
io::{Read, Seek},
};
use crate::{error::Result, metadata::data_type_from_parquet_type, Length, TryClone};
/// Calculate storage statistics for a particular parquet file that can /// Calculate storage statistics for a particular parquet file that can
/// be read from `input`, with a total size of `input_size` byes /// be read from `input`, with a total size of `input_size` byes
@ -23,9 +22,8 @@ pub fn col_stats<R: 'static>(input: R) -> Result<Vec<ColumnStats>>
where where
R: Read + Seek + TryClone + Length, R: Read + Seek + TryClone + Length,
{ {
let reader = SerializedFileReader::new(input).map_err(|e| Error::ParquetLibraryError { let reader = SerializedFileReader::new(input).context(crate::error::ParquetLibraryError {
message: String::from("Creating parquet reader"), message: "Creating parquet reader",
source: e,
})?; })?;
let mut stats_builders = BTreeMap::new(); let mut stats_builders = BTreeMap::new();

View File

@ -1,10 +1,4 @@
//! This module contains the code to write delorean table data to parquet //! This module contains the code to write delorean table data to parquet
use std::{
fmt,
io::{Seek, Write},
rc::Rc,
};
use log::debug; use log::debug;
use parquet::{ use parquet::{
basic::{Compression, Encoding, LogicalType, Repetition, Type as PhysicalType}, basic::{Compression, Encoding, LogicalType, Repetition, Type as PhysicalType},
@ -16,7 +10,12 @@ use parquet::{
}, },
schema::types::{ColumnPath, Type}, schema::types::{ColumnPath, Type},
}; };
use snafu::{ResultExt, Snafu}; use snafu::{OptionExt, ResultExt, Snafu};
use std::{
fmt,
io::{Seek, Write},
rc::Rc,
};
use crate::metadata::parquet_schema_as_string; use crate::metadata::parquet_schema_as_string;
use delorean_table::{DeloreanTableWriter, Error as TableError, Packers}; use delorean_table::{DeloreanTableWriter, Error as TableError, Packers};
@ -28,8 +27,8 @@ pub enum Error {
message: String, message: String,
source: ParquetError, source: ParquetError,
}, },
#[snafu(display(r#"{}"#, message))] #[snafu(display(r#"Could not get packer for column {}"#, column_number))]
MismatchedColumns { message: String }, MismatchedColumns { column_number: usize },
} }
impl From<Error> for TableError { impl From<Error> for TableError {
@ -142,16 +141,11 @@ where
message: String::from("Can't create the next row_group_writer"), message: String::from("Can't create the next row_group_writer"),
})? })?
{ {
let packer = match packers.get(column_number) { let packer = packers
Some(packer) => packer, .get(column_number)
None => { .context(MismatchedColumns { column_number })
return Err(TableError::Other { .map_err(TableError::from_other)?;
source: Box::new(Error::MismatchedColumns {
message: format!("Could not get packer for column {}", column_number),
}),
});
}
};
// TODO(edd) This seems super awkward and not the right way to do it... // TODO(edd) This seems super awkward and not the right way to do it...
// We know we have a direct mapping between a col_writer (ColumnWriter) // We know we have a direct mapping between a col_writer (ColumnWriter)
// type and a Packers variant. We also know that we do exactly the same // type and a Packers variant. We also know that we do exactly the same

View File

@ -44,6 +44,21 @@ pub enum Error {
}, },
} }
impl Error {
pub fn from_io(source: std::io::Error, message: impl Into<String>) -> Self {
Self::IO {
source,
message: message.into(),
}
}
pub fn from_other(source: impl std::error::Error + 'static) -> Self {
Self::Other {
source: Box::new(source),
}
}
}
/// Something that knows how to write a set of columns somewhere /// Something that knows how to write a set of columns somewhere
pub trait DeloreanTableWriter { pub trait DeloreanTableWriter {
/// Writes a batch of packed data to the underlying output /// Writes a batch of packed data to the underlying output

View File

@ -196,7 +196,7 @@ impl SchemaBuilder {
} }
} }
pub fn get_measurement_name(&self) -> &String { pub fn get_measurement_name(&self) -> &str {
&self.measurement_name &self.measurement_name
} }

View File

@ -1,16 +1,24 @@
use delorean_ingest::{ConversionSettings, LineProtocolConverter, TSMFileConverter}; use delorean_ingest::{
ConversionSettings, Error as IngestError, LineProtocolConverter, TSMFileConverter,
};
use delorean_line_parser::parse_lines; use delorean_line_parser::parse_lines;
use delorean_parquet::writer::DeloreanParquetTableWriter; use delorean_parquet::writer::DeloreanParquetTableWriter;
use delorean_parquet::writer::Error as ParquetWriterError;
use delorean_table::{DeloreanTableWriter, DeloreanTableWriterSource, Error as TableError}; use delorean_table::{DeloreanTableWriter, DeloreanTableWriterSource, Error as TableError};
use delorean_table_schema::Schema; use delorean_table_schema::Schema;
use log::{debug, info, warn}; use log::{debug, info, warn};
use std::convert::TryInto; use snafu::{ResultExt, Snafu};
use std::fs; use std::{
use std::io::Read; convert::TryInto,
use std::path::{Path, PathBuf}; fs,
io::Read,
path::{Path, PathBuf},
};
use crate::commands::error::{Error, Result};
use crate::commands::input::{FileType, InputReader}; use crate::commands::input::{FileType, InputReader};
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Creates `DeloreanParquetTableWriter` suitable for writing to a single file /// Creates `DeloreanParquetTableWriter` suitable for writing to a single file
#[derive(Debug)] #[derive(Debug)]
struct ParquetFileWriterSource { struct ParquetFileWriterSource {
@ -24,16 +32,18 @@ impl DeloreanTableWriterSource for ParquetFileWriterSource {
// Returns a `DeloreanTableWriter suitable for writing data from packers. // Returns a `DeloreanTableWriter suitable for writing data from packers.
fn next_writer(&mut self, schema: &Schema) -> Result<Box<dyn DeloreanTableWriter>, TableError> { fn next_writer(&mut self, schema: &Schema) -> Result<Box<dyn DeloreanTableWriter>, TableError> {
if self.made_file { if self.made_file {
return Err(TableError::Other { return MultipleMeasurementsToSingleFile {
source: Box::new(Error::MultipleMeasurementsToSingleFile { new_measurement_name: schema.measurement(),
new_measurement_name: String::from(schema.measurement()), }
}), .fail()
}); .map_err(TableError::from_other);
} }
let output_file = fs::File::create(&self.output_filename).map_err(|e| TableError::IO { let output_file = fs::File::create(&self.output_filename).map_err(|e| {
message: format!("Error creating output file {}", self.output_filename), TableError::from_io(
source: e, e,
format!("Error creating output file {}", self.output_filename),
)
})?; })?;
info!( info!(
"Writing output for measurement {} to {} ...", "Writing output for measurement {} to {} ...",
@ -41,11 +51,9 @@ impl DeloreanTableWriterSource for ParquetFileWriterSource {
self.output_filename self.output_filename
); );
let writer = DeloreanParquetTableWriter::new(schema, output_file).map_err(|e| { let writer = DeloreanParquetTableWriter::new(schema, output_file)
TableError::Other { .context(UnableToCreateParquetTableWriter)
source: Box::new(Error::UnableToCreateParquetTableWriter { source: e }), .map_err(TableError::from_other)?;
}
})?;
self.made_file = true; self.made_file = true;
Ok(Box::new(writer)) Ok(Box::new(writer))
} }
@ -68,9 +76,11 @@ impl DeloreanTableWriterSource for ParquetDirectoryWriterSource {
output_file_path.push(schema.measurement()); output_file_path.push(schema.measurement());
output_file_path.set_extension("parquet"); output_file_path.set_extension("parquet");
let output_file = fs::File::create(&output_file_path).map_err(|e| TableError::IO { let output_file = fs::File::create(&output_file_path).map_err(|e| {
message: format!("Error creating output file {:?}", output_file_path), TableError::from_io(
source: e, e,
format!("Error creating output file {:?}", output_file_path),
)
})?; })?;
info!( info!(
"Writing output for measurement {} to {:?} ...", "Writing output for measurement {} to {:?} ...",
@ -78,11 +88,9 @@ impl DeloreanTableWriterSource for ParquetDirectoryWriterSource {
output_file_path output_file_path
); );
let writer = DeloreanParquetTableWriter::new(schema, output_file).map_err(|e| { let writer = DeloreanParquetTableWriter::new(schema, output_file)
TableError::Other { .context(UnableToCreateParquetTableWriter)
source: Box::new(Error::UnableToCreateParquetTableWriter { source: e }), .map_err(TableError::from_other)?;
}
})?;
Ok(Box::new(writer)) Ok(Box::new(writer))
} }
} }
@ -97,7 +105,7 @@ pub fn convert(input_filename: &str, output_name: &str) -> Result<()> {
info!("convert starting"); info!("convert starting");
debug!("Reading from input file {}", input_filename); debug!("Reading from input file {}", input_filename);
let input_reader = InputReader::new(input_filename)?; let input_reader = InputReader::new(input_filename).context(OpenInput)?;
info!( info!(
"Preparing to convert {} bytes from {}", "Preparing to convert {} bytes from {}",
input_reader.len(), input_reader.len(),
@ -111,13 +119,14 @@ pub fn convert(input_filename: &str, output_name: &str) -> Result<()> {
FileType::TSM => { FileType::TSM => {
// TODO(edd): we can remove this when I figure out the best way to share // TODO(edd): we can remove this when I figure out the best way to share
// the reader between the TSM index reader and the Block decoder. // the reader between the TSM index reader and the Block decoder.
let input_block_reader = InputReader::new(input_filename)?; let input_block_reader = InputReader::new(input_filename).context(OpenInput)?;
let len = input_reader.len() as usize; let len = input_reader.len() as usize;
convert_tsm_to_parquet(input_reader, len, input_block_reader, output_name) convert_tsm_to_parquet(input_reader, len, input_block_reader, output_name)
} }
FileType::Parquet => Err(Error::NotImplemented { FileType::Parquet => NotImplemented {
operation_name: String::from("Parquet format conversion"), operation_name: "Parquet format conversion",
}), }
.fail(),
} }
} }
@ -132,13 +141,12 @@ fn convert_line_protocol_to_parquet(
input_reader input_reader
.len() .len()
.try_into() .try_into()
.expect("Can not allocate buffer"), .expect("Cannot allocate buffer"),
); );
input_reader input_reader
.read_to_string(&mut buf) .read_to_string(&mut buf)
.map_err(|e| Error::UnableToReadInput { .context(UnableToReadInput {
name: String::from(input_filename), name: input_filename,
source: e,
})?; })?;
// FIXME: Design something sensible to do with lines that don't // FIXME: Design something sensible to do with lines that don't
@ -169,10 +177,8 @@ fn convert_line_protocol_to_parquet(
let mut converter = LineProtocolConverter::new(settings, writer_source); let mut converter = LineProtocolConverter::new(settings, writer_source);
converter converter
.convert(only_good_lines) .convert(only_good_lines)
.map_err(|e| Error::UnableToWriteGoodLines { source: e })?; .context(UnableToWriteGoodLines)?;
converter converter.finalize().context(UnableToCloseTableWriter)?;
.finalize()
.map_err(|e| Error::UnableToCloseTableWriter { source: e })?;
info!("Completing writing to {} successfully", output_name); info!("Completing writing to {} successfully", output_name);
Ok(()) Ok(())
} }
@ -200,5 +206,35 @@ fn convert_tsm_to_parquet(
let mut converter = TSMFileConverter::new(writer_source); let mut converter = TSMFileConverter::new(writer_source);
converter converter
.convert(index_stream, index_stream_size, block_stream) .convert(index_stream, index_stream_size, block_stream)
.map_err(|e| Error::UnableToCloseTableWriter { source: e }) .context(UnableToCloseTableWriter)
}
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Error reading {} ({})", name.display(), source))]
UnableToReadInput {
name: PathBuf,
source: std::io::Error,
},
#[snafu(display(
"Cannot write multiple measurements to a single file. Saw new measurement named {}",
new_measurement_name
))]
MultipleMeasurementsToSingleFile { new_measurement_name: String },
#[snafu(display("Error creating a parquet table writer {}", source))]
UnableToCreateParquetTableWriter { source: ParquetWriterError },
#[snafu(display("Not implemented: {}", operation_name))]
NotImplemented { operation_name: String },
#[snafu(display("Error writing remaining lines {}", source))]
UnableToWriteGoodLines { source: IngestError },
#[snafu(display("Error opening input {}", source))]
OpenInput { source: super::input::Error },
#[snafu(display("Error while closing the table writer {}", source))]
UnableToCloseTableWriter { source: IngestError },
} }

View File

@ -20,7 +20,7 @@ pub enum Error {
}, },
#[snafu(display( #[snafu(display(
"Can not write multiple measurements to a single file. Saw new measurement named {}", "Cannot write multiple measurements to a single file. Saw new measurement named {}",
new_measurement_name new_measurement_name
))] ))]
MultipleMeasurementsToSingleFile { new_measurement_name: String }, MultipleMeasurementsToSingleFile { new_measurement_name: String },

View File

@ -1,45 +1,45 @@
use std::collections::{BTreeMap, BTreeSet}; use delorean_parquet::{error::Error as DeloreanParquetError, metadata::print_parquet_metadata};
use std::convert::TryInto; use delorean_tsm::{reader::IndexEntry, reader::TSMIndexReader, InfluxID, TSMError};
use delorean_tsm::reader::{IndexEntry, TSMIndexReader};
use delorean_tsm::{InfluxID, TSMError};
use delorean_parquet::metadata::print_parquet_metadata;
use log::{debug, info}; use log::{debug, info};
use snafu::{ResultExt, Snafu};
use std::{
collections::{BTreeMap, BTreeSet},
convert::TryInto,
};
use crate::commands::error::{Error, Result};
use crate::commands::input::{FileType, InputReader}; use crate::commands::input::{FileType, InputReader};
pub type Result<T, E = Error> = std::result::Result<T, E>;
pub fn dump_meta(input_filename: &str) -> Result<()> { pub fn dump_meta(input_filename: &str) -> Result<()> {
info!("meta starting"); info!("meta starting");
debug!("Reading from input file {}", input_filename); debug!("Reading from input file {}", input_filename);
let input_reader = InputReader::new(input_filename)?; let input_reader = InputReader::new(input_filename).context(OpenInput)?;
match input_reader.file_type() { match input_reader.file_type() {
FileType::LineProtocol => Err(Error::NotImplemented { FileType::LineProtocol => NotImplemented {
operation_name: String::from("Line protocol metadata dump"), operation_name: "Line protocol metadata dump",
}), }
.fail(),
FileType::TSM => { FileType::TSM => {
let len = input_reader let len = input_reader
.len() .len()
.try_into() .try_into()
.expect("File size more than usize"); .expect("File size more than usize");
let reader = let reader = TSMIndexReader::try_new(input_reader, len).context(TSM)?;
TSMIndexReader::try_new(input_reader, len).map_err(|e| Error::TSM { source: e })?;
let mut stats_builder = TSMMetadataBuilder::new(); let mut stats_builder = TSMMetadataBuilder::new();
for mut entry in reader { for entry in reader {
stats_builder.process_entry(&mut entry)?; let entry = entry.context(TSM)?;
stats_builder.process_entry(entry)?;
} }
stats_builder.print_report(); stats_builder.print_report();
Ok(()) Ok(())
} }
FileType::Parquet => { FileType::Parquet => {
print_parquet_metadata(input_reader) print_parquet_metadata(input_reader).context(UnableDumpToParquetMetadata)
.map_err(|e| Error::UnableDumpToParquetMetadata { source: e })?;
Ok(())
} }
} }
} }
@ -54,9 +54,7 @@ struct MeasurementMetadata {
impl MeasurementMetadata { impl MeasurementMetadata {
fn update_for_entry(&mut self, index_entry: &mut IndexEntry) -> Result<()> { fn update_for_entry(&mut self, index_entry: &mut IndexEntry) -> Result<()> {
let key = index_entry let key = index_entry.parse_key().context(TSM)?;
.parse_key()
.map_err(|e| Error::TSM { source: e })?;
for (tag_name, tag_value) in key.tagset { for (tag_name, tag_value) in key.tagset {
let tag_entry = self.tags.entry(tag_name).or_default(); let tag_entry = self.tags.entry(tag_name).or_default();
@ -93,9 +91,7 @@ impl BucketMetadata {
fn update_for_entry(&mut self, index_entry: &mut IndexEntry) -> Result<()> { fn update_for_entry(&mut self, index_entry: &mut IndexEntry) -> Result<()> {
self.count += 1; self.count += 1;
self.total_records += u64::from(index_entry.count); self.total_records += u64::from(index_entry.count);
let key = index_entry let key = index_entry.parse_key().context(TSM)?;
.parse_key()
.map_err(|e| Error::TSM { source: e })?;
let meta = self.measurements.entry(key.measurement).or_default(); let meta = self.measurements.entry(key.measurement).or_default();
meta.update_for_entry(index_entry)?; meta.update_for_entry(index_entry)?;
@ -124,18 +120,13 @@ impl TSMMetadataBuilder {
Self::default() Self::default()
} }
fn process_entry(&mut self, entry: &mut Result<IndexEntry, TSMError>) -> Result<()> { fn process_entry(&mut self, mut index_entry: IndexEntry) -> Result<()> {
match entry {
Ok(index_entry) => {
self.num_entries += 1; self.num_entries += 1;
let key = (index_entry.org_id(), index_entry.bucket_id()); let key = (index_entry.org_id(), index_entry.bucket_id());
let stats = self.bucket_stats.entry(key).or_default(); let stats = self.bucket_stats.entry(key).or_default();
stats.update_for_entry(index_entry)?; stats.update_for_entry(&mut index_entry)?;
Ok(()) Ok(())
} }
Err(e) => Err(Error::TSM { source: e.clone() }),
}
}
fn print_report(&self) { fn print_report(&self) {
println!("TSM Metadata Report:"); println!("TSM Metadata Report:");
@ -152,3 +143,18 @@ impl TSMMetadataBuilder {
} }
} }
} }
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Error opening input {}", source))]
OpenInput { source: super::input::Error },
#[snafu(display("Not implemented: {}", operation_name))]
NotImplemented { operation_name: String },
#[snafu(display("Unable to dump parquet file metadata: {}", source))]
UnableDumpToParquetMetadata { source: DeloreanParquetError },
#[snafu(display(r#"Error reading TSM data: {}"#, source))]
TSM { source: TSMError },
}

View File

@ -1,12 +1,15 @@
use delorean_parquet::ParquetError;
/// Module to handle input files (and maybe urls?) /// Module to handle input files (and maybe urls?)
use libflate::gzip; use libflate::gzip;
use std::fs::File; use snafu::{OptionExt, ResultExt, Snafu};
use std::io; use std::{
use std::io::{BufRead, BufReader, Cursor, Read, Seek, SeekFrom}; fs::File,
use std::path::Path; io,
io::{BufRead, BufReader, Cursor, Read, Seek, SeekFrom},
path::{Path, PathBuf},
};
use crate::commands::error::{Error, Result}; pub type Result<T, E = Error> = std::result::Result<T, E>;
use delorean_parquet::ParquetError;
#[derive(Debug)] #[derive(Debug)]
pub enum FileType { pub enum FileType {
@ -40,17 +43,11 @@ pub struct MemoryInputReader {
impl FileInputReader { impl FileInputReader {
fn new(file_type: FileType, input_name: &str) -> Result<Self> { fn new(file_type: FileType, input_name: &str) -> Result<Self> {
let file = File::open(input_name).map_err(|e| Error::UnableToReadInput { let file = File::open(input_name).context(UnableToReadInput { input_name })?;
name: String::from(input_name),
source: e,
})?;
let file_size = file let file_size = file
.metadata() .metadata()
.map_err(|e| Error::UnableToReadInput { .context(UnableToReadInput { input_name })?
name: String::from(input_name),
source: e,
})?
.len(); .len();
Ok(Self { Ok(Self {
@ -146,14 +143,12 @@ impl InputReader {
// inspect contents. // inspect contents.
let ext = path let ext = path
.extension() .extension()
.ok_or(Error::UnknownInputType { .context(UnknownInputType {
details: String::from("No extension"), details: String::from("No extension"),
input_name: path.display().to_string(), input_name: path.display().to_string(),
})? })?
.to_str() .to_str()
.ok_or(Error::FileNameDecode { .context(FileNameDecode { input_name: path })?;
input_name: path.display().to_string(),
})?;
match ext { match ext {
"tsm" => Ok(Self::FileInputType(FileInputReader::new( "tsm" => Ok(Self::FileInputType(FileInputReader::new(
@ -173,31 +168,20 @@ impl InputReader {
let stem_ext = stem let stem_ext = stem
.extension() .extension()
.ok_or(Error::UnknownInputType { .context(UnknownInputType {
details: String::from("No extension before .gz"), details: String::from("No extension before .gz"),
input_name: path.display().to_string(), input_name: path,
})? })?
.to_str() .to_str()
.ok_or(Error::FileNameDecode { .context(FileNameDecode { input_name: path })?;
input_name: path.display().to_string(),
})?;
let file = File::open(input_name).map_err(|e| Error::UnableToReadInput { let file = File::open(input_name).context(UnableToReadInput { input_name })?;
name: input_name.to_string(),
source: e,
})?;
let mut decoder = let mut decoder =
gzip::Decoder::new(file).map_err(|gzip_err| Error::UnableToReadInput { gzip::Decoder::new(file).context(UnableToReadInput { input_name })?;
name: input_name.to_string(),
source: gzip_err,
})?;
let mut buffer = Vec::new(); let mut buffer = Vec::new();
decoder decoder
.read_to_end(&mut buffer) .read_to_end(&mut buffer)
.map_err(|e| Error::ReadingGzip { .context(ReadingGzip { input_name })?;
input_name: input_name.to_string(),
source: e,
})?;
match stem_ext { match stem_ext {
"tsm" => Ok(Self::MemoryInputType(MemoryInputReader::new( "tsm" => Ok(Self::MemoryInputType(MemoryInputReader::new(
@ -212,16 +196,42 @@ impl InputReader {
FileType::Parquet, FileType::Parquet,
buffer, buffer,
))), ))),
_ => Err(Error::UnknownInputType { _ => UnknownInputType {
details: String::from("Unknown input extension before .gz"), details: "Unknown input extension before .gz",
input_name: input_name.to_string(), input_name,
}), }
.fail(),
} }
} }
_ => Err(Error::UnknownInputType { _ => UnknownInputType {
details: String::from("Unknown input extension"), details: "Unknown input extension",
input_name: input_name.to_string(), input_name,
}), }
.fail(),
} }
} }
} }
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Error reading {} ({})", input_name.display(), source))]
UnableToReadInput {
input_name: PathBuf,
source: std::io::Error,
},
#[snafu(display("Unknown input type: {} for {}", details, input_name.display()))]
UnknownInputType {
details: String,
input_name: PathBuf,
},
#[snafu(display("Can't convert filename to utf-8, : {}", input_name.display()))]
FileNameDecode { input_name: PathBuf },
#[snafu(display("Can't read gzip data : {}", input_name.display()))]
ReadingGzip {
input_name: PathBuf,
source: std::io::Error,
},
}

View File

@ -1,37 +1,37 @@
//! This module contains code to report compression statistics for storage files //! This module contains code to report compression statistics for storage files
use delorean_parquet::{error::Error as DeloreanParquetError, stats::col_stats};
use log::info; use log::info;
use snafu::{ResultExt, Snafu};
use crate::{ use crate::commands::input::{FileType, InputReader};
commands::error::{Error, Result},
commands::input::{FileType, InputReader},
};
use delorean_parquet::stats::col_stats; pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Print statistics about the file name in input_filename to stdout /// Print statistics about the file name in input_filename to stdout
pub fn stats(input_filename: &str) -> Result<()> { pub fn stats(input_filename: &str) -> Result<()> {
info!("stats starting"); info!("stats starting");
let input_reader = InputReader::new(input_filename)?; let input_reader = InputReader::new(input_filename).context(OpenInput)?;
let (input_len, col_stats) = match input_reader.file_type() { let (input_len, col_stats) = match input_reader.file_type() {
FileType::LineProtocol => { FileType::LineProtocol => {
return Err(Error::NotImplemented { return NotImplemented {
operation_name: String::from("Line protocol storage statistics"), operation_name: "Line protocol storage statistics",
}); }
.fail()
} }
FileType::TSM => { FileType::TSM => {
return Err(Error::NotImplemented { return NotImplemented {
operation_name: String::from("TSM storage statistics"), operation_name: "TSM storage statistics",
}) }
.fail()
} }
FileType::Parquet => { FileType::Parquet => {
let input_len = input_reader.len(); let input_len = input_reader.len();
( (
input_len, input_len,
col_stats(input_reader) col_stats(input_reader).context(UnableDumpToParquetMetadata)?,
.map_err(|e| Error::UnableDumpToParquetMetadata { source: e })?,
) )
} }
}; };
@ -76,3 +76,15 @@ pub fn stats(input_filename: &str) -> Result<()> {
Ok(()) Ok(())
} }
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Not implemented: {}", operation_name))]
NotImplemented { operation_name: String },
#[snafu(display("Error opening input {}", source))]
OpenInput { source: super::input::Error },
#[snafu(display("Unable to dump parquet file metadata: {}", source))]
UnableDumpToParquetMetadata { source: DeloreanParquetError },
}

View File

@ -11,7 +11,6 @@ use log::{debug, error, warn};
mod commands { mod commands {
pub mod convert; pub mod convert;
mod error;
pub mod file_meta; pub mod file_meta;
mod input; mod input;
pub mod stats; pub mod stats;

View File

@ -195,7 +195,7 @@ fn meta_bad_input_filename() {
.code(2) .code(2)
.stderr(predicate::str::contains("Metadata dump failed")) .stderr(predicate::str::contains("Metadata dump failed"))
.stderr(predicate::str::contains( .stderr(predicate::str::contains(
"Metadata dump failed: Unknown input type: No extension for non_existent_input", "Metadata dump failed: Error opening input Unknown input type: No extension for non_existent_input",
)); ));
} }
@ -209,7 +209,7 @@ fn meta_non_existent_input_filename() {
.code(2) .code(2)
.stderr(predicate::str::contains("Metadata dump failed")) .stderr(predicate::str::contains("Metadata dump failed"))
.stderr(predicate::str::contains( .stderr(predicate::str::contains(
"Metadata dump failed: Error reading non_existent_input.tsm", "Metadata dump failed: Error opening input Error reading non_existent_input.tsm",
)); ));
} }
@ -223,7 +223,7 @@ fn meta_bad_input_filename_gz() {
.code(2) .code(2)
.stderr(predicate::str::contains("Metadata dump failed")) .stderr(predicate::str::contains("Metadata dump failed"))
.stderr(predicate::str::contains( .stderr(predicate::str::contains(
"Metadata dump failed: Unknown input type: No extension before .gz for non_existent_input.gz", "Metadata dump failed: Error opening input Unknown input type: No extension before .gz for non_existent_input.gz",
)); ));
} }