feat: Add file / metadata inspection + dumping with dstool (#112)

* feat: Add file / metadata inspection + dumping

* fix: apply some PR review comments

* fix: apply suggestions from code review

Co-authored-by: Jake Goulding <jake.goulding@integer32.com>

* feat: Add tests, rearrange code into modules, add gzip aware interface

* fix: fix comment and test

* fix: test output and fmt

Co-authored-by: Jake Goulding <jake.goulding@integer32.com>
pull/24376/head
Andrew Lamb 2020-06-09 10:10:55 -04:00 committed by GitHub
parent 9844cafc5d
commit f1a3058b24
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 595 additions and 51 deletions

2
Cargo.lock generated
View File

@ -648,11 +648,13 @@ version = "0.1.0"
dependencies = [
"assert_cmd",
"clap",
"delorean",
"delorean_ingest",
"delorean_line_parser",
"delorean_parquet",
"delorean_test_helpers",
"env_logger",
"libflate",
"log 0.4.8",
"predicates",
"snafu",

View File

@ -10,6 +10,8 @@ edition = "2018"
delorean_line_parser = { path = "../delorean_line_parser" }
delorean_ingest = { path = "../delorean_ingest" }
delorean_parquet = { path = "../delorean_parquet" }
delorean = { path = ".." }
libflate = "1.0.0"
clap = "2.33.1"
env_logger = "0.7.1"

View File

@ -0,0 +1,58 @@
use snafu::Snafu;
use delorean::storage::StorageError;
use delorean_parquet::writer::Error as DeloreanTableWriterError;
/// Error type for all dstool file / metadata operations.
#[derive(Debug, Snafu)]
pub enum Error {
    /// Reading bytes from the named input failed.
    #[snafu(display("Error reading {} ({})", name, source))]
    UnableToReadInput {
        name: String,
        source: std::io::Error,
    },

    /// Creating the named output file failed.
    #[snafu(display("Unable to create output file {} ({})", name, source))]
    UnableToCreateFile {
        name: String,
        source: std::io::Error,
    },

    /// The requested operation is recognized but not yet supported.
    #[snafu(display("Not implemented: {}", operation_name))]
    NotImplemented { operation_name: String },

    /// The input's type could not be determined from its name.
    #[snafu(display("Unknown input type: {} for {}", details, input_name))]
    UnknownInputType { details: String, input_name: String },

    /// The input file name is not valid UTF-8.
    // NOTE: fixed message typo — was "utf-8, : {}" with a stray comma.
    #[snafu(display("Can't convert filename to utf-8: {}", input_name))]
    FileNameDecode { input_name: String },

    /// Decompressing gzip'd input data failed.
    // NOTE: fixed message typo — was "gzip data : {}" with a space before ':'.
    #[snafu(display("Can't read gzip data: {}", input_name))]
    ReadingGzip {
        input_name: String,
        source: std::io::Error,
    },

    /// Conversion of line protocol data failed. `context(false)` lets a
    /// `delorean_ingest::Error` convert via plain `?` with no context call.
    #[snafu(context(false))]
    #[snafu(display("Error converting data {}", source))]
    Conversion { source: delorean_ingest::Error },

    /// Constructing the parquet table writer failed.
    #[snafu(display("Error creating a table writer {}", source))]
    UnableToCreateTableWriter { source: DeloreanTableWriterError },

    /// Writing the schema-deduction sample batch failed.
    #[snafu(display("Error writing the sample schema {}", source))]
    UnableToWriteSchemaSample { source: DeloreanTableWriterError },

    /// Writing the remaining (non-sample) lines failed.
    #[snafu(display("Error writing remaining lines {}", source))]
    UnableToWriteGoodLines { source: DeloreanTableWriterError },

    /// Finalizing/closing the table writer failed.
    #[snafu(display("Error while closing the table writer {}", source))]
    UnableToCloseTableWriter { source: DeloreanTableWriterError },

    /// An error surfaced by the TSM storage layer.
    #[snafu(display(r#"Error reading TSM data: {}"#, source))]
    TSM { source: StorageError },

    /// An error from the line protocol parser.
    #[snafu(display(r#"Error parsing data: {}"#, source))]
    Parsing { source: delorean_line_parser::Error },
}

/// Convenience alias: a `Result` defaulting to this module's [`Error`].
pub type Result<T, E = Error> = std::result::Result<T, E>;

View File

@ -0,0 +1,146 @@
#![deny(rust_2018_idioms)]
#![warn(missing_debug_implementations, clippy::explicit_iter_loop)]
use std::collections::{BTreeMap, BTreeSet};
use delorean::storage::tsm::{IndexEntry, InfluxID, TSMReader};
use delorean::storage::StorageError;
use log::{debug, info};
use crate::error::{Error, Result};
use crate::input::{FileType, InputReader};
pub fn dump_meta(input_filename: &str) -> Result<()> {
info!("dstool meta starting");
debug!("Reading from input file {}", input_filename);
let input_reader = InputReader::new(input_filename)?;
match input_reader.file_type() {
FileType::LineProtocol => Err(Error::NotImplemented {
operation_name: String::from("Line protocol metadata dump"),
}),
FileType::TSM => {
let len = input_reader.len();
let mut reader = TSMReader::new(input_reader, len);
let index = reader.index().map_err(|e| Error::TSM { source: e })?;
let mut stats_builder = TSMMetadataBuilder::new();
for mut entry in index {
stats_builder.process_entry(&mut entry)?;
}
stats_builder.print_report();
Ok(())
}
}
}
/// Tracks the tag names/values and field names seen for one measurement.
#[derive(Debug, Default)]
struct MeasurementMetadata {
    /// tag name --> list of seen tag values
    tags: BTreeMap<String, BTreeSet<String>>,
    /// List of field names seen
    fields: BTreeSet<String>,
}
impl MeasurementMetadata {
fn update_for_entry(&mut self, index_entry: &mut IndexEntry) -> Result<()> {
let tagset = index_entry.tagset().map_err(|e| Error::TSM { source: e })?;
for (tag_name, tag_value) in tagset {
let tag_entry = self.tags.entry(tag_name).or_default();
tag_entry.insert(tag_value);
}
let field_name = index_entry
.field_key()
.map_err(|e| Error::TSM { source: e })?;
self.fields.insert(field_name);
Ok(())
}
fn print_report(&self, prefix: &str) {
for (tag_name, tag_values) in &self.tags {
println!("{} tag {} = {:?}", prefix, tag_name, tag_values);
}
for field_name in &self.fields {
println!("{} field {}", prefix, field_name);
}
}
}
/// Represents stats for a single bucket
#[derive(Debug, Default)]
struct BucketMetadata {
    /// How many index entries have been seen
    count: u64,
    /// Total 'records' (aka sum of the lengths of all timeseries)
    total_records: u64,
    /// Maps measurement name --> metadata seen for that measurement
    measurements: BTreeMap<String, MeasurementMetadata>,
}
impl BucketMetadata {
    /// Folds a single index entry into this bucket's statistics.
    fn update_for_entry(&mut self, index_entry: &mut IndexEntry) -> Result<()> {
        self.count += 1;
        self.total_records += u64::from(index_entry.count);

        let measurement = index_entry
            .measurement()
            .map_err(|e| Error::TSM { source: e })?;
        self.measurements
            .entry(measurement)
            .or_default()
            .update_for_entry(index_entry)
    }

    /// Prints each measurement name followed by its metadata, indented
    /// one level below `prefix`.
    fn print_report(&self, prefix: &str) {
        // The child indent does not vary per measurement, so build it once.
        let indent = format!("{} ", prefix);
        for (measurement, meta) in &self.measurements {
            println!("{}{}", prefix, measurement);
            meta.print_report(&indent);
        }
    }
}
/// Accumulates statistics across all entries of a TSM index, grouped
/// by (org, bucket) pair.
#[derive(Debug, Default)]
struct TSMMetadataBuilder {
    /// How many valid index entries have been processed
    num_entries: u32,
    /// (org_id, bucket_id) --> Bucket Metadata
    bucket_stats: BTreeMap<(InfluxID, InfluxID), BucketMetadata>,
}
impl TSMMetadataBuilder {
    fn new() -> TSMMetadataBuilder {
        Self::default()
    }

    /// Incorporates one index entry; an entry-level read error aborts
    /// the whole dump.
    fn process_entry(&mut self, entry: &mut Result<IndexEntry, StorageError>) -> Result<()> {
        match entry {
            Err(e) => Err(Error::TSM { source: e.clone() }),
            Ok(index_entry) => {
                self.num_entries += 1;
                let key = (index_entry.org_id(), index_entry.bucket_id());
                self.bucket_stats
                    .entry(key)
                    .or_default()
                    .update_for_entry(index_entry)
            }
        }
    }

    /// Writes the final human-readable report to stdout.
    fn print_report(&self) {
        println!("TSM Metadata Report:");
        println!(" Valid Index Entries: {}", self.num_entries);
        println!(" Organizations/Bucket Stats:");
        for ((org_id, bucket_id), stats) in &self.bucket_stats {
            println!(
                " ({}, {}) {} index entries, {} total records",
                org_id, bucket_id, stats.count, stats.total_records
            );
            println!(" Measurements:");
            stats.print_report(" ");
        }
    }
}

View File

@ -0,0 +1,206 @@
/// Module to handle input files (and maybe urls?) from dstool
use libflate::gzip;
use std::fs::File;
use std::io;
use std::io::{BufRead, BufReader, Cursor, Read, Seek, SeekFrom};
use std::path::Path;
use crate::error::{Error, Result};
/// The kinds of input files dstool knows how to interpret.
///
/// Derives added so the type can be logged (`Debug`), freely copied
/// (`Clone`/`Copy` — it carries no data), and compared in tests
/// (`PartialEq`/`Eq`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FileType {
    /// Line protocol text data (`.lp`)
    LineProtocol,
    /// A TSM index/data file (`.tsm`)
    TSM,
}
/// Interface for interacting with streams of input bytes.
///
/// Wraps either a buffered on-disk file (`FileInputType`) or an
/// in-memory buffer of already-decompressed bytes (`MemoryInputType`);
/// both variants implement `Read`, `BufRead`, and `Seek` by delegation.
pub enum InputReader {
    FileInputType(FileInputReader),
    MemoryInputType(MemoryInputReader),
}
/// Contains a (file backed) reader to read raw uncompressed bytes
pub struct FileInputReader {
    /// Detected type of the file's contents
    file_type: FileType,
    /// Size of the file in bytes, captured when the file was opened
    file_size: usize,
    /// Buffered reader over the underlying file
    reader: BufReader<std::fs::File>,
}
/// Contains an in-memory reader over fully buffered bytes (e.g. the
/// decompressed contents of a gzip'd input)
pub struct MemoryInputReader {
    /// Detected type of the buffer's contents
    file_type: FileType,
    /// Number of bytes held in the buffer
    file_size: usize,
    /// Seekable cursor over the owned byte buffer
    cursor: Cursor<Vec<u8>>,
}
impl FileInputReader {
fn new(file_type: FileType, input_name: &str) -> Result<FileInputReader> {
let file = File::open(input_name).map_err(|e| Error::UnableToReadInput {
name: String::from(input_name),
source: e,
})?;
let file_size = file
.metadata()
.map_err(|e| Error::UnableToReadInput {
name: String::from(input_name),
source: e,
})?
.len();
Ok(FileInputReader {
file_type,
file_size: file_size as usize,
reader: BufReader::new(file),
})
}
}
impl MemoryInputReader {
    /// Builds a reader over `buffer`, remembering the buffer's length
    /// and tagging it with `file_type`.
    fn new(file_type: FileType, buffer: Vec<u8>) -> MemoryInputReader {
        MemoryInputReader {
            file_type,
            // Struct fields are evaluated in order, so the length is
            // read before `buffer` moves into the cursor.
            file_size: buffer.len(),
            cursor: Cursor::new(buffer),
        }
    }
}
impl Seek for InputReader {
fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
match self {
InputReader::FileInputType(file_input_reader) => file_input_reader.reader.seek(pos),
InputReader::MemoryInputType(memory_input_reader) => {
memory_input_reader.cursor.seek(pos)
}
}
}
}
impl Read for InputReader {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
match self {
InputReader::FileInputType(file_input_reader) => file_input_reader.reader.read(buf),
InputReader::MemoryInputType(memory_input_reader) => {
memory_input_reader.cursor.read(buf)
}
}
}
}
impl BufRead for InputReader {
fn fill_buf(&mut self) -> io::Result<&[u8]> {
match self {
InputReader::FileInputType(file_input_reader) => file_input_reader.reader.fill_buf(),
InputReader::MemoryInputType(memory_input_reader) => {
memory_input_reader.cursor.fill_buf()
}
}
}
fn consume(&mut self, amt: usize) {
match self {
InputReader::FileInputType(file_input_reader) => file_input_reader.reader.consume(amt),
InputReader::MemoryInputType(memory_input_reader) => {
memory_input_reader.cursor.consume(amt)
}
}
}
}
impl InputReader {
pub fn file_type(&self) -> &FileType {
match self {
InputReader::FileInputType(file_input_reader) => &file_input_reader.file_type,
InputReader::MemoryInputType(memory_input_reader) => &memory_input_reader.file_type,
}
}
pub fn len(&self) -> usize {
match self {
InputReader::FileInputType(file_input_reader) => file_input_reader.file_size,
InputReader::MemoryInputType(memory_input_reader) => memory_input_reader.file_size,
}
}
// Create a new input reader suitable for reading from
// `input_name` and figures out the file input type based on
// heuristics (ahem, the filename extension)
pub fn new(input_name: &str) -> Result<InputReader> {
let path = Path::new(input_name);
// Initially simply use the file name's extension to determine
// the type; Maybe in the future we can be more clever and
// inspect contents.
let ext = path
.extension()
.ok_or(Error::UnknownInputType {
details: String::from("No extension"),
input_name: path.display().to_string(),
})?
.to_str()
.ok_or(Error::FileNameDecode {
input_name: path.display().to_string(),
})?;
match ext {
"tsm" => Ok(InputReader::FileInputType(FileInputReader::new(
FileType::TSM,
input_name,
)?)),
"lp" => Ok(InputReader::FileInputType(FileInputReader::new(
FileType::LineProtocol,
input_name,
)?)),
"gz" => {
let stem = Path::new(path.file_stem().unwrap());
let stem_ext = stem
.extension()
.ok_or(Error::UnknownInputType {
details: String::from("No extension before .gz"),
input_name: path.display().to_string(),
})?
.to_str()
.ok_or(Error::FileNameDecode {
input_name: path.display().to_string(),
})?;
let file = File::open(input_name).map_err(|e| Error::UnableToReadInput {
name: input_name.to_string(),
source: e,
})?;
let mut decoder =
gzip::Decoder::new(file).map_err(|gzip_err| Error::UnableToReadInput {
name: input_name.to_string(),
source: gzip_err,
})?;
let mut buffer = Vec::new();
decoder
.read_to_end(&mut buffer)
.map_err(|e| Error::ReadingGzip {
input_name: input_name.to_string(),
source: e,
})?;
match stem_ext {
"tsm" => Ok(InputReader::MemoryInputType(MemoryInputReader::new(
FileType::TSM,
buffer,
))),
"lp" => Ok(InputReader::MemoryInputType(MemoryInputReader::new(
FileType::LineProtocol,
buffer,
))),
_ => Err(Error::UnknownInputType {
details: String::from("Unknown input extension before .gz"),
input_name: input_name.to_string(),
}),
}
}
_ => Err(Error::UnknownInputType {
details: String::from("Unknown input extension"),
input_name: input_name.to_string(),
}),
}
}
}

View File

@ -1,67 +1,41 @@
#![deny(rust_2018_idioms)]
#![warn(missing_debug_implementations, clippy::explicit_iter_loop)]
use clap::{crate_authors, crate_version, App, Arg, SubCommand};
use delorean_ingest::LineProtocolConverter;
use delorean_line_parser::{parse_lines, ParsedLine};
use delorean_parquet::writer::{DeloreanTableWriter, Error as DeloreanTableWriterError};
use log::{debug, info, warn};
use snafu::{ResultExt, Snafu};
use std::fs;
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Error reading {} ({})", name.display(), source))]
UnableToReadInput {
name: std::path::PathBuf,
source: std::io::Error,
},
use clap::{crate_authors, crate_version, App, Arg, SubCommand};
UnableToCreateFile {
name: std::path::PathBuf,
source: std::io::Error,
},
use delorean_ingest::LineProtocolConverter;
use delorean_line_parser::{parse_lines, ParsedLine};
use delorean_parquet::writer::DeloreanTableWriter;
#[snafu(context(false))]
Conversion {
source: delorean_ingest::Error,
},
mod error;
mod file_meta;
mod input;
UnableToCreateTableWriter {
source: DeloreanTableWriterError,
},
use error::{Error, Result};
UnableToWriteSchemaSample {
source: DeloreanTableWriterError,
},
UnableToWriteGoodLines {
source: DeloreanTableWriterError,
},
UnableToCloseTableWriter {
source: DeloreanTableWriterError,
},
}
type Result<T, E = Error> = std::result::Result<T, E>;
use file_meta::dump_meta;
enum ReturnCode {
InternalError = 1,
ConversionFailed = 2,
MetadataDumpFailed = 3,
}
static SCHEMA_SAMPLE_SIZE: usize = 5;
fn convert(input_filename: &str, output_filename: &str) -> Result<()> {
info!("dstool starting");
info!("dstool convert starting");
debug!("Reading from input file {}", input_filename);
debug!("Writing to output file {}", output_filename);
// TODO: make a streaming parser that you can stream data through in blocks.
// for now, just read the whole input file into RAM...
let buf = fs::read_to_string(input_filename).context(UnableToReadInput {
name: input_filename,
let buf = fs::read_to_string(input_filename).map_err(|e| Error::UnableToReadInput {
name: String::from(input_filename),
source: e,
})?;
info!("Read {} bytes from {}", buf.len(), input_filename);
@ -83,21 +57,24 @@ fn convert(input_filename: &str, output_filename: &str) -> Result<()> {
debug!("Using schema deduced from sample: {:?}", converter.schema());
info!("Schema deduced. Writing output to {} ...", output_filename);
let output_file = fs::File::create(output_filename).context(UnableToCreateFile {
name: output_filename,
let output_file = fs::File::create(output_filename).map_err(|e| Error::UnableToCreateFile {
name: String::from(output_filename),
source: e,
})?;
let mut writer = DeloreanTableWriter::new(converter.schema(), output_file)
.context(UnableToCreateTableWriter)?;
.map_err(|e| Error::UnableToCreateTableWriter { source: e })?;
// Write the sample and then the remaining lines
writer
.write_batch(&converter.pack_lines(schema_sample.into_iter()))
.context(UnableToWriteSchemaSample)?;
.map_err(|e| Error::UnableToWriteSchemaSample { source: e })?;
writer
.write_batch(&converter.pack_lines(only_good_lines))
.context(UnableToWriteGoodLines)?;
writer.close().context(UnableToCloseTableWriter)?;
.map_err(|e| Error::UnableToWriteGoodLines { source: e })?;
writer
.close()
.map_err(|e| Error::UnableToCloseTableWriter { source: e })?;
info!("Completing writing {} successfully", output_filename);
Ok(())
}
@ -132,6 +109,16 @@ Examples:
.index(2),
),
)
.subcommand(
SubCommand::with_name("meta")
.about("Print out metadata information about a storage file")
.arg(
Arg::with_name("INPUT")
.help("The input filename to read from")
.required(true)
.index(1),
),
)
.arg(
Arg::with_name("verbose")
.short("v")
@ -160,6 +147,15 @@ Examples:
std::process::exit(ReturnCode::ConversionFailed as _)
}
}
} else if let Some(matches) = matches.subcommand_matches("meta") {
let input_filename = matches.value_of("INPUT").unwrap();
match dump_meta(&input_filename) {
Ok(()) => debug!("Metadata dump completed successfully"),
Err(e) => {
eprintln!("Metadata dump failed: {}", e);
std::process::exit(ReturnCode::MetadataDumpFailed as _)
}
}
} else {
eprintln!("Internal error: no command found");
std::process::exit(ReturnCode::InternalError as _);

View File

@ -1,7 +1,10 @@
mod dstool_tests {
use assert_cmd::Command;
use predicates::prelude::*;
use std::fs::File;
use assert_cmd::assert::Assert;
use assert_cmd::Command;
use libflate::gzip;
use predicates::prelude::*;
#[test]
fn convert_bad_input_filename() {
let mut cmd = Command::cargo_bin("dstool").unwrap();
@ -43,13 +46,144 @@ mod dstool_tests {
assert
.success()
.stderr(predicate::str::contains("dstool starting"))
.stderr(predicate::str::contains("dstool convert starting"))
.stderr(predicate::str::contains("Schema deduced"))
.stderr(predicate::str::contains(expected_success_string));
// TODO: add a dump command to dstool and verify that the dump
// of the written parquet file is as expected.
}
parquet_path.close().expect("deleting temporary file");
#[test]
fn meta_bad_input_filename() {
let mut cmd = Command::cargo_bin("dstool").unwrap();
let assert = cmd.arg("meta").arg("non_existent_input").assert();
assert
.failure()
.code(3)
.stderr(predicate::str::contains("Metadata dump failed"))
.stderr(predicate::str::contains(
"Metadata dump failed: Unknown input type: No extension for non_existent_input",
));
}
#[test]
fn meta_non_existent_input_filename() {
let mut cmd = Command::cargo_bin("dstool").unwrap();
let assert = cmd.arg("meta").arg("non_existent_input.tsm").assert();
assert
.failure()
.code(3)
.stderr(predicate::str::contains("Metadata dump failed"))
.stderr(predicate::str::contains(
"Metadata dump failed: Error reading non_existent_input.tsm",
));
}
#[test]
fn meta_bad_input_filename_gz() {
let mut cmd = Command::cargo_bin("dstool").unwrap();
let assert = cmd.arg("meta").arg("non_existent_input.gz").assert();
assert
.failure()
.code(3)
.stderr(predicate::str::contains("Metadata dump failed"))
.stderr(predicate::str::contains(
"Metadata dump failed: Unknown input type: No extension before .gz for non_existent_input.gz",
));
}
// gunzip's the contents of the file at input_path into a temporary path
fn uncompress_gz(
input_path: &str,
output_extension: &str,
) -> delorean_test_helpers::tempfile::TempPath {
let gz_file = File::open(input_path).expect("Error opening input");
let output_path = delorean_test_helpers::tempfile::Builder::new()
.prefix("dstool_decompressed_e2e")
.suffix(output_extension)
.tempfile()
.expect("error creating temp file")
.into_temp_path();
let mut output_file = File::create(&output_path).expect("error opening output");
let mut decoder = gzip::Decoder::new(gz_file).expect("error creating gzip decoder");
std::io::copy(&mut decoder, &mut output_file).expect("error copying stream");
output_path
}
/// Validates some of the metadata output content for this tsm file
fn assert_meta_000000000000005_000000002_tsm(assert: Assert) {
assert
.success()
.stdout(predicate::str::contains("TSM Metadata Report"))
.stdout(predicate::str::contains(
"(05c19117091a1000, 05c19117091a1001) 2159 index entries, 2159 total records",
))
.stdout(predicate::str::contains(
"task_scheduler_total_schedule_fails",
));
}
#[test]
fn meta_000000000000005_000000002_tsm() {
let input_tsm = uncompress_gz("../tests/fixtures/000000000000005-000000002.tsm.gz", ".tsm");
let mut cmd = Command::cargo_bin("dstool").unwrap();
let assert = cmd
.arg("meta")
.arg(input_tsm.to_string_lossy().to_string())
.assert();
assert_meta_000000000000005_000000002_tsm(assert);
}
#[test]
fn meta_000000000000005_000000002_tsm_gz() {
let mut cmd = Command::cargo_bin("dstool").unwrap();
let assert = cmd
.arg("meta")
.arg("../tests/fixtures/000000000000005-000000002.tsm.gz")
.assert();
assert_meta_000000000000005_000000002_tsm(assert);
}
/// Validates some of the metadata output content for this tsm file
fn assert_meta_cpu_usage_tsm(assert: Assert) {
assert
.success()
.stdout(predicate::str::contains("TSM Metadata Report"))
.stdout(predicate::str::contains(
"(05b4927b3fe38000, 05b4927b3fe38001) 2735 index entries, 2735 total records",
))
.stdout(predicate::str::contains(
"task_scheduler_total_schedule_fails",
));
}
#[test]
fn meta_cpu_usage_tsm() {
let input_tsm = uncompress_gz("../tests/fixtures/cpu_usage.tsm.gz", ".tsm");
let mut cmd = Command::cargo_bin("dstool").unwrap();
let assert = cmd
.arg("meta")
.arg(input_tsm.to_string_lossy().to_string())
.assert();
assert_meta_cpu_usage_tsm(assert);
}
#[test]
fn meta_cpu_usage_tsm_gz() {
let mut cmd = Command::cargo_bin("dstool").unwrap();
let assert = cmd
.arg("meta")
.arg("../tests/fixtures/cpu_usage.tsm.gz")
.assert();
assert_meta_cpu_usage_tsm(assert);
}
}

View File

@ -509,7 +509,7 @@ pub enum BlockData {
Unsigned { ts: Vec<i64>, values: Vec<u64> },
}
#[derive(Copy, Clone, Debug, PartialEq)]
#[derive(Copy, Clone, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)]
/// `InfluxID` represents an InfluxDB ID used in InfluxDB 2.x to represent
/// organization and bucket identifiers.
pub struct InfluxID(u64);