Merge pull request #198 from influxdata/cn/use-more-clap
commit
7d42a1db83
|
@ -45,7 +45,7 @@ impl DeloreanTableWriterSource for IgnoringParquetDirectoryWriterSource {
|
|||
fn next_writer(&mut self, schema: &Schema) -> Result<Box<dyn DeloreanTableWriter>, TableError> {
|
||||
let dev_null = IgnoringWriteStream {};
|
||||
let writer =
|
||||
DeloreanParquetTableWriter::new(schema, CompressionLevel::COMPATIBILITY, dev_null)
|
||||
DeloreanParquetTableWriter::new(schema, CompressionLevel::Compatibility, dev_null)
|
||||
.expect("Creating table writer");
|
||||
Ok(Box::new(writer))
|
||||
}
|
||||
|
|
|
@ -53,12 +53,12 @@ impl From<Error> for TableError {
|
|||
#[derive(Debug, Clone, Copy, PartialEq)]
|
||||
pub enum CompressionLevel {
|
||||
/// Minimize the size of the written parquet file
|
||||
MAXIMUM,
|
||||
Maximum,
|
||||
|
||||
// Attempt to maximize interoperability with other ecosystem tools.
|
||||
//
|
||||
// See https://github.com/influxdata/delorean/issues/184
|
||||
COMPATIBILITY,
|
||||
Compatibility,
|
||||
}
|
||||
|
||||
impl FromStr for CompressionLevel {
|
||||
|
@ -66,8 +66,8 @@ impl FromStr for CompressionLevel {
|
|||
|
||||
fn from_str(compression_level: &str) -> Result<Self, Self::Err> {
|
||||
match compression_level {
|
||||
"max" => Ok(Self::MAXIMUM),
|
||||
"compatibility" => Ok(Self::COMPATIBILITY),
|
||||
"max" => Ok(Self::Maximum),
|
||||
"compatibility" => Ok(Self::Compatibility),
|
||||
_ => UnknownCompressionLevel { compression_level }.fail(),
|
||||
}
|
||||
}
|
||||
|
@ -119,7 +119,7 @@ where
|
|||
/// output_file_name.push("example.parquet");
|
||||
/// let output_file = fs::File::create(output_file_name.as_path()).unwrap();
|
||||
///
|
||||
/// let compression_level = CompressionLevel::COMPATIBILITY;
|
||||
/// let compression_level = CompressionLevel::Compatibility;
|
||||
///
|
||||
/// let mut parquet_writer = DeloreanParquetTableWriter::new(
|
||||
/// &schema, compression_level, output_file)
|
||||
|
@ -353,18 +353,18 @@ fn set_integer_encoding(
|
|||
builder: WriterPropertiesBuilder,
|
||||
) -> WriterPropertiesBuilder {
|
||||
match compression_level {
|
||||
CompressionLevel::MAXIMUM => {
|
||||
CompressionLevel::Maximum => {
|
||||
debug!(
|
||||
"Setting encoding of {:?} col {} to DELTA_BINARY_PACKED (MAXIMUM)",
|
||||
"Setting encoding of {:?} col {} to DELTA_BINARY_PACKED (Maximum)",
|
||||
data_type, col_path
|
||||
);
|
||||
builder
|
||||
.set_column_encoding(col_path.clone(), Encoding::DELTA_BINARY_PACKED)
|
||||
.set_column_dictionary_enabled(col_path, false)
|
||||
}
|
||||
CompressionLevel::COMPATIBILITY => {
|
||||
CompressionLevel::Compatibility => {
|
||||
debug!(
|
||||
"Setting encoding of {:?} col {} to PLAIN/RLE (COMPATIBILITY)",
|
||||
"Setting encoding of {:?} col {} to PLAIN/RLE (Compatibility)",
|
||||
data_type, col_path
|
||||
);
|
||||
builder
|
||||
|
@ -520,12 +520,12 @@ mod tests {
|
|||
|
||||
#[test]
|
||||
fn test_create_writer_props_maximum() {
|
||||
do_test_create_writer_props(CompressionLevel::MAXIMUM);
|
||||
do_test_create_writer_props(CompressionLevel::Maximum);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_create_writer_props_compatibility() {
|
||||
do_test_create_writer_props(CompressionLevel::COMPATIBILITY);
|
||||
do_test_create_writer_props(CompressionLevel::Compatibility);
|
||||
}
|
||||
|
||||
fn do_test_create_writer_props(compression_level: CompressionLevel) {
|
||||
|
@ -567,14 +567,14 @@ mod tests {
|
|||
|
||||
let int_field_colpath = ColumnPath::from("int_field");
|
||||
match compression_level {
|
||||
CompressionLevel::MAXIMUM => {
|
||||
CompressionLevel::Maximum => {
|
||||
assert_eq!(
|
||||
writer_props.encoding(&int_field_colpath),
|
||||
Some(Encoding::DELTA_BINARY_PACKED)
|
||||
);
|
||||
assert_eq!(writer_props.dictionary_enabled(&int_field_colpath), false);
|
||||
}
|
||||
CompressionLevel::COMPATIBILITY => {
|
||||
CompressionLevel::Compatibility => {
|
||||
assert_eq!(
|
||||
writer_props.encoding(&int_field_colpath),
|
||||
Some(Encoding::PLAIN)
|
||||
|
@ -602,7 +602,7 @@ mod tests {
|
|||
|
||||
let timestamp_field_colpath = ColumnPath::from("time");
|
||||
match compression_level {
|
||||
CompressionLevel::MAXIMUM => {
|
||||
CompressionLevel::Maximum => {
|
||||
assert_eq!(
|
||||
writer_props.encoding(×tamp_field_colpath),
|
||||
Some(Encoding::DELTA_BINARY_PACKED)
|
||||
|
@ -612,7 +612,7 @@ mod tests {
|
|||
false
|
||||
);
|
||||
}
|
||||
CompressionLevel::COMPATIBILITY => {
|
||||
CompressionLevel::Compatibility => {
|
||||
assert_eq!(
|
||||
writer_props.encoding(×tamp_field_colpath),
|
||||
Some(Encoding::PLAIN)
|
||||
|
@ -639,11 +639,11 @@ mod tests {
|
|||
fn compression_level() {
|
||||
assert_eq!(
|
||||
CompressionLevel::from_str("max").ok().unwrap(),
|
||||
CompressionLevel::MAXIMUM
|
||||
CompressionLevel::Maximum
|
||||
);
|
||||
assert_eq!(
|
||||
CompressionLevel::from_str("compatibility").ok().unwrap(),
|
||||
CompressionLevel::COMPATIBILITY
|
||||
CompressionLevel::Compatibility
|
||||
);
|
||||
|
||||
let bad = CompressionLevel::from_str("madxxxx");
|
||||
|
|
|
@ -68,7 +68,7 @@ fn test_write_parquet_data() {
|
|||
let output_file = fs::File::create(&output_path).expect("can't open temp file for writing");
|
||||
|
||||
let mut parquet_writer =
|
||||
DeloreanParquetTableWriter::new(&schema, CompressionLevel::COMPATIBILITY, output_file)
|
||||
DeloreanParquetTableWriter::new(&schema, CompressionLevel::Compatibility, output_file)
|
||||
.expect("can't create parquet writer");
|
||||
parquet_writer
|
||||
.write_batch(&packers)
|
||||
|
|
|
@ -13,7 +13,6 @@ use std::{
|
|||
fs,
|
||||
io::Read,
|
||||
path::{Path, PathBuf},
|
||||
str::FromStr,
|
||||
};
|
||||
|
||||
use crate::commands::input::{FileType, InputReader};
|
||||
|
@ -107,7 +106,7 @@ pub fn is_directory(p: impl AsRef<Path>) -> bool {
|
|||
pub fn convert(
|
||||
input_filename: &str,
|
||||
output_name: &str,
|
||||
compression_level_request: &str,
|
||||
compression_level: CompressionLevel,
|
||||
) -> Result<()> {
|
||||
info!("convert starting");
|
||||
debug!("Reading from input file {}", input_filename);
|
||||
|
@ -119,10 +118,6 @@ pub fn convert(
|
|||
input_filename
|
||||
);
|
||||
|
||||
// setup writing
|
||||
let compression_level = CompressionLevel::from_str(compression_level_request)
|
||||
.context(UnableToCreateParquetTableWriter)?;
|
||||
|
||||
match input_reader.file_type() {
|
||||
FileType::LineProtocol => convert_line_protocol_to_parquet(
|
||||
input_filename,
|
||||
|
|
13
src/main.rs
13
src/main.rs
|
@ -6,7 +6,8 @@
|
|||
clippy::use_self
|
||||
)]
|
||||
|
||||
use clap::{crate_authors, crate_version, App, Arg, SubCommand};
|
||||
use clap::{crate_authors, crate_version, value_t, App, Arg, SubCommand};
|
||||
use delorean_parquet::writer::CompressionLevel;
|
||||
use log::{debug, error, warn};
|
||||
|
||||
mod commands {
|
||||
|
@ -69,7 +70,9 @@ Examples:
|
|||
Arg::with_name("compression_level")
|
||||
.short("c")
|
||||
.long("compression-level")
|
||||
.help("Compression level: max or compatibility (default).")
|
||||
.help("How much to compress the output data. 'max' compresses the most; 'compatibility' compresses in a manner more likely to be readable by other tools.")
|
||||
.takes_value(true)
|
||||
.possible_values(&["max", "compatibility"])
|
||||
.default_value("compatibility"),
|
||||
),
|
||||
)
|
||||
|
@ -113,9 +116,9 @@ Examples:
|
|||
("convert", Some(sub_matches)) => {
|
||||
let input_filename = sub_matches.value_of("INPUT").unwrap();
|
||||
let output_filename = sub_matches.value_of("OUTPUT").unwrap();
|
||||
let compression_level = sub_matches.value_of("compression_level").unwrap();
|
||||
match commands::convert::convert(&input_filename, &output_filename, &compression_level)
|
||||
{
|
||||
let compression_level =
|
||||
value_t!(sub_matches, "compression_level", CompressionLevel).unwrap();
|
||||
match commands::convert::convert(&input_filename, &output_filename, compression_level) {
|
||||
Ok(()) => debug!("Conversion completed successfully"),
|
||||
Err(e) => {
|
||||
eprintln!("Conversion failed: {}", e);
|
||||
|
|
|
@ -57,15 +57,9 @@ fn convert_bad_compression_level() {
|
|||
.arg("/tmp")
|
||||
.assert();
|
||||
|
||||
assert
|
||||
.failure()
|
||||
.code(1)
|
||||
.stderr(predicate::str::contains(
|
||||
"Conversion failed: Error creating a parquet table writer",
|
||||
))
|
||||
.stderr(predicate::str::contains(
|
||||
r#"Unknown compression level 'maxxx'. Valid options 'max' or 'compatibility'"#,
|
||||
));
|
||||
assert.failure().code(1).stderr(predicate::str::contains(
|
||||
"error: 'maxxx' isn't a valid value for '--compression-level <compression_level>",
|
||||
));
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
|
Loading…
Reference in New Issue