feat: add stats command for computing storage statistics (#160)

* feat: add stats command for computing storage statistics

* fix: Make function name less redundant

* fix: Apply suggestions from code review

Co-authored-by: Jake Goulding <jake.goulding@integer32.com>

* fix: remove changes to Cargo.lock

* fix: Cleanup to use `impl Into<String>` rather than String::from

* fix: fmt

* fix: less mut builder

* fix: fmt

* fix: clippy/fmt

* fix: fixup test

* fix: remove dstool reference

Co-authored-by: Jake Goulding <jake.goulding@integer32.com>
Andrew Lamb 2020-06-22 07:33:52 -04:00 committed by GitHub
parent ac153fa3d2
commit 506a7f19d5
9 changed files with 488 additions and 5 deletions

@@ -10,6 +10,7 @@ use std::io::{Read, Seek, SeekFrom};
pub mod error;
pub mod metadata;
pub mod stats;
pub mod writer;
/// Thing that adapts an object that implements Read+Seek to something


@@ -0,0 +1,133 @@
//! Provide storage statistics for parquet files
use std::collections::BTreeMap;
use std::convert::TryInto;
use std::io::{Read, Seek};
use log::debug;
use parquet::basic::{Compression, Encoding};
use parquet::file::reader::{FileReader, SerializedFileReader};
use delorean_table::stats::{ColumnStats, ColumnStatsBuilder};
use crate::{
error::{Error, Result},
metadata::data_type_from_parquet_type,
InputReaderAdapter,
};
/// Calculate storage statistics for a particular parquet file that can
/// be read from `input`, with a total size of `input_size` bytes
///
/// Returns a Vec of ColumnStats, one for each column in the input
pub fn col_stats<R: 'static>(input: R, input_size: u64) -> Result<Vec<ColumnStats>>
where
R: Read + Seek,
{
let input_adapter = InputReaderAdapter::new(input, input_size);
let reader =
SerializedFileReader::new(input_adapter).map_err(|e| Error::ParquetLibraryError {
message: String::from("Creating parquet reader"),
source: e,
})?;
let mut stats_builders = BTreeMap::new();
let parquet_metadata = reader.metadata();
for (rg_idx, rg_metadata) in parquet_metadata.row_groups().iter().enumerate() {
debug!(
"Looking at Row Group [{}] (total uncompressed byte size {})",
rg_idx,
rg_metadata.total_byte_size()
);
for (cc_idx, cc_metadata) in rg_metadata.columns().iter().enumerate() {
let col_path = cc_metadata.column_path();
let builder = stats_builders.entry(col_path.string()).or_insert_with(|| {
let data_type = data_type_from_parquet_type(cc_metadata.column_type());
ColumnStatsBuilder::new(col_path.string(), cc_idx, data_type)
});
builder
.compression(&format!(
"Enc: {}, Comp: {}",
encoding_display(&cc_metadata.encodings()),
compression_display(&cc_metadata.compression()),
))
.add_rows(
cc_metadata
.num_values()
.try_into()
.expect("positive number of values"),
)
.add_compressed_bytes(
cc_metadata
.compressed_size()
.try_into()
.expect("positive compressed size"),
)
.add_uncompressed_bytes(
cc_metadata
.uncompressed_size()
.try_into()
.expect("positive uncompressed size"),
);
}
}
// now, marshal up all the results
let mut v = stats_builders
.into_iter()
.map(|(_k, b)| b.build())
.collect::<Vec<_>>();
    // ensure output is sorted by column index, not by column name
v.sort_by_key(|stats| stats.column_index);
Ok(v)
}
/// Create a more user-friendly display of the encodings
fn encoding_display(encodings: &[Encoding]) -> String {
    if encodings.iter().any(|&e| e == Encoding::RLE_DICTIONARY) {
        // parquet represents "dictionary" encoding as [PLAIN,
        // RLE_DICTIONARY, RLE], which is somewhat confusing: the
        // dictionary page itself is plain encoded, while the data
        // pages are RLE encoded.
        "Dictionary".into()
    } else {
        format!("{:?}", encodings)
    }
}
/// Create a user-friendly display of the compression
fn compression_display(compression: &Compression) -> String {
    format!("{}", compression)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_encoding_display() {
assert_eq!(&encoding_display(&[Encoding::PLAIN]), "[PLAIN]");
assert_eq!(
&encoding_display(&[Encoding::PLAIN, Encoding::RLE]),
"[PLAIN, RLE]"
);
assert_eq!(
&encoding_display(&[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY]),
"Dictionary"
);
assert_eq!(
&encoding_display(&[Encoding::DELTA_BYTE_ARRAY, Encoding::RLE_DICTIONARY]),
"Dictionary"
);
}
#[test]
fn test_compression_display() {
assert_eq!(&compression_display(&Compression::GZIP), "GZIP");
}
}
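
A minimal sketch (not part of this commit) of how `col_stats` can be driven end to end: `std::fs::File` implements `Read + Seek`, so a parquet file on disk can be passed in directly. The `print_stats` helper name is hypothetical, and the `?` conversions assume the snafu-derived `Error` implements `std::error::Error`.

// Illustrative sketch only; not code from this commit.
use std::fs::File;

use delorean_parquet::stats::col_stats;

fn print_stats(path: &str) -> Result<(), Box<dyn std::error::Error>> {
    let file = File::open(path)?;
    let input_size = file.metadata()?.len();
    // col_stats returns one ColumnStats per column, sorted by column index
    for stats in col_stats(file, input_size)? {
        println!(
            "{} [{}]: {} rows, {}/{} compressed/uncompressed bytes",
            stats.column_name,
            stats.column_index,
            stats.num_rows,
            stats.num_compressed_bytes,
            stats.num_uncompressed_bytes
        );
    }
    Ok(())
}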


@@ -1,4 +1,5 @@
pub mod packers;
pub mod stats;
use snafu::Snafu;
@@ -8,7 +9,9 @@ pub use packers::Packer;
#[derive(Snafu, Debug)]
pub enum Error {
#[snafu(display(r#"Data Error: {}"#, source))]
-    Data { source: Box<dyn std::error::Error> },
+    Data {
+        source: Box<dyn std::error::Error>,
+    },
#[snafu(display(r#"IO Error: {} ({})"#, message, source,))]
IO {
@@ -17,7 +20,19 @@
},
#[snafu(display(r#"Other Error: {}"#, source))]
-    Other { source: Box<dyn std::error::Error> },
+    Other {
+        source: Box<dyn std::error::Error>,
+    },
#[snafu(display(r#"Column {:?} had mixed datatypes: {}"#, column_name, details))]
ColumnWithMixedTypes {
column_name: Option<String>,
details: String,
},
ColumnStatsBuilderError {
details: String,
},
}
/// Something that knows how to write a set of columns somewhere
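
A quick sketch (not from this commit) of how the new `ColumnWithMixedTypes` variant renders; the column name and details are hypothetical, and `Error` here is `delorean_table::Error`:

// Illustrative sketch only; not code from this commit.
let err = Error::ColumnWithMixedTypes {
    column_name: Some(String::from("speed")),
    details: String::from("expected Float but got String"),
};
// The display attribute formats the Option with {:?}, hence `Some(...)`
assert_eq!(
    err.to_string(),
    r#"Column Some("speed") had mixed datatypes: expected Float but got String"#
);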

delorean_table/src/stats.rs Normal file

@@ -0,0 +1,212 @@
//! Structures for computing and reporting on storage statistics
use delorean_table_schema::DataType;
use std::collections::BTreeSet;
/// Represents statistics for data stored in a particular chunk
#[derive(Debug, PartialEq, Eq)]
pub struct ColumnStats {
pub column_index: usize,
pub column_name: String,
pub compression_description: String,
pub num_rows: u64,
pub num_compressed_bytes: u64,
/// "uncompressed" means how large the data is after decompression
/// (e.g. GZIP) not the raw (decoded) size
pub num_uncompressed_bytes: u64,
pub data_type: DataType,
}
/// Builds `ColumnStats` for data stored in a particular chunk
///
/// # Example:
/// ```
/// use delorean_table_schema::DataType;
/// use delorean_table::stats::ColumnStatsBuilder;
///
/// let stats = ColumnStatsBuilder::new("My Column", 3, DataType::Float)
/// .compression("GZIP")
/// .add_rows(3)
/// .compression("SNAPPY")
/// .add_rows(7)
/// .build();
///
/// assert_eq!(stats.compression_description, r#"{"GZIP", "SNAPPY"}"#);
/// assert_eq!(stats.num_rows, 10);
/// ```
#[derive(Debug)]
pub struct ColumnStatsBuilder {
column_index: usize,
column_name: String,
compression_descriptions: BTreeSet<String>,
num_rows: u64,
num_compressed_bytes: u64,
num_uncompressed_bytes: u64,
data_type: DataType,
}
impl ColumnStatsBuilder {
pub fn new(
column_name: impl Into<String>,
column_index: usize,
data_type: DataType,
) -> ColumnStatsBuilder {
ColumnStatsBuilder {
column_name: column_name.into(),
column_index,
compression_descriptions: BTreeSet::new(),
num_rows: 0,
num_compressed_bytes: 0,
num_uncompressed_bytes: 0,
data_type,
}
}
    /// Add a compression description to this column. As there may be
    /// several compression descriptions that apply to any single column,
    /// the builder tracks the set of distinct descriptions.
pub fn compression(&mut self, compression: &str) -> &mut Self {
if !self.compression_descriptions.contains(compression) {
self.compression_descriptions.insert(compression.into());
}
self
}
/// Adds `row_count` to the running row count
pub fn add_rows(&mut self, row_count: u64) -> &mut Self {
self.num_rows += row_count;
self
}
/// Adds `byte_count` to the running compressed byte count
pub fn add_compressed_bytes(&mut self, byte_count: u64) -> &mut Self {
self.num_compressed_bytes += byte_count;
self
}
/// Adds `byte_count` to the running uncompressed byte count
pub fn add_uncompressed_bytes(&mut self, byte_count: u64) -> &mut Self {
self.num_uncompressed_bytes += byte_count;
self
}
/// Create the resulting ColumnStats
pub fn build(&self) -> ColumnStats {
ColumnStats {
column_name: self.column_name.clone(),
column_index: self.column_index,
compression_description: format!("{:?}", self.compression_descriptions),
num_rows: self.num_rows,
num_compressed_bytes: self.num_compressed_bytes,
num_uncompressed_bytes: self.num_uncompressed_bytes,
data_type: self.data_type,
}
}
}
#[cfg(test)]
mod test {
use super::*;
impl Default for ColumnStats {
fn default() -> Self {
ColumnStats {
column_index: 0,
column_name: String::from(""),
compression_description: String::from("{}"),
num_rows: 0,
num_compressed_bytes: 0,
num_uncompressed_bytes: 0,
data_type: DataType::Float,
}
}
}
#[test]
fn stats_builder_create() {
let stats = ColumnStatsBuilder::new("My Column", 7, DataType::Integer).build();
assert_eq!(
stats,
ColumnStats {
column_index: 7,
column_name: String::from("My Column"),
data_type: DataType::Integer,
..Default::default()
}
);
}
#[test]
fn stats_builder_compression() {
let stats = ColumnStatsBuilder::new("My Column", 3, DataType::Float)
.compression("GZIP")
.compression("DEFLATE")
.compression("GZIP")
.build();
assert_eq!(
stats,
ColumnStats {
column_index: 3,
column_name: String::from("My Column"),
compression_description: String::from(r#"{"DEFLATE", "GZIP"}"#),
..Default::default()
}
);
}
#[test]
fn stats_builder_add_rows() {
let stats = ColumnStatsBuilder::new("My Column", 3, DataType::Float)
.add_rows(7)
.add_rows(3)
.build();
assert_eq!(
stats,
ColumnStats {
column_index: 3,
column_name: String::from("My Column"),
num_rows: 10,
..Default::default()
}
);
}
#[test]
fn stats_builder_add_compressed_bytes() {
let stats = ColumnStatsBuilder::new("My Column", 3, DataType::Float)
.add_compressed_bytes(7)
.add_compressed_bytes(3)
.build();
assert_eq!(
stats,
ColumnStats {
column_index: 3,
column_name: String::from("My Column"),
num_compressed_bytes: 10,
..Default::default()
}
);
}
#[test]
fn stats_builder_add_uncompressed_bytes() {
let stats = ColumnStatsBuilder::new("My Column", 3, DataType::Float)
.add_uncompressed_bytes(7)
.add_uncompressed_bytes(3)
.build();
assert_eq!(
stats,
ColumnStats {
column_index: 3,
column_name: "My Column".into(),
num_uncompressed_bytes: 10,
..Default::default()
}
);
}
}
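
The `{"DEFLATE", "GZIP"}` strings asserted above are simply the `Debug` rendering of the `BTreeSet` that `build()` formats with `{:?}`: the set deduplicates, iterates in sorted order, and prints with braces. A standalone sketch:

// Illustrative sketch only; not code from this commit.
use std::collections::BTreeSet;

fn main() {
    let mut descriptions = BTreeSet::new();
    descriptions.insert(String::from("GZIP"));
    descriptions.insert(String::from("DEFLATE"));
    descriptions.insert(String::from("GZIP")); // duplicate, dropped by the set
    // Debug output is sorted, deduplicated, and brace-delimited
    assert_eq!(format!("{:?}", descriptions), r#"{"DEFLATE", "GZIP"}"#);
}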


@@ -50,7 +50,7 @@ impl Tag {
}
}
-#[derive(Debug, PartialEq, Clone, Copy)]
+#[derive(Debug, PartialEq, Eq, Clone, Copy)]
/// Line Protocol Data Types as defined in [the InfluxData documentation][influx]
///
/// [influx]: https://docs.influxdata.com/influxdb/v1.8/write_protocols/line_protocol_tutorial/#data-types
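
(The added `Eq` is presumably needed because the new `ColumnStats` derives `Eq` and embeds a `DataType`; `#[derive(Eq)]` only compiles when every field's type is itself `Eq`.)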


@@ -11,7 +11,6 @@ use std::path::{Path, PathBuf};
use crate::commands::error::{Error, Result};
use crate::commands::input::{FileType, InputReader};
/// Creates `DeloreanParquetTableWriter` suitable for writing to a single file
#[derive(Debug)]
struct ParquetFileWriterSource {

src/commands/stats.rs Normal file

@@ -0,0 +1,78 @@
//! This module contains code to report compression statistics for storage files
use log::info;
use crate::{
commands::error::{Error, Result},
commands::input::{FileType, InputReader},
};
use delorean_parquet::stats::col_stats;
/// Print statistics about the file named `input_filename` to stdout
pub fn stats(input_filename: &str) -> Result<()> {
info!("stats starting");
let input_reader = InputReader::new(input_filename)?;
let (input_len, col_stats) = match input_reader.file_type() {
FileType::LineProtocol => {
return Err(Error::NotImplemented {
operation_name: String::from("Line protocol storage statistics"),
});
}
FileType::TSM => {
return Err(Error::NotImplemented {
operation_name: String::from("TSM storage statistics"),
})
}
FileType::Parquet => {
let input_len = input_reader.len();
(
input_len,
col_stats(input_reader, input_len)
.map_err(|e| Error::UnableDumpToParquetMetadata { source: e })?,
)
}
};
let mut total_rows = 0;
println!("Storage statistics:");
let num_cols = col_stats.len();
for c in col_stats {
println!("Column Stats '{}' [{}]", c.column_name, c.column_index);
        println!(
            "  Total rows: {}, DataType: {:?}, Compression: {}",
            c.num_rows, c.data_type, c.compression_description
        );
        println!(
            "  Compressed/Uncompressed Bytes: ({:8}/{:8}) {:.4} bits per row",
            c.num_compressed_bytes,
            c.num_uncompressed_bytes,
            8.0 * (c.num_compressed_bytes as f64) / (c.num_rows as f64)
        );
if total_rows == 0 {
total_rows = c.num_rows;
}
assert_eq!(
total_rows, c.num_rows,
"Internal error: columns had different numbers of rows {}, {}",
total_rows, c.num_rows
);
}
println!();
println!(
"{}: total columns/rows/bytes: ({:8}/{:8}/{:8}) {:.4} bits per row",
input_filename,
num_cols,
total_rows,
input_len,
8.0 * (input_len as f64) / (total_rows as f64)
);
Ok(())
}
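
As a worked check of the bits-per-row arithmetic above: in the integration test at the bottom of this diff, the `state` column stores 6 rows in 90 compressed bytes, so 8.0 * 90 / 6 = 120.0000 bits per row; the whole 1128-byte file over the same 6 rows gives 8.0 * 1128 / 6 = 1504.0000 bits per row.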


@@ -9,6 +9,7 @@ mod commands {
mod error;
pub mod file_meta;
mod input;
pub mod stats;
}
mod rpc;
mod server;
@@ -16,7 +17,8 @@ mod server;
enum ReturnCode {
ConversionFailed = 1,
MetadataDumpFailed = 2,
-    ServerExitedAbnormally = 3,
+    StatsFailed = 3,
+    ServerExitedAbnormally = 4,
}
fn main() {
@@ -34,6 +36,9 @@ Examples:
# Dumps metadata information about 000000000013.tsm to stdout
delorean meta 000000000013.tsm
# Dumps storage statistics about out.parquet to stdout
delorean stats out.parquet
"#;
let matches = App::new(help)
@@ -67,6 +72,16 @@
.index(1),
),
)
.subcommand(
SubCommand::with_name("stats")
.about("Print out storage statistics information about a file to stdout")
.arg(
Arg::with_name("INPUT")
.help("The input filename to read from")
.required(true)
.index(1),
),
)
.subcommand(SubCommand::with_name("server").about("Runs in server mode (default)"))
.arg(
Arg::with_name("verbose")
@@ -105,6 +120,16 @@
}
}
}
("stats", Some(sub_matches)) => {
let input_filename = sub_matches.value_of("INPUT").unwrap();
            match commands::stats::stats(input_filename) {
Ok(()) => debug!("Storage statistics dump completed successfully"),
Err(e) => {
eprintln!("Stats dump failed: {}", e);
std::process::exit(ReturnCode::StatsFailed as _)
}
}
}
("server", Some(_)) | (_, _) => {
println!("Staring delorean server...");
match server::main() {


@@ -345,3 +345,23 @@ fn meta_temperature_parquet() {
assert_meta_temperature_parquet(assert);
}
#[test]
fn stats_temperature_parquet() {
let mut cmd = Command::cargo_bin("delorean").unwrap();
let assert = cmd
.arg("stats")
.arg("tests/fixtures/parquet/temperature.parquet")
.assert();
assert
.success()
.stdout(predicate::str::contains("Storage statistics:"))
.stdout(predicate::str::contains(
r#"Column Stats 'state' [1]
Total rows: 6, DataType: String, Compression: {"Enc: Dictionary, Comp: GZIP"}
Compressed/Uncompressed Bytes: ( 90/ 52) 120.0000 bits per row"#))
.stdout(predicate::str::contains(
"temperature.parquet: total columns/rows/bytes: ( 5/ 6/ 1128) 1504.0000 bits per row"
));
}