diff --git a/Cargo.lock b/Cargo.lock
index 0569b8d931..8234c5f66e 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -648,11 +648,13 @@ version = "0.1.0"
 dependencies = [
  "assert_cmd",
  "clap",
+ "delorean",
  "delorean_ingest",
  "delorean_line_parser",
  "delorean_parquet",
  "delorean_test_helpers",
  "env_logger",
+ "libflate",
  "log 0.4.8",
  "predicates",
  "snafu",
diff --git a/delorean_storage_tool/Cargo.toml b/delorean_storage_tool/Cargo.toml
index 47e5053069..af1804875a 100644
--- a/delorean_storage_tool/Cargo.toml
+++ b/delorean_storage_tool/Cargo.toml
@@ -10,6 +10,8 @@ edition = "2018"
 delorean_line_parser = { path = "../delorean_line_parser" }
 delorean_ingest = { path = "../delorean_ingest" }
 delorean_parquet = { path = "../delorean_parquet" }
+delorean = { path = ".." }
+libflate = "1.0.0"
 clap = "2.33.1"
 env_logger = "0.7.1"
diff --git a/delorean_storage_tool/src/error.rs b/delorean_storage_tool/src/error.rs
new file mode 100644
index 0000000000..72a5930058
--- /dev/null
+++ b/delorean_storage_tool/src/error.rs
@@ -0,0 +1,58 @@
+use snafu::Snafu;
+
+use delorean::storage::StorageError;
+use delorean_parquet::writer::Error as DeloreanTableWriterError;
+
+#[derive(Debug, Snafu)]
+pub enum Error {
+    #[snafu(display("Error reading {} ({})", name, source))]
+    UnableToReadInput {
+        name: String,
+        source: std::io::Error,
+    },
+
+    #[snafu(display("Unable to create output file {} ({})", name, source))]
+    UnableToCreateFile {
+        name: String,
+        source: std::io::Error,
+    },
+
+    #[snafu(display("Not implemented: {}", operation_name))]
+    NotImplemented { operation_name: String },
+
+    #[snafu(display("Unknown input type: {} for {}", details, input_name))]
+    UnknownInputType { details: String, input_name: String },
+
+    #[snafu(display("Can't convert filename to utf-8: {}", input_name))]
+    FileNameDecode { input_name: String },
+
+    #[snafu(display("Can't read gzip data: {}", input_name))]
+    ReadingGzip {
+        input_name: String,
+        source: std::io::Error,
+    },
+
+    #[snafu(context(false))]
+    #[snafu(display("Error converting data {}", source))]
+    Conversion { source: delorean_ingest::Error },
+
+    #[snafu(display("Error creating a table writer {}", source))]
+    UnableToCreateTableWriter { source: DeloreanTableWriterError },
+
+    #[snafu(display("Error writing the sample schema {}", source))]
+    UnableToWriteSchemaSample { source: DeloreanTableWriterError },
+
+    #[snafu(display("Error writing remaining lines {}", source))]
+    UnableToWriteGoodLines { source: DeloreanTableWriterError },
+
+    #[snafu(display("Error while closing the table writer {}", source))]
+    UnableToCloseTableWriter { source: DeloreanTableWriterError },
+
+    #[snafu(display(r#"Error reading TSM data: {}"#, source))]
+    TSM { source: StorageError },
+
+    #[snafu(display(r#"Error parsing data: {}"#, source))]
+    Parsing { source: delorean_line_parser::Error },
+}
+
+pub type Result<T, E = Error> = std::result::Result<T, E>;
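Note that `Conversion` is the only variant carrying `#[snafu(context(false))]`, which makes snafu generate a `From` impl so `delorean_ingest` errors propagate with a bare `?`; every other variant is constructed explicitly at the call site via `map_err`. A minimal, self-contained sketch of what that attribute buys (the error names here are toys, not this crate's API):

```rust
use snafu::Snafu;

#[derive(Debug, Snafu)]
enum IngestError {
    #[snafu(display("bad line"))]
    BadLine,
}

#[derive(Debug, Snafu)]
enum Error {
    // context(false) generates `impl From<IngestError> for Error`,
    // so callers can use a bare `?` with no `.context(...)` selector.
    #[snafu(context(false))]
    #[snafu(display("Error converting data {}", source))]
    Conversion { source: IngestError },
}

fn ingest() -> Result<(), IngestError> {
    Err(IngestError::BadLine)
}

fn convert() -> Result<(), Error> {
    ingest()?; // converted automatically via the generated From impl
    Ok(())
}

fn main() {
    println!("{}", convert().unwrap_err());
}
```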
diff --git a/delorean_storage_tool/src/file_meta.rs b/delorean_storage_tool/src/file_meta.rs
new file mode 100644
index 0000000000..c1fd60301c
--- /dev/null
+++ b/delorean_storage_tool/src/file_meta.rs
@@ -0,0 +1,146 @@
+#![deny(rust_2018_idioms)]
+#![warn(missing_debug_implementations, clippy::explicit_iter_loop)]
+
+use std::collections::{BTreeMap, BTreeSet};
+
+use delorean::storage::tsm::{IndexEntry, InfluxID, TSMReader};
+use delorean::storage::StorageError;
+
+use log::{debug, info};
+
+use crate::error::{Error, Result};
+use crate::input::{FileType, InputReader};
+
+pub fn dump_meta(input_filename: &str) -> Result<()> {
+    info!("dstool meta starting");
+    debug!("Reading from input file {}", input_filename);
+
+    let input_reader = InputReader::new(input_filename)?;
+
+    match input_reader.file_type() {
+        FileType::LineProtocol => Err(Error::NotImplemented {
+            operation_name: String::from("Line protocol metadata dump"),
+        }),
+        FileType::TSM => {
+            let len = input_reader.len();
+            let mut reader = TSMReader::new(input_reader, len);
+            let index = reader.index().map_err(|e| Error::TSM { source: e })?;
+
+            let mut stats_builder = TSMMetadataBuilder::new();
+
+            for mut entry in index {
+                stats_builder.process_entry(&mut entry)?;
+            }
+            stats_builder.print_report();
+            Ok(())
+        }
+    }
+}
+
+#[derive(Debug, Default)]
+struct MeasurementMetadata {
+    /// tag name --> list of seen tag values
+    tags: BTreeMap<String, BTreeSet<String>>,
+    /// List of field names seen
+    fields: BTreeSet<String>,
+}
+
+impl MeasurementMetadata {
+    fn update_for_entry(&mut self, index_entry: &mut IndexEntry) -> Result<()> {
+        let tagset = index_entry.tagset().map_err(|e| Error::TSM { source: e })?;
+        for (tag_name, tag_value) in tagset {
+            let tag_entry = self.tags.entry(tag_name).or_default();
+            tag_entry.insert(tag_value);
+        }
+        let field_name = index_entry
+            .field_key()
+            .map_err(|e| Error::TSM { source: e })?;
+        self.fields.insert(field_name);
+        Ok(())
+    }
+
+    fn print_report(&self, prefix: &str) {
+        for (tag_name, tag_values) in &self.tags {
+            println!("{} tag {} = {:?}", prefix, tag_name, tag_values);
+        }
+        for field_name in &self.fields {
+            println!("{} field {}", prefix, field_name);
+        }
+    }
+}
+
+/// Represents stats for a single bucket
+#[derive(Debug, Default)]
+struct BucketMetadata {
+    /// How many index entries have been seen
+    count: u64,
+
+    // Total 'records' (aka sum of the lengths of all timeseries)
+    total_records: u64,
+
+    // measurement name --> measurement metadata
+    measurements: BTreeMap<String, MeasurementMetadata>,
+}
+
+impl BucketMetadata {
+    fn update_for_entry(&mut self, index_entry: &mut IndexEntry) -> Result<()> {
+        self.count += 1;
+        self.total_records += u64::from(index_entry.count);
+        let measurement = index_entry
+            .measurement()
+            .map_err(|e| Error::TSM { source: e })?;
+        let meta = self.measurements.entry(measurement).or_default();
+        meta.update_for_entry(index_entry)?;
+        Ok(())
+    }
+
+    fn print_report(&self, prefix: &str) {
+        for (measurement, meta) in &self.measurements {
+            println!("{}{}", prefix, measurement);
+            let indent = format!("{}  ", prefix);
+            meta.print_report(&indent);
+        }
+    }
+}
+
+#[derive(Debug, Default)]
+struct TSMMetadataBuilder {
+    num_entries: u32,
+
+    // (org_id, bucket_id) --> Bucket Metadata
+    bucket_stats: BTreeMap<(InfluxID, InfluxID), BucketMetadata>,
+}
+
+impl TSMMetadataBuilder {
+    fn new() -> TSMMetadataBuilder {
+        Self::default()
+    }
+
+    fn process_entry(&mut self, entry: &mut Result<IndexEntry, StorageError>) -> Result<()> {
+        match entry {
+            Ok(index_entry) => {
+                self.num_entries += 1;
+                let key = (index_entry.org_id(), index_entry.bucket_id());
+                let stats = self.bucket_stats.entry(key).or_default();
+                stats.update_for_entry(index_entry)?;
+                Ok(())
+            }
+            Err(e) => Err(Error::TSM { source: e.clone() }),
+        }
+    }
+
+    fn print_report(&self) {
+        println!("TSM Metadata Report:");
+        println!("  Valid Index Entries: {}", self.num_entries);
+        println!("  Organizations/Bucket Stats:");
+        for (k, stats) in &self.bucket_stats {
+            let (org_id, bucket_id) = k;
+            println!(
+                "    ({}, {}) {} index entries, {} total records",
+                org_id, bucket_id, stats.count, stats.total_records
+            );
+            println!("    Measurements:");
+            stats.print_report("      ");
+        }
+    }
+}
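The whole report builder leans on one accumulation pattern: `entry(..).or_default()` lazily creates nested map/set nodes, so a single pass over the index populates the org/bucket → measurement → tag hierarchy, and the BTree containers keep the printed report deterministically sorted. A standalone sketch of that pattern with toy data (not the real `IndexEntry` API):

```rust
use std::collections::{BTreeMap, BTreeSet};

fn main() {
    // tag name --> set of seen tag values, as in MeasurementMetadata
    let mut tags: BTreeMap<String, BTreeSet<String>> = BTreeMap::new();

    let observed = vec![("host", "a"), ("host", "b"), ("region", "us")];
    for (tag_name, tag_value) in observed {
        // or_default() creates the inner set the first time a tag name is seen
        tags.entry(tag_name.to_string())
            .or_default()
            .insert(tag_value.to_string());
    }

    // BTreeMap/BTreeSet iterate in sorted order, so the report is
    // stable from run to run.
    for (name, values) in &tags {
        println!(" tag {} = {:?}", name, values);
    }
}
```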
diff --git a/delorean_storage_tool/src/input.rs b/delorean_storage_tool/src/input.rs
new file mode 100644
index 0000000000..e7e0dd6bcf
--- /dev/null
+++ b/delorean_storage_tool/src/input.rs
@@ -0,0 +1,206 @@
+/// Module to handle input files (and maybe urls?) from dstool
+use libflate::gzip;
+use std::fs::File;
+use std::io;
+use std::io::{BufRead, BufReader, Cursor, Read, Seek, SeekFrom};
+use std::path::Path;
+
+use crate::error::{Error, Result};
+
+pub enum FileType {
+    LineProtocol,
+    TSM,
+}
+
+// Interface for interacting with streams
+pub enum InputReader {
+    FileInputType(FileInputReader),
+    MemoryInputType(MemoryInputReader),
+}
+
+// Contains a (file backed) reader to read raw uncompressed bytes
+pub struct FileInputReader {
+    file_type: FileType,
+    file_size: usize,
+    reader: BufReader<File>,
+}
+
+// Contains an in-memory reader...
+pub struct MemoryInputReader {
+    file_type: FileType,
+    file_size: usize,
+    cursor: Cursor<Vec<u8>>,
+}
+
+impl FileInputReader {
+    fn new(file_type: FileType, input_name: &str) -> Result<Self> {
+        let file = File::open(input_name).map_err(|e| Error::UnableToReadInput {
+            name: String::from(input_name),
+            source: e,
+        })?;
+
+        let file_size = file
+            .metadata()
+            .map_err(|e| Error::UnableToReadInput {
+                name: String::from(input_name),
+                source: e,
+            })?
+            .len();
+
+        Ok(FileInputReader {
+            file_type,
+            file_size: file_size as usize,
+            reader: BufReader::new(file),
+        })
+    }
+}
+
+impl MemoryInputReader {
+    fn new(file_type: FileType, buffer: Vec<u8>) -> MemoryInputReader {
+        let len = buffer.len();
+        MemoryInputReader {
+            file_type,
+            file_size: len,
+            cursor: Cursor::new(buffer),
+        }
+    }
+}
+
+impl Seek for InputReader {
+    fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
+        match self {
+            InputReader::FileInputType(file_input_reader) => file_input_reader.reader.seek(pos),
+            InputReader::MemoryInputType(memory_input_reader) => {
+                memory_input_reader.cursor.seek(pos)
+            }
+        }
+    }
+}
+
+impl Read for InputReader {
+    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+        match self {
+            InputReader::FileInputType(file_input_reader) => file_input_reader.reader.read(buf),
+            InputReader::MemoryInputType(memory_input_reader) => {
+                memory_input_reader.cursor.read(buf)
+            }
+        }
+    }
+}
+
+impl BufRead for InputReader {
+    fn fill_buf(&mut self) -> io::Result<&[u8]> {
+        match self {
+            InputReader::FileInputType(file_input_reader) => file_input_reader.reader.fill_buf(),
+            InputReader::MemoryInputType(memory_input_reader) => {
+                memory_input_reader.cursor.fill_buf()
+            }
+        }
+    }
+
+    fn consume(&mut self, amt: usize) {
+        match self {
+            InputReader::FileInputType(file_input_reader) => file_input_reader.reader.consume(amt),
+            InputReader::MemoryInputType(memory_input_reader) => {
+                memory_input_reader.cursor.consume(amt)
+            }
+        }
+    }
+}
+
+impl InputReader {
+    pub fn file_type(&self) -> &FileType {
+        match self {
+            InputReader::FileInputType(file_input_reader) => &file_input_reader.file_type,
+            InputReader::MemoryInputType(memory_input_reader) => &memory_input_reader.file_type,
+        }
+    }
+
+    pub fn len(&self) -> usize {
+        match self {
+            InputReader::FileInputType(file_input_reader) => file_input_reader.file_size,
+            InputReader::MemoryInputType(memory_input_reader) => memory_input_reader.file_size,
+        }
+    }
+
+    // Creates a new input reader suitable for reading from
+    // `input_name` and figures out the file input type based on
+    // heuristics (ahem, the filename extension)
+    pub fn new(input_name: &str) -> Result<Self> {
+        let path = Path::new(input_name);
+
+        // Initially simply use the file name's extension to determine
+        // the type; maybe in the future we can be more clever and
+        // inspect contents.
+        let ext = path
+            .extension()
+            .ok_or(Error::UnknownInputType {
+                details: String::from("No extension"),
+                input_name: path.display().to_string(),
+            })?
+            .to_str()
+            .ok_or(Error::FileNameDecode {
+                input_name: path.display().to_string(),
+            })?;
+
+        match ext {
+            "tsm" => Ok(InputReader::FileInputType(FileInputReader::new(
+                FileType::TSM,
+                input_name,
+            )?)),
+            "lp" => Ok(InputReader::FileInputType(FileInputReader::new(
+                FileType::LineProtocol,
+                input_name,
+            )?)),
+            "gz" => {
+                let stem = Path::new(path.file_stem().unwrap());
+
+                let stem_ext = stem
+                    .extension()
+                    .ok_or(Error::UnknownInputType {
+                        details: String::from("No extension before .gz"),
+                        input_name: path.display().to_string(),
+                    })?
+                    .to_str()
+                    .ok_or(Error::FileNameDecode {
+                        input_name: path.display().to_string(),
+                    })?;
+
+                let file = File::open(input_name).map_err(|e| Error::UnableToReadInput {
+                    name: input_name.to_string(),
+                    source: e,
+                })?;
+                let mut decoder =
+                    gzip::Decoder::new(file).map_err(|gzip_err| Error::UnableToReadInput {
+                        name: input_name.to_string(),
+                        source: gzip_err,
+                    })?;
+                let mut buffer = Vec::new();
+                decoder
+                    .read_to_end(&mut buffer)
+                    .map_err(|e| Error::ReadingGzip {
+                        input_name: input_name.to_string(),
+                        source: e,
+                    })?;
+
+                match stem_ext {
+                    "tsm" => Ok(InputReader::MemoryInputType(MemoryInputReader::new(
+                        FileType::TSM,
+                        buffer,
+                    ))),
+                    "lp" => Ok(InputReader::MemoryInputType(MemoryInputReader::new(
+                        FileType::LineProtocol,
+                        buffer,
+                    ))),
+                    _ => Err(Error::UnknownInputType {
+                        details: String::from("Unknown input extension before .gz"),
+                        input_name: input_name.to_string(),
+                    }),
+                }
+            }
+            _ => Err(Error::UnknownInputType {
+                details: String::from("Unknown input extension"),
+                input_name: input_name.to_string(),
+            }),
+        }
+    }
+}
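Note that `.gz` inputs are decompressed fully into memory up front rather than streamed: `gzip::Decoder` only offers `Read`, while the decompressed buffer gets wrapped in a `Cursor`, which also supports the `Seek` that `InputReader` must implement (the TSM index lives at the end of the file, so the reader presumably needs to seek). A minimal sketch of that decompress-up-front step, assuming libflate 1.0 and an illustrative path:

```rust
use libflate::gzip;
use std::fs::File;
use std::io::Read;

fn read_gz_to_memory(path: &str) -> std::io::Result<Vec<u8>> {
    let file = File::open(path)?;
    // Decoder::new reads and validates the gzip header, hence the Result
    let mut decoder = gzip::Decoder::new(file)?;
    let mut buffer = Vec::new();
    decoder.read_to_end(&mut buffer)?;
    Ok(buffer)
}

fn main() -> std::io::Result<()> {
    let bytes = read_gz_to_memory("example.tsm.gz")?;
    println!("decompressed {} bytes", bytes.len());
    Ok(())
}
```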
diff --git a/delorean_storage_tool/src/main.rs b/delorean_storage_tool/src/main.rs
index a5d88166c6..3262c6328e 100644
--- a/delorean_storage_tool/src/main.rs
+++ b/delorean_storage_tool/src/main.rs
@@ -1,67 +1,41 @@
 #![deny(rust_2018_idioms)]
 #![warn(missing_debug_implementations, clippy::explicit_iter_loop)]
 
-use clap::{crate_authors, crate_version, App, Arg, SubCommand};
-use delorean_ingest::LineProtocolConverter;
-use delorean_line_parser::{parse_lines, ParsedLine};
-use delorean_parquet::writer::{DeloreanTableWriter, Error as DeloreanTableWriterError};
 use log::{debug, info, warn};
-use snafu::{ResultExt, Snafu};
 use std::fs;
 
-#[derive(Debug, Snafu)]
-pub enum Error {
-    #[snafu(display("Error reading {} ({})", name.display(), source))]
-    UnableToReadInput {
-        name: std::path::PathBuf,
-        source: std::io::Error,
-    },
+use clap::{crate_authors, crate_version, App, Arg, SubCommand};
 
-    UnableToCreateFile {
-        name: std::path::PathBuf,
-        source: std::io::Error,
-    },
+use delorean_ingest::LineProtocolConverter;
+use delorean_line_parser::{parse_lines, ParsedLine};
+use delorean_parquet::writer::DeloreanTableWriter;
 
-    #[snafu(context(false))]
-    Conversion {
-        source: delorean_ingest::Error,
-    },
+mod error;
+mod file_meta;
+mod input;
 
-    UnableToCreateTableWriter {
-        source: DeloreanTableWriterError,
-    },
+use error::{Error, Result};
 
-    UnableToWriteSchemaSample {
-        source: DeloreanTableWriterError,
-    },
-
-    UnableToWriteGoodLines {
-        source: DeloreanTableWriterError,
-    },
-
-    UnableToCloseTableWriter {
-        source: DeloreanTableWriterError,
-    },
-}
-
-type Result<T, E = Error> = std::result::Result<T, E>;
+use file_meta::dump_meta;
 
 enum ReturnCode {
     InternalError = 1,
     ConversionFailed = 2,
+    MetadataDumpFailed = 3,
 }
 
 static SCHEMA_SAMPLE_SIZE: usize = 5;
 
 fn convert(input_filename: &str, output_filename: &str) -> Result<()> {
-    info!("dstool starting");
+    info!("dstool convert starting");
     debug!("Reading from input file {}", input_filename);
     debug!("Writing to output file {}", output_filename);
 
     // TODO: make a streaming parser that you can stream data through in blocks.
     // for now, just read the whole input file into RAM...
-    let buf = fs::read_to_string(input_filename).context(UnableToReadInput {
-        name: input_filename,
+    let buf = fs::read_to_string(input_filename).map_err(|e| Error::UnableToReadInput {
+        name: String::from(input_filename),
+        source: e,
     })?;
 
     info!("Read {} bytes from {}", buf.len(), input_filename);
@@ -83,21 +57,24 @@ fn convert(input_filename: &str, output_filename: &str) -> Result<()> {
     debug!("Using schema deduced from sample: {:?}", converter.schema());
 
     info!("Schema deduced. Writing output to {} ...", output_filename);
-    let output_file = fs::File::create(output_filename).context(UnableToCreateFile {
-        name: output_filename,
+    let output_file = fs::File::create(output_filename).map_err(|e| Error::UnableToCreateFile {
+        name: String::from(output_filename),
+        source: e,
     })?;
 
     let mut writer = DeloreanTableWriter::new(converter.schema(), output_file)
-        .context(UnableToCreateTableWriter)?;
+        .map_err(|e| Error::UnableToCreateTableWriter { source: e })?;
 
     // Write the sample and then the remaining lines
     writer
         .write_batch(&converter.pack_lines(schema_sample.into_iter()))
-        .context(UnableToWriteSchemaSample)?;
+        .map_err(|e| Error::UnableToWriteSchemaSample { source: e })?;
     writer
         .write_batch(&converter.pack_lines(only_good_lines))
-        .context(UnableToWriteGoodLines)?;
-    writer.close().context(UnableToCloseTableWriter)?;
+        .map_err(|e| Error::UnableToWriteGoodLines { source: e })?;
+    writer
+        .close()
+        .map_err(|e| Error::UnableToCloseTableWriter { source: e })?;
 
     info!("Completing writing {} successfully", output_filename);
     Ok(())
 }
@@ -132,6 +109,16 @@ Examples:
                         .index(2),
                 ),
         )
+        .subcommand(
+            SubCommand::with_name("meta")
+                .about("Print out metadata information about a storage file")
+                .arg(
+                    Arg::with_name("INPUT")
+                        .help("The input filename to read from")
+                        .required(true)
+                        .index(1),
+                ),
+        )
         .arg(
             Arg::with_name("verbose")
                 .short("v")
@@ -160,6 +147,15 @@ Examples:
                 std::process::exit(ReturnCode::ConversionFailed as _)
             }
        }
+    } else if let Some(matches) = matches.subcommand_matches("meta") {
+        let input_filename = matches.value_of("INPUT").unwrap();
+        match dump_meta(&input_filename) {
+            Ok(()) => debug!("Metadata dump completed successfully"),
+            Err(e) => {
+                eprintln!("Metadata dump failed: {}", e);
+                std::process::exit(ReturnCode::MetadataDumpFailed as _)
+            }
+        }
     } else {
         eprintln!("Internal error: no command found");
         std::process::exit(ReturnCode::InternalError as _);
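The exit paths rely on `ReturnCode` being a C-like enum whose variants cast to the integers the integration tests assert on (`.code(3)` for a failed metadata dump). A toy version of the pattern:

```rust
use std::process::exit;

#[allow(dead_code)] // only one variant is exercised in this sketch
enum ReturnCode {
    InternalError = 1,
    ConversionFailed = 2,
    MetadataDumpFailed = 3,
}

fn main() {
    // A C-like enum variant casts to its discriminant, and `as _` lets
    // the compiler infer the i32 that std::process::exit expects.
    eprintln!("Metadata dump failed");
    exit(ReturnCode::MetadataDumpFailed as _);
}
```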
diff --git a/delorean_storage_tool/tests/dstool.rs b/delorean_storage_tool/tests/dstool.rs
index d7ededbd40..f65b1b14d9 100644
--- a/delorean_storage_tool/tests/dstool.rs
+++ b/delorean_storage_tool/tests/dstool.rs
@@ -1,7 +1,10 @@
 mod dstool_tests {
-    use assert_cmd::Command;
-    use predicates::prelude::*;
+    use std::fs::File;
 
+    use assert_cmd::assert::Assert;
+    use assert_cmd::Command;
+    use libflate::gzip;
+    use predicates::prelude::*;
     #[test]
     fn convert_bad_input_filename() {
         let mut cmd = Command::cargo_bin("dstool").unwrap();
@@ -43,13 +46,144 @@ mod dstool_tests {
 
         assert
             .success()
-            .stderr(predicate::str::contains("dstool starting"))
+            .stderr(predicate::str::contains("dstool convert starting"))
            .stderr(predicate::str::contains("Schema deduced"))
             .stderr(predicate::str::contains(expected_success_string));
 
         // TODO: add a dump command to dstool and verify that the dump
         // of the written parquet file is as expected.
+    }
 
-        parquet_path.close().expect("deleting temporary file");
+    #[test]
+    fn meta_bad_input_filename() {
+        let mut cmd = Command::cargo_bin("dstool").unwrap();
+        let assert = cmd.arg("meta").arg("non_existent_input").assert();
+
+        assert
+            .failure()
+            .code(3)
+            .stderr(predicate::str::contains("Metadata dump failed"))
+            .stderr(predicate::str::contains(
+                "Metadata dump failed: Unknown input type: No extension for non_existent_input",
+            ));
+    }
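+
+    // Unlike the test above, the `.tsm` extension here is recognized, so the
+    // failure comes from opening a nonexistent file, not from type detection.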
+    #[test]
+    fn meta_non_existent_input_filename() {
+        let mut cmd = Command::cargo_bin("dstool").unwrap();
+        let assert = cmd.arg("meta").arg("non_existent_input.tsm").assert();
+
+        assert
+            .failure()
+            .code(3)
+            .stderr(predicate::str::contains("Metadata dump failed"))
+            .stderr(predicate::str::contains(
+                "Metadata dump failed: Error reading non_existent_input.tsm",
+            ));
+    }
+
+    #[test]
+    fn meta_bad_input_filename_gz() {
+        let mut cmd = Command::cargo_bin("dstool").unwrap();
+        let assert = cmd.arg("meta").arg("non_existent_input.gz").assert();
+
+        assert
+            .failure()
+            .code(3)
+            .stderr(predicate::str::contains("Metadata dump failed"))
+            .stderr(predicate::str::contains(
+                "Metadata dump failed: Unknown input type: No extension before .gz for non_existent_input.gz",
+            ));
+    }
+
+    // gunzips the contents of the file at input_path into a temporary path
+    fn uncompress_gz(
+        input_path: &str,
+        output_extension: &str,
+    ) -> delorean_test_helpers::tempfile::TempPath {
+        let gz_file = File::open(input_path).expect("Error opening input");
+
+        let output_path = delorean_test_helpers::tempfile::Builder::new()
+            .prefix("dstool_decompressed_e2e")
+            .suffix(output_extension)
+            .tempfile()
+            .expect("error creating temp file")
+            .into_temp_path();
+
+        let mut output_file = File::create(&output_path).expect("error opening output");
+        let mut decoder = gzip::Decoder::new(gz_file).expect("error creating gzip decoder");
+        std::io::copy(&mut decoder, &mut output_file).expect("error copying stream");
+        output_path
+    }
+
+    /// Validates some of the metadata output content for this tsm file
+    fn assert_meta_000000000000005_000000002_tsm(assert: Assert) {
+        assert
+            .success()
+            .stdout(predicate::str::contains("TSM Metadata Report"))
+            .stdout(predicate::str::contains(
+                "(05c19117091a1000, 05c19117091a1001) 2159 index entries, 2159 total records",
+            ))
+            .stdout(predicate::str::contains(
+                "task_scheduler_total_schedule_fails",
+            ));
+    }
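+
+    // Each fixture below is exercised twice: once after decompressing to a
+    // temporary .tsm file, and once by passing the .gz file directly, so both
+    // the file-backed and in-memory InputReader paths get covered.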
+    #[test]
+    fn meta_000000000000005_000000002_tsm() {
+        let input_tsm = uncompress_gz("../tests/fixtures/000000000000005-000000002.tsm.gz", ".tsm");
+        let mut cmd = Command::cargo_bin("dstool").unwrap();
+        let assert = cmd
+            .arg("meta")
+            .arg(input_tsm.to_string_lossy().to_string())
+            .assert();
+        assert_meta_000000000000005_000000002_tsm(assert);
+    }
+
+    #[test]
+    fn meta_000000000000005_000000002_tsm_gz() {
+        let mut cmd = Command::cargo_bin("dstool").unwrap();
+        let assert = cmd
+            .arg("meta")
+            .arg("../tests/fixtures/000000000000005-000000002.tsm.gz")
+            .assert();
+
+        assert_meta_000000000000005_000000002_tsm(assert);
+    }
+
+    /// Validates some of the metadata output content for this tsm file
+    fn assert_meta_cpu_usage_tsm(assert: Assert) {
+        assert
+            .success()
+            .stdout(predicate::str::contains("TSM Metadata Report"))
+            .stdout(predicate::str::contains(
+                "(05b4927b3fe38000, 05b4927b3fe38001) 2735 index entries, 2735 total records",
+            ))
+            .stdout(predicate::str::contains(
+                "task_scheduler_total_schedule_fails",
+            ));
+    }
+
+    #[test]
+    fn meta_cpu_usage_tsm() {
+        let input_tsm = uncompress_gz("../tests/fixtures/cpu_usage.tsm.gz", ".tsm");
+        let mut cmd = Command::cargo_bin("dstool").unwrap();
+        let assert = cmd
+            .arg("meta")
+            .arg(input_tsm.to_string_lossy().to_string())
+            .assert();
+
+        assert_meta_cpu_usage_tsm(assert);
+    }
+
+    #[test]
+    fn meta_cpu_usage_tsm_gz() {
+        let mut cmd = Command::cargo_bin("dstool").unwrap();
+        let assert = cmd
+            .arg("meta")
+            .arg("../tests/fixtures/cpu_usage.tsm.gz")
+            .assert();
+
+        assert_meta_cpu_usage_tsm(assert);
     }
 }
diff --git a/src/storage/tsm.rs b/src/storage/tsm.rs
index d7c8e25733..547288be2a 100644
--- a/src/storage/tsm.rs
+++ b/src/storage/tsm.rs
@@ -509,7 +509,7 @@ pub enum BlockData {
     Unsigned { ts: Vec<i64>, values: Vec<u64> },
 }
 
-#[derive(Copy, Clone, Debug, PartialEq)]
+#[derive(Copy, Clone, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)]
 /// `InfluxID` represents an InfluxDB ID used in InfluxDB 2.x to represent
 /// organization and bucket identifiers.
 pub struct InfluxID(u64);
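On the one-line `tsm.rs` change: `file_meta.rs` keys `bucket_stats` by `(org_id, bucket_id)`, and `BTreeMap` keys must implement `Ord` (which requires `Eq` and `PartialOrd` in turn); `Hash` is presumably added so the type can also serve as a hash-map key. A self-contained illustration, with ID values borrowed from the test fixtures:

```rust
use std::collections::BTreeMap;

#[derive(Copy, Clone, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)]
struct InfluxID(u64);

fn main() {
    // Without Ord on InfluxID (and hence on the tuple of two of them),
    // this BTreeMap key would not compile.
    let mut bucket_stats: BTreeMap<(InfluxID, InfluxID), u64> = BTreeMap::new();
    let key = (InfluxID(0x05c19117091a1000), InfluxID(0x05c19117091a1001));
    *bucket_stats.entry(key).or_default() += 1;
    println!("{:?}", bucket_stats);
}
```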