chore: cleanup legacy TSM -> parquet code (#1639)

* chore: cleanup legacy parquet code

* chore: remove tests of removed functionality

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Raphael Taylor-Davies 2021-06-07 13:59:33 +01:00 committed by GitHub
parent afe88eeb7c
commit 5749a2c119
18 changed files with 2 additions and 4969 deletions

Cargo.lock (generated)

@@ -1672,7 +1672,6 @@ dependencies = [
"influxdb_iox_client",
"influxdb_line_protocol",
"influxdb_tsm",
"ingest",
"internal_types",
"itertools 0.9.0",
"logfmt",
@@ -1763,23 +1762,6 @@ dependencies = [
"test_helpers",
]
[[package]]
name = "ingest"
version = "0.1.0"
dependencies = [
"arrow",
"flate2",
"influxdb_line_protocol",
"influxdb_tsm",
"internal_types",
"observability_deps",
"packers",
"parking_lot",
"parquet",
"snafu",
"test_helpers",
]
[[package]]
name = "instant"
version = "0.1.9"


@@ -19,7 +19,6 @@ members = [
"influxdb_iox_client",
"influxdb_line_protocol",
"influxdb_tsm",
"ingest",
"internal_types",
"logfmt",
"mem_qe",
@@ -53,7 +52,6 @@ influxdb_iox_client = { path = "influxdb_iox_client", features = ["format"] }
influxdb_line_protocol = { path = "influxdb_line_protocol" }
influxdb_tsm = { path = "influxdb_tsm" }
internal_types = { path = "internal_types" }
ingest = { path = "ingest" }
logfmt = { path = "logfmt" }
metrics = { path = "metrics" }
mutable_buffer = { path = "mutable_buffer" }
@@ -141,10 +139,6 @@ harness = false
name = "mapper"
harness = false
[[bench]]
name = "line_protocol_to_parquet"
harness = false
[[bench]]
name = "packers"
harness = false


@@ -1,88 +0,0 @@
use criterion::{criterion_group, criterion_main, Criterion, Throughput};
use influxdb_line_protocol::parse_lines;
use ingest::{
parquet::writer::{CompressionLevel, IOxParquetTableWriter},
ConversionSettings, LineProtocolConverter,
};
use internal_types::schema::Schema;
use packers::{Error as TableError, IOxTableWriter, IOxTableWriterSource};
use std::time::Duration;
use parquet::file::writer::TryClone;
use std::io::{Seek, SeekFrom, Write};
static LINES: &str = include_str!("../tests/fixtures/lineproto/metrics.lp");
/// Stream that throws away all output
struct IgnoringWriteStream {}
impl Write for IgnoringWriteStream {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
Ok(buf.len())
}
fn flush(&mut self) -> std::io::Result<()> {
Ok(())
}
}
impl Seek for IgnoringWriteStream {
fn seek(&mut self, _pos: SeekFrom) -> std::io::Result<u64> {
Ok(0)
}
}
impl TryClone for IgnoringWriteStream {
fn try_clone(&self) -> std::result::Result<Self, std::io::Error> {
Ok(IgnoringWriteStream {})
}
}
#[derive(Debug)]
/// Creates parquet writers that write to /dev/null
struct IgnoringParquetDirectoryWriterSource {}
impl IOxTableWriterSource for IgnoringParquetDirectoryWriterSource {
fn next_writer(&mut self, schema: &Schema) -> Result<Box<dyn IOxTableWriter>, TableError> {
let dev_null = IgnoringWriteStream {};
let writer = IOxParquetTableWriter::new(schema, CompressionLevel::Compatibility, dev_null)
.expect("Creating table writer");
Ok(Box::new(writer))
}
}
fn line_parser(c: &mut Criterion) {
let mut group = c.benchmark_group("line_protocol_to_parquet");
// Expand dataset by 10x to amortize writer setup and column writer overhead
const NUM_COPIES: usize = 10;
let mut input = String::with_capacity(NUM_COPIES * (LINES.len() + 1));
for _ in 0..NUM_COPIES {
input.push_str(LINES);
input.push('\n');
}
group.throughput(Throughput::Bytes(input.len() as u64));
group.measurement_time(Duration::from_secs(30));
group.sample_size(20);
group.bench_function("all lines", |b| {
b.iter(|| {
let settings = ConversionSettings::default();
let only_good_lines = parse_lines(&input).map(|r| r.expect("Unexpected parse error"));
let writer_source = Box::new(IgnoringParquetDirectoryWriterSource {});
let mut converter = LineProtocolConverter::new(settings, writer_source);
converter
.convert(only_good_lines)
.expect("Converting all lines");
converter.finalize().expect("Finalizing writer");
})
});
group.finish();
}
criterion_group!(benches, line_parser);
criterion_main!(benches);
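// Context note: this benchmark was registered through the
// `[[bench]] name = "line_protocol_to_parquet"` entry removed from the
// workspace Cargo.toml above, so before this cleanup it was presumably
// invoked with `cargo bench --bench line_protocol_to_parquet`.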


@@ -1,20 +0,0 @@
[package]
name = "ingest"
version = "0.1.0"
authors = ["Andrew Lamb <andrew@nerdnetworks.org>"]
edition = "2018"
[dependencies] # In alphabetical order
arrow = { version = "4.0", features = ["prettyprint"] }
influxdb_line_protocol = { path = "../influxdb_line_protocol" }
influxdb_tsm = { path = "../influxdb_tsm" }
internal_types = { path = "../internal_types" }
packers = { path = "../packers" }
parquet = "4.0"
snafu = "0.6.2"
observability_deps = { path = "../observability_deps" }
[dev-dependencies] # In alphabetical order
flate2 = "1.0"
parking_lot = "0.11.1"
test_helpers = { path = "../test_helpers" }

File diff suppressed because it is too large


@@ -1,12 +0,0 @@
//! This module contains code for writing / reading data to parquet.
#![warn(
missing_copy_implementations,
missing_debug_implementations,
clippy::explicit_iter_loop,
clippy::use_self
)]
pub mod error;
pub mod metadata;
pub mod stats;
pub mod writer;


@@ -1,16 +0,0 @@
use snafu::Snafu;
use parquet::errors::ParquetError;
#[derive(Debug, Snafu)]
pub enum IOxParquetError {
#[snafu(display(r#"{}, underlying parquet error {}"#, message, source))]
#[snafu(visibility(pub(crate)))]
ParquetLibraryError {
message: String,
source: ParquetError,
},
Unsupported,
}
pub type Result<T, E = IOxParquetError> = std::result::Result<T, E>;


@@ -1,110 +0,0 @@
//! Provide storage statistics for parquet files
use super::error::{ParquetLibraryError, Result};
use arrow::datatypes::DataType;
use parquet::{
file::reader::{ChunkReader, FileReader, SerializedFileReader},
schema,
};
use snafu::ResultExt;
pub fn parquet_schema_as_string(parquet_schema: &schema::types::Type) -> String {
let mut parquet_schema_string = Vec::new();
schema::printer::print_schema(&mut parquet_schema_string, parquet_schema);
String::from_utf8_lossy(&parquet_schema_string).to_string()
}
/// Maps from parquet types to table schema types
pub fn data_type_from_parquet_type(parquet_type: parquet::basic::Type) -> DataType {
use parquet::basic::Type::*;
match parquet_type {
BOOLEAN => DataType::Boolean,
INT64 => DataType::Int64,
DOUBLE => DataType::Float64,
BYTE_ARRAY => DataType::Utf8,
_ => {
unimplemented!("Unsupported parquet datatype: {:?}", parquet_type);
}
}
}
/// Print parquet metadata that can be read from `input`, including its
/// total size in bytes
pub fn print_parquet_metadata<R: 'static>(input: R) -> Result<()>
where
R: ChunkReader,
{
let input_len = input.len();
let reader = SerializedFileReader::new(input).context(ParquetLibraryError {
message: "Creating parquet reader",
})?;
let parquet_metadata = reader.metadata();
let file_metadata = parquet_metadata.file_metadata();
let num_columns = file_metadata.schema_descr().num_columns();
println!("Parquet file size: {} bytes", input_len);
println!(
"Parquet file Schema: {}",
parquet_schema_as_string(file_metadata.schema()).trim_end()
);
println!("Parquet file metadata:");
println!(" row groups: {}", parquet_metadata.num_row_groups());
if let Some(created_by) = file_metadata.created_by() {
println!(" created by: {:?}", created_by);
}
println!(" version: {}", file_metadata.version());
println!(" num_rows: {}", file_metadata.num_rows());
println!(" num_columns: {}", num_columns);
let kv_meta_len = match file_metadata.key_value_metadata() {
Some(v) => v.len(),
None => 0,
};
println!(" key_value_meta.len: {:?}", kv_meta_len);
if let Some(column_orders) = file_metadata.column_orders() {
println!(" column_orders:");
for (idx, col) in column_orders.iter().enumerate() {
println!(" column_order[{}]: {:?}", idx, col.sort_order());
}
}
println!("Row groups:");
for (rg_idx, rg_metadata) in parquet_metadata.row_groups().iter().enumerate() {
println!(" Row Group [{}]:", rg_idx);
println!(
" total uncompressed byte size: {}",
rg_metadata.total_byte_size()
);
for (cc_idx, cc_metadata) in rg_metadata.columns().iter().enumerate() {
println!(" Column Chunk [{}]:", cc_idx);
println!(" file_offset: {}", cc_metadata.file_offset());
println!(" column_type: {:?}", cc_metadata.column_type());
println!(" column_path: {}", cc_metadata.column_path().string());
println!(" num_values: {}", cc_metadata.num_values());
println!(" encodings: {:?}", cc_metadata.encodings());
println!(" compression: {:?}", cc_metadata.compression());
println!(" compressed_size: {}", cc_metadata.compressed_size());
println!(" uncompressed_size: {}", cc_metadata.uncompressed_size());
println!(" data_page_offset: {}", cc_metadata.data_page_offset());
println!(" has_index_page: {}", cc_metadata.has_index_page());
if let Some(index_page_offset) = cc_metadata.index_page_offset() {
println!(" index_page_offset: {}", index_page_offset);
}
println!(
" has_dictionary_page: {}",
cc_metadata.has_dictionary_page()
);
if let Some(dictionary_page_offset) = cc_metadata.dictionary_page_offset() {
println!(" dictionary_page_offset: {}", dictionary_page_offset);
}
if let Some(statistics) = cc_metadata.statistics() {
println!(" statistics: {:?}", statistics);
} else {
println!(" NO STATISTICS");
}
}
}
Ok(())
}
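// A minimal usage sketch (not part of the original module): `std::fs::File`
// implements `ChunkReader` in the parquet crate, so the helper above can be
// driven straight from a file handle. The path is hypothetical.
fn print_example_metadata() -> Result<()> {
    let file = std::fs::File::open("example.parquet")
        .expect("opening hypothetical example file");
    print_parquet_metadata(file)
}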


@@ -1,131 +0,0 @@
//! Provide storage statistics for parquet files
use observability_deps::tracing::debug;
use packers::{
stats::{ColumnStatsBuilder, FileStats, FileStatsBuilder},
Name,
};
use parquet::{
basic::{Compression, Encoding},
file::reader::{ChunkReader, FileReader, SerializedFileReader},
};
use snafu::ResultExt;
use std::{collections::BTreeMap, convert::TryInto};
use super::{
error::{ParquetLibraryError, Result},
metadata::data_type_from_parquet_type,
};
/// Calculate storage statistics for a particular parquet "file" that can
/// be read from `input`, including its total size in bytes
///
/// Returns a `FileStats` object representing statistics for all
/// columns across all column chunks.
pub fn file_stats<R: 'static>(input: R) -> Result<FileStats>
where
R: ChunkReader + Name,
{
let mut file_stats_builder = FileStatsBuilder::new(&input.name(), input.len());
let reader = SerializedFileReader::new(input).context(ParquetLibraryError {
message: "Creating parquet reader",
})?;
let mut stats_builders: BTreeMap<String, ColumnStatsBuilder> = BTreeMap::new();
let parquet_metadata = reader.metadata();
for (rg_idx, rg_metadata) in parquet_metadata.row_groups().iter().enumerate() {
debug!(
"Looking at Row Group [{}] (total uncompressed byte size {})",
rg_idx,
rg_metadata.total_byte_size()
);
for (cc_idx, cc_metadata) in rg_metadata.columns().iter().enumerate() {
let col_path = cc_metadata.column_path();
let builder = stats_builders
.remove(&col_path.string())
.unwrap_or_else(|| {
let data_type = data_type_from_parquet_type(cc_metadata.column_type());
ColumnStatsBuilder::new(col_path.string(), cc_idx, data_type)
})
.compression(&format!(
"Enc: {}, Comp: {}",
encoding_display(&cc_metadata.encodings()),
compression_display(&cc_metadata.compression()),
))
.add_rows(
cc_metadata
.num_values()
.try_into()
.expect("positive number of values"),
)
.add_compressed_bytes(
cc_metadata
.compressed_size()
.try_into()
.expect("positive compressed size"),
)
.add_uncompressed_bytes(
cc_metadata
.uncompressed_size()
.try_into()
.expect("positive uncompressed size"),
);
// put the builder back in the map
stats_builders.insert(col_path.string(), builder);
}
}
// now, marshal up all the results
for (_k, b) in stats_builders {
file_stats_builder = file_stats_builder.add_column(b.build());
}
Ok(file_stats_builder.build())
}
/// Create a more user friendly display of the encodings
fn encoding_display(encodings: &[Encoding]) -> String {
if encodings.iter().any(|&e| e == Encoding::RLE_DICTIONARY) {
// parquet represents "dictionary" encoding as [PLAIN,
// RLE_DICTIONARY, RLE], which is somewhat confusing -- it means
// that the dictionary page uses plain encoding,
// whereas the data page uses RLE encoding.
"Dictionary".into()
} else {
format!("{:?}", encodings)
}
}
/// Create a user friendly display of the compression
fn compression_display(compression: &Compression) -> String {
format!("{}", compression)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_encoding_display() {
assert_eq!(&encoding_display(&[Encoding::PLAIN]), "[PLAIN]");
assert_eq!(
&encoding_display(&[Encoding::PLAIN, Encoding::RLE]),
"[PLAIN, RLE]"
);
assert_eq!(
&encoding_display(&[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY]),
"Dictionary"
);
assert_eq!(
&encoding_display(&[Encoding::DELTA_BYTE_ARRAY, Encoding::RLE_DICTIONARY]),
"Dictionary"
);
}
#[test]
fn test_compression_display() {
assert_eq!(&compression_display(&Compression::GZIP), "GZIP");
}
}
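// A hedged sketch of how `file_stats` above could be driven: it needs a type
// implementing both `ChunkReader` and the `packers::Name` trait, such as the
// CLI `InputReader` that is also removed in this commit (see below); the path
// is hypothetical.
//
// let reader = InputReader::new("example.parquet")?;
// let stats = file_stats(reader)?;
// // ... inspect per-column compressed/uncompressed byte counts in `stats`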


@@ -1,709 +0,0 @@
//! This module contains the code to write table data to parquet
use internal_types::schema::{InfluxColumnType, InfluxFieldType, Schema};
use observability_deps::tracing::{debug, log::warn};
use parquet::file::writer::ParquetWriter;
use parquet::{
self,
basic::{
Compression, Encoding, IntType, LogicalType, Repetition, TimeUnit, TimestampType,
Type as PhysicalType,
},
errors::ParquetError,
file::{
properties::{WriterProperties, WriterPropertiesBuilder},
writer::{FileWriter, SerializedFileWriter, TryClone},
},
schema::types::{ColumnPath, Type},
};
use snafu::{OptionExt, ResultExt, Snafu};
use std::{
fmt,
io::{Seek, Write},
str::FromStr,
sync::Arc,
};
use super::metadata::parquet_schema_as_string;
use packers::{Error as TableError, IOxTableWriter, Packers};
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display(r#"{}, underlying parquet error {}"#, message, source))]
ParquetLibraryError {
message: String,
source: ParquetError,
},
#[snafu(display(r#"Could not get packer for column {}"#, column_number))]
MismatchedColumns { column_number: usize },
#[snafu(display(
r#"Unknown compression level '{}'. Valid options 'max' or 'compatibility'"#,
compression_level
))]
UnknownCompressionLevel { compression_level: String },
#[snafu(display(r#"Unsupported datatype for parquet writing: {:?}"#, data_type,))]
UnsupportedDataType { data_type: String },
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
impl From<Error> for TableError {
fn from(other: Error) -> Self {
Self::Data {
source: Box::new(other),
}
}
}
/// Specify the desired compression level when writing parquet files
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum CompressionLevel {
/// Minimize the size of the written parquet file
Maximum,
// Attempt to maximize interoperability with other ecosystem tools.
//
// See https://github.com/influxdata/influxdb_iox/issues/184
Compatibility,
}
impl FromStr for CompressionLevel {
type Err = Error;
fn from_str(compression_level: &str) -> Result<Self, Self::Err> {
match compression_level {
"max" => Ok(Self::Maximum),
"compatibility" => Ok(Self::Compatibility),
_ => UnknownCompressionLevel { compression_level }.fail(),
}
}
}
/// An `IOxParquetTableWriter` is used for writing batches of rows to
/// parquet files.
pub struct IOxParquetTableWriter<W>
where
W: ParquetWriter,
{
parquet_schema: Arc<parquet::schema::types::Type>,
file_writer: SerializedFileWriter<W>,
}
impl<W: 'static> IOxParquetTableWriter<W>
where
W: Write + Seek + TryClone,
{
/// Create a new TableWriter that writes its rows to something
/// that implements the trait (e.g. std::fs::File). For example:
///
/// ```
/// # use std::fs;
/// # use internal_types::schema::{builder::SchemaBuilder, InfluxFieldType};
/// # use packers::IOxTableWriter;
/// # use packers::{Packer, Packers};
/// # use ingest::parquet::writer::{IOxParquetTableWriter, CompressionLevel};
/// # use parquet::data_type::ByteArray;
///
/// let schema = SchemaBuilder::new()
/// .measurement("measurement_name")
/// .tag("tag1")
/// .influx_field("field1", InfluxFieldType::Integer)
/// .timestamp()
/// .build()
/// .unwrap();
///
/// let mut packers: Vec<Packers> = vec![
/// Packers::Bytes(Packer::new()), // 0: tag1
/// Packers::Integer(Packer::new()), // 1: field1
/// Packers::Integer(Packer::new()), // 2: timestamp
/// ];
///
/// packers[0].bytes_packer_mut().push(ByteArray::from("tag1")); // tag1 val
/// packers[1].i64_packer_mut().push(100); // field1 val
/// packers[2].push_none(); // no timestamp
///
/// // Write to '/tmp/example.parquet'
/// let mut output_file_name = std::env::temp_dir();
/// output_file_name.push("example.parquet");
/// let output_file = fs::File::create(output_file_name.as_path()).unwrap();
///
/// let compression_level = CompressionLevel::Compatibility;
///
/// let mut parquet_writer = IOxParquetTableWriter::new(
/// &schema, compression_level, output_file)
/// .unwrap();
///
/// // write the actual data to parquet
/// parquet_writer.write_batch(&packers).unwrap();
///
/// // Closing the writer closes the data and the file
/// parquet_writer.close().unwrap();
///
/// # std::fs::remove_file(output_file_name);
/// ```
pub fn new(
schema: &Schema,
compression_level: CompressionLevel,
writer: W,
) -> Result<Self, Error> {
let writer_props = create_writer_props(&schema, compression_level);
let parquet_schema = convert_to_parquet_schema(&schema)?;
let file_writer =
SerializedFileWriter::new(writer, Arc::clone(&parquet_schema), writer_props).context(
ParquetLibraryError {
message: String::from("Error trying to create a SerializedFileWriter"),
},
)?;
let parquet_writer = Self {
parquet_schema,
file_writer,
};
debug!(
"ParquetWriter created for schema: {}",
parquet_schema_as_string(&parquet_writer.parquet_schema)
);
Ok(parquet_writer)
}
}
impl<W: 'static> IOxTableWriter for IOxParquetTableWriter<W>
where
W: Write + Seek + TryClone,
{
/// Writes a batch of packed data to the output file in a single
/// column chunk
///
/// TODO: better control of column chunks
fn write_batch(&mut self, packers: &[Packers]) -> Result<(), TableError> {
// now write out the data
let mut row_group_writer =
self.file_writer
.next_row_group()
.context(ParquetLibraryError {
message: String::from("Error creating next row group writer"),
})?;
use parquet::column::writer::ColumnWriter::*;
let mut column_number = 0;
while let Some(mut col_writer) =
row_group_writer
.next_column()
.context(ParquetLibraryError {
message: String::from("Can't create the next column writer"),
})?
{
let packer = packers
.get(column_number)
.context(MismatchedColumns { column_number })
.map_err(TableError::from_other)?;
// TODO(edd) This seems super awkward and not the right way to do it...
// We know we have a direct mapping between a col_writer (ColumnWriter)
// type and a Packers variant. We also know that we do exactly the same
// work for each variant (we just dispatch to the write_batch method)
// on the column write.
//
// I think this match could be so much shorter but not sure how yet.
match col_writer {
BoolColumnWriter(ref mut w) => {
let p = packer.bool_packer();
let n = w
.write_batch(&p.some_values(), Some(&p.def_levels()), None)
.context(ParquetLibraryError {
message: String::from("Can't write_batch with bool values"),
})?;
debug!("Wrote {} rows of bool data", n);
}
Int32ColumnWriter(_) => unreachable!("ParquetWriter does not support INT32 data"),
Int64ColumnWriter(ref mut w) => {
let p = packer.i64_packer();
let n = w
.write_batch(&p.some_values(), Some(&p.def_levels()), None)
.context(ParquetLibraryError {
message: String::from("Can't write_batch with int64 values"),
})?;
debug!("Wrote {} rows of int64 data", n);
}
Int96ColumnWriter(_) => unreachable!("ParquetWriter does not support INT96 data"),
FloatColumnWriter(_) => {
unreachable!("ParquetWriter does not support FLOAT (32-bit float) data")
}
DoubleColumnWriter(ref mut w) => {
let p = packer.f64_packer();
let n = w
.write_batch(&p.some_values(), Some(&p.def_levels()), None)
.context(ParquetLibraryError {
message: String::from("Can't write_batch with f64 values"),
})?;
debug!("Wrote {} rows of f64 data", n);
}
ByteArrayColumnWriter(ref mut w) => {
let p = packer.bytes_packer();
let n = w
.write_batch(&p.some_values(), Some(&p.def_levels()), None)
.context(ParquetLibraryError {
message: String::from("Can't write_batch with byte array values"),
})?;
debug!("Wrote {} rows of byte data", n);
}
FixedLenByteArrayColumnWriter(_) => {
unreachable!("ParquetWriter does not support FIXED_LEN_BYTE_ARRAY data");
}
};
debug!("Closing column writer for {}", column_number);
row_group_writer
.close_column(col_writer)
.context(ParquetLibraryError {
message: String::from("Can't close column writer"),
})?;
column_number += 1;
}
self.file_writer
.close_row_group(row_group_writer)
.context(ParquetLibraryError {
message: String::from("Can't close row group writer"),
})?;
Ok(())
}
/// Closes this writer, and finalizes the underlying parquet file
fn close(&mut self) -> Result<(), TableError> {
self.file_writer.close().context(ParquetLibraryError {
message: String::from("Can't close file writer"),
})?;
Ok(())
}
}
impl<W> fmt::Debug for IOxParquetTableWriter<W>
where
W: Write + Seek + TryClone,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("IOxParquetTableWriter")
.field("parquet_schema", &self.parquet_schema)
.field("file_writer", &"SerializedFileWriter")
.finish()
}
}
// Converts from line protocol `Schema` to the equivalent parquet schema `Type`.
fn convert_to_parquet_schema(schema: &Schema) -> Result<Arc<parquet::schema::types::Type>, Error> {
let mut parquet_columns = Vec::new();
for (i, (influxdb_column_type, field)) in schema.iter().enumerate() {
debug!(
"Determining parquet schema for column[{}] {:?} -> {:?}",
i, influxdb_column_type, field
);
let (physical_type, logical_type) = match influxdb_column_type {
Some(InfluxColumnType::Tag) => (
PhysicalType::BYTE_ARRAY,
Some(LogicalType::STRING(Default::default())),
),
Some(InfluxColumnType::Field(InfluxFieldType::Boolean)) => {
(PhysicalType::BOOLEAN, None)
}
Some(InfluxColumnType::Field(InfluxFieldType::Float)) => (PhysicalType::DOUBLE, None),
Some(InfluxColumnType::Field(InfluxFieldType::Integer)) => (PhysicalType::INT64, None),
Some(InfluxColumnType::Field(InfluxFieldType::UInteger)) => (
PhysicalType::INT64,
Some(LogicalType::INTEGER(IntType {
bit_width: 64,
is_signed: false,
})),
),
Some(InfluxColumnType::Field(InfluxFieldType::String)) => (
PhysicalType::BYTE_ARRAY,
Some(LogicalType::STRING(Default::default())),
),
Some(InfluxColumnType::Timestamp) => {
(
PhysicalType::INT64,
Some(LogicalType::TIMESTAMP(TimestampType {
unit: TimeUnit::NANOS(Default::default()),
// Indicates that the timestamp is stored as UTC values
is_adjusted_to_u_t_c: true,
})),
)
}
None => {
return UnsupportedDataType {
data_type: format!("Arrow type: {:?}", field.data_type()),
}
.fail();
}
};
// All fields are optional
let parquet_column_builder = Type::primitive_type_builder(field.name(), physical_type)
.with_repetition(Repetition::OPTIONAL)
.with_logical_type(logical_type);
let parquet_column_type = parquet_column_builder
.build()
.context(ParquetLibraryError {
message: String::from("Can't create parquet column type"),
})?;
debug!(
"Using parquet type {} for column {:?}",
parquet_schema_as_string(&parquet_column_type),
field.name()
);
parquet_columns.push(Arc::new(parquet_column_type));
}
let measurement = schema.measurement().unwrap();
let parquet_schema = Type::group_type_builder(measurement)
.with_fields(&mut parquet_columns)
.build()
.context(ParquetLibraryError {
message: String::from("Can't create top level parquet schema"),
})?;
Ok(Arc::new(parquet_schema))
}
fn set_integer_encoding(
influxdb_column_type: InfluxColumnType,
compression_level: CompressionLevel,
col_path: ColumnPath,
builder: WriterPropertiesBuilder,
) -> WriterPropertiesBuilder {
match compression_level {
CompressionLevel::Maximum => {
debug!(
"Setting encoding of {:?} col {} to DELTA_BINARY_PACKED (Maximum)",
influxdb_column_type, col_path
);
builder
.set_column_encoding(col_path.clone(), Encoding::DELTA_BINARY_PACKED)
.set_column_dictionary_enabled(col_path, false)
}
CompressionLevel::Compatibility => {
debug!(
"Setting encoding of {:?} col {} to PLAIN/RLE (Compatibility)",
influxdb_column_type, col_path
);
builder
.set_column_encoding(col_path.clone(), Encoding::PLAIN)
.set_column_dictionary_enabled(col_path, true)
}
}
}
/// Create the parquet writer properties (which defines the encoding
/// and compression for each column) for a given schema.
fn create_writer_props(
schema: &Schema,
compression_level: CompressionLevel,
) -> Arc<WriterProperties> {
let mut builder = WriterProperties::builder();
// TODO: Maybe tweak more of these settings for maximum performance.
// start off with GZIP for maximum compression ratio (at expense of CPU
// performance...)
builder = builder.set_compression(Compression::GZIP);
// Setup encoding as defined in
// https://github.com/influxdata/influxdb_iox/blob/alamb/encoding_thoughts/docs/encoding_thoughts.md
//
// Note: the property writer builder's default is to encode
// everything with dictionary encoding, and it turns out that
// dictionary encoding overrides all other encodings. Thus, we
// must explicitly disable dictionary encoding when another
// encoding is desired.
for (i, (influxdb_column_type, field)) in schema.iter().enumerate() {
let column_name = field.name().clone();
let col_path: ColumnPath = column_name.into();
match influxdb_column_type {
Some(InfluxColumnType::Field(InfluxFieldType::Boolean)) => {
debug!(
"Setting encoding of {:?} col {} to RLE",
influxdb_column_type, i
);
builder = builder
.set_column_encoding(col_path.clone(), Encoding::RLE)
.set_column_dictionary_enabled(col_path, false);
}
Some(InfluxColumnType::Field(InfluxFieldType::Integer)) => {
builder = set_integer_encoding(
influxdb_column_type.unwrap(),
compression_level,
col_path,
builder,
)
}
Some(InfluxColumnType::Field(InfluxFieldType::UInteger)) => {
builder = set_integer_encoding(
influxdb_column_type.unwrap(),
compression_level,
col_path,
builder,
)
}
Some(InfluxColumnType::Field(InfluxFieldType::Float)) => {
debug!(
"Setting encoding of {:?} col {} to PLAIN",
influxdb_column_type, col_path
);
builder = builder
.set_column_encoding(col_path.clone(), Encoding::PLAIN)
.set_column_dictionary_enabled(col_path, false);
}
Some(InfluxColumnType::Field(InfluxFieldType::String)) => {
debug!(
"Setting encoding of non-tag val {:?} col {} to DELTA_LENGTH_BYTE_ARRAY",
influxdb_column_type, col_path
);
builder = builder
.set_column_encoding(col_path.clone(), Encoding::DELTA_LENGTH_BYTE_ARRAY)
.set_column_dictionary_enabled(col_path, false);
}
// tag values are often very much repeated
Some(InfluxColumnType::Tag) => {
debug!(
"Setting encoding of tag val {:?} col {} to dictionary",
influxdb_column_type, col_path
);
builder = builder.set_column_dictionary_enabled(col_path, true);
}
Some(InfluxColumnType::Timestamp) => {
builder = set_integer_encoding(
influxdb_column_type.unwrap(),
compression_level,
col_path,
builder,
)
}
None => {
warn!(
"Using default parquet encoding for column {} which has no LP annotations",
field.name()
)
}
};
}
// Even though the 'set_statistics_enabled()' method is called here, the
// resulting parquet file does not appear to have statistics enabled.
//
// This is due to the fact that the underlying rust parquet
// library does not support statistics generation at this time.
let props = builder
.set_statistics_enabled(true)
.set_created_by("InfluxDB IOx".to_string())
.build();
Arc::new(props)
}
#[cfg(test)]
mod tests {
use internal_types::schema::builder::SchemaBuilder;
use super::*;
// Collapses multiple spaces into a single space, and removes trailing
// whitespace
fn normalize_spaces(s: &str) -> String {
// previous non space, if any
let mut prev: Option<char> = None;
let no_double_spaces: String = s
.chars()
.filter(|c| {
if let Some(prev_c) = prev {
if prev_c == ' ' && *c == ' ' {
return false;
}
}
prev = Some(*c);
true
})
.collect();
no_double_spaces
.trim_end_matches(|c| c == ' ' || c == '\n')
.to_string()
}
#[test]
fn test_convert_to_parquet_schema() {
let schema = SchemaBuilder::new()
.measurement("measurement_name")
.tag("tag1")
.influx_field("string_field", InfluxFieldType::String)
.influx_field("float_field", InfluxFieldType::Float)
.influx_field("int_field", InfluxFieldType::Integer)
.influx_field("uint_field", InfluxFieldType::UInteger)
.influx_field("bool_field", InfluxFieldType::Boolean)
.timestamp()
.build()
.unwrap();
let parquet_schema = convert_to_parquet_schema(&schema).expect("conversion successful");
let parquet_schema_string = normalize_spaces(&parquet_schema_as_string(&parquet_schema));
let expected_schema_string = normalize_spaces(
r#"message measurement_name {
OPTIONAL BYTE_ARRAY tag1 (STRING);
OPTIONAL BYTE_ARRAY string_field (STRING);
OPTIONAL DOUBLE float_field;
OPTIONAL INT64 int_field;
OPTIONAL INT64 uint_field (INTEGER(64,false));
OPTIONAL BOOLEAN bool_field;
OPTIONAL INT64 time (TIMESTAMP(NANOS,true));
}"#,
);
assert_eq!(parquet_schema_string, expected_schema_string);
}
fn make_test_schema() -> Schema {
SchemaBuilder::new()
.measurement("measurement_name")
.tag("tag1")
.influx_field("string_field", InfluxFieldType::String)
.influx_field("float_field", InfluxFieldType::Float)
.influx_field("int_field", InfluxFieldType::Integer)
.influx_field("bool_field", InfluxFieldType::Boolean)
.timestamp()
.build()
.unwrap()
}
#[test]
fn test_create_writer_props_maximum() {
do_test_create_writer_props(CompressionLevel::Maximum);
}
#[test]
fn test_create_writer_props_compatibility() {
do_test_create_writer_props(CompressionLevel::Compatibility);
}
fn do_test_create_writer_props(compression_level: CompressionLevel) {
let schema = make_test_schema();
let writer_props = create_writer_props(&schema, compression_level);
let tag1_colpath = ColumnPath::from("tag1");
assert_eq!(writer_props.encoding(&tag1_colpath), None);
assert_eq!(writer_props.compression(&tag1_colpath), Compression::GZIP);
assert_eq!(writer_props.dictionary_enabled(&tag1_colpath), true);
assert_eq!(writer_props.statistics_enabled(&tag1_colpath), true);
let string_field_colpath = ColumnPath::from("string_field");
assert_eq!(
writer_props.encoding(&string_field_colpath),
Some(Encoding::DELTA_LENGTH_BYTE_ARRAY)
);
assert_eq!(
writer_props.compression(&string_field_colpath),
Compression::GZIP
);
assert_eq!(
writer_props.dictionary_enabled(&string_field_colpath),
false
);
assert_eq!(writer_props.statistics_enabled(&string_field_colpath), true);
let float_field_colpath = ColumnPath::from("float_field");
assert_eq!(
writer_props.encoding(&float_field_colpath),
Some(Encoding::PLAIN)
);
assert_eq!(
writer_props.compression(&float_field_colpath),
Compression::GZIP
);
assert_eq!(writer_props.dictionary_enabled(&float_field_colpath), false);
assert_eq!(writer_props.statistics_enabled(&float_field_colpath), true);
let int_field_colpath = ColumnPath::from("int_field");
match compression_level {
CompressionLevel::Maximum => {
assert_eq!(
writer_props.encoding(&int_field_colpath),
Some(Encoding::DELTA_BINARY_PACKED)
);
assert_eq!(writer_props.dictionary_enabled(&int_field_colpath), false);
}
CompressionLevel::Compatibility => {
assert_eq!(
writer_props.encoding(&int_field_colpath),
Some(Encoding::PLAIN)
);
assert_eq!(writer_props.dictionary_enabled(&int_field_colpath), true);
}
}
assert_eq!(
writer_props.compression(&int_field_colpath),
Compression::GZIP
);
assert_eq!(writer_props.statistics_enabled(&int_field_colpath), true);
let bool_field_colpath = ColumnPath::from("bool_field");
assert_eq!(
writer_props.encoding(&bool_field_colpath),
Some(Encoding::RLE)
);
assert_eq!(
writer_props.compression(&bool_field_colpath),
Compression::GZIP
);
assert_eq!(writer_props.dictionary_enabled(&bool_field_colpath), false);
assert_eq!(writer_props.statistics_enabled(&bool_field_colpath), true);
let timestamp_field_colpath = ColumnPath::from("time");
match compression_level {
CompressionLevel::Maximum => {
assert_eq!(
writer_props.encoding(&timestamp_field_colpath),
Some(Encoding::DELTA_BINARY_PACKED)
);
assert_eq!(
writer_props.dictionary_enabled(&timestamp_field_colpath),
false
);
}
CompressionLevel::Compatibility => {
assert_eq!(
writer_props.encoding(&timestamp_field_colpath),
Some(Encoding::PLAIN)
);
assert_eq!(
writer_props.dictionary_enabled(&timestamp_field_colpath),
true
);
}
}
assert_eq!(
writer_props.compression(&timestamp_field_colpath),
Compression::GZIP
);
assert_eq!(
writer_props.statistics_enabled(&timestamp_field_colpath),
true
);
}
#[test]
fn compression_level() {
assert_eq!(
CompressionLevel::from_str("max").ok().unwrap(),
CompressionLevel::Maximum
);
assert_eq!(
CompressionLevel::from_str("compatibility").ok().unwrap(),
CompressionLevel::Compatibility
);
let bad = CompressionLevel::from_str("madxxxx");
assert!(bad.is_err());
}
}


@@ -1,97 +0,0 @@
use ingest::parquet::writer::{CompressionLevel, IOxParquetTableWriter};
use internal_types::schema::{builder::SchemaBuilder, InfluxFieldType};
use packers::{IOxTableWriter, Packer, Packers};
use parquet::data_type::ByteArray;
use std::fs;
#[test]
fn test_write_parquet_data() {
let schema = SchemaBuilder::new()
.measurement("measurement_name")
.tag("tag1")
.influx_field("string_field", InfluxFieldType::String)
.influx_field("float_field", InfluxFieldType::Float)
.influx_field("int_field", InfluxFieldType::Integer)
.influx_field("bool_field", InfluxFieldType::Boolean)
.timestamp()
.build()
.unwrap();
assert_eq!(schema.len(), 6);
let mut packers = vec![
Packers::Bytes(Packer::new()), // 0: tag1
Packers::Bytes(Packer::new()), // 1: string_field
Packers::Float(Packer::new()), // 2: float_field
Packers::Integer(Packer::new()), // 3: int_field
Packers::Boolean(Packer::new()), // 4: bool_field
Packers::Integer(Packer::new()), // 5: timestamp
];
// create this data:
// row 0: "tag1_val0", "str_val0", 1.0, 100, true, 900000000000
// row 1: null, null , null, null, null, null
// row 2: "tag1_val2", "str_val2", 2.0, 200, true, 910000000000
packers[0]
.bytes_packer_mut()
.push(ByteArray::from("tag1_val0"));
packers[1]
.bytes_packer_mut()
.push(ByteArray::from("str_val0"));
packers[2].f64_packer_mut().push(1.0);
packers[3].i64_packer_mut().push(100);
packers[4].bool_packer_mut().push(true);
packers[5].i64_packer_mut().push(900000000000);
packers[0].push_none();
packers[1].push_none();
packers[2].push_none();
packers[3].push_none();
packers[4].push_none();
packers[5].push_none();
packers[0]
.bytes_packer_mut()
.push(ByteArray::from("tag1_val2"));
packers[1]
.bytes_packer_mut()
.push(ByteArray::from("str_val2"));
packers[2].f64_packer_mut().push(2.0);
packers[3].i64_packer_mut().push(200);
packers[4].bool_packer_mut().push(true);
packers[5].i64_packer_mut().push(910000000000);
// write the data out to the parquet file
let output_path = test_helpers::tempfile::Builder::new()
.prefix("iox_parquet_e2e")
.suffix(".parquet")
.tempfile()
.expect("error creating temp file")
.into_temp_path();
let output_file = fs::File::create(&output_path).expect("can't open temp file for writing");
let mut parquet_writer =
IOxParquetTableWriter::new(&schema, CompressionLevel::Compatibility, output_file)
.expect("can't create parquet writer");
parquet_writer
.write_batch(&packers)
.expect("can't write batch");
parquet_writer.close().expect("can't close writer");
// verify the file has some non zero content
let file_meta = fs::metadata(&output_path).expect("Can't get file metadata");
assert!(file_meta.is_file());
assert!(file_meta.len() > 0, "Length was {}", file_meta.len());
// TODO: implement a parquet reader to read the file back in
// then read the data back in and verify it came out successfully
let output_path_string: String = output_path.to_string_lossy().to_string();
output_path.close().expect("error deleting the temp file");
assert!(
fs::metadata(&output_path_string).is_err(),
"temp file was not cleaned up {:?}",
&output_path_string
);
}


@@ -1,5 +1,4 @@
use observability_deps::tracing::warn;
use snafu::{OptionExt, ResultExt, Snafu};
use snafu::{ResultExt, Snafu};
use std::{
collections::{HashMap, HashSet},
convert::TryInto,
@@ -12,19 +11,6 @@ use super::{InfluxColumnType, InfluxFieldType, Schema, TIME_COLUMN_NAME};
/// Database schema creation / validation errors.
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("No measurement provided",))]
NoMeasurement {},
#[snafu(display(
"Multiple measurement names not supported. Old measurement '{}', new measurement '{}'",
old_measurement,
new_measurement
))]
MultipleMeasurementNames {
old_measurement: String,
new_measurement: String,
},
#[snafu(display("Error validating schema: {}", source))]
ValidatingSchema { source: super::Error },
@@ -207,107 +193,6 @@ impl SchemaBuilder {
}
}
/// Specialized Schema Builder for use building up an InfluxDB data
/// model Schema while streaming line protocol through it.
///
/// Duplicated tag and field definitions are ignored, and the
/// resulting schema always puts tags as the initial columns (in the
/// order of appearance) followed by fields (in order of appearance)
/// and then the timestamp
#[derive(Debug, Default)]
pub struct InfluxSchemaBuilder {
/// What tag names we have seen so far
tag_set: HashSet<String>,
/// What field names we have seen so far
field_set: HashMap<String, InfluxFieldType>,
/// Keep track of the tag_columns in order they were added
tag_list: Vec<String>,
/// Track the fields in order they were added
field_list: Vec<String>,
/// Keep the measurement name, if seen
measurement: Option<String>,
}
impl InfluxSchemaBuilder {
pub fn new() -> Self {
Self::default()
}
/// Set the optional measurement name, erroring if a different name was previously specified
pub fn saw_measurement(mut self, measurement: impl Into<String>) -> Result<Self> {
let new_measurement = measurement.into();
if let Some(old_measurement) = &self.measurement {
if old_measurement != &new_measurement {
return MultipleMeasurementNames {
old_measurement,
new_measurement,
}
.fail();
}
} else {
self.measurement = Some(new_measurement)
}
Ok(self)
}
/// Add a new tag column to this schema, ignoring if the tag has already
/// been seen
pub fn saw_tag(mut self, column_name: &str) -> Self {
if !self.tag_set.contains(column_name) {
self.tag_set.insert(column_name.to_string());
self.tag_list.push(column_name.to_string());
};
self
}
/// Add a new field column with the specified InfluxDB data model
/// type, ignoring it if that field has already been seen.
///
/// TODO: error if the field is seen with a different type (the old
/// implementation produces a warn! in this condition)
pub fn saw_influx_field(
mut self,
column_name: &str,
influxdb_field_type: InfluxFieldType,
) -> Self {
if let Some(existing_influxdb_field_type) = self.field_set.get(column_name) {
if &influxdb_field_type != existing_influxdb_field_type {
warn!("Ignoring new type for field '{}': Previously it had type {:?}, attempted to set type {:?}.",
column_name, existing_influxdb_field_type, influxdb_field_type);
}
} else {
self.field_set
.insert(column_name.to_string(), influxdb_field_type);
self.field_list.push(column_name.to_string())
}
self
}
/// Build a schema object from the collected schema
pub fn build(self) -> Result<Schema> {
let builder =
SchemaBuilder::new().measurement(self.measurement.as_ref().context(NoMeasurement)?);
// tags always first
let builder = self
.tag_list
.iter()
.fold(builder, |builder, tag_name| builder.tag(tag_name));
// then fields (in order they were added)
let builder = self.field_list.iter().fold(builder, |builder, field_name| {
let influxdb_field_type = self.field_set.get(field_name).unwrap();
builder.influx_field(field_name, *influxdb_field_type)
});
// and now timestamp
builder.timestamp().build()
}
}
/// Schema Merger
///
/// The usecase for merging schemas is when different chunks have
@@ -522,68 +407,6 @@ mod test {
assert_eq!(res.unwrap_err().to_string(), "Error validating schema: Duplicate column name: 'time' was specified to be Tag as well as timestamp");
}
#[test]
fn test_lp_builder_basic() {
let s = InfluxSchemaBuilder::new()
.saw_influx_field("the_field", Float)
.saw_tag("the_tag")
.saw_tag("the_tag")
.saw_influx_field("the_field", Float)
.saw_measurement("the_measurement")
.unwrap()
.saw_tag("the_tag")
.saw_tag("the_second_tag")
.saw_measurement("the_measurement")
.unwrap()
.build()
.unwrap();
assert_column_eq!(s, 0, Tag, "the_tag");
assert_column_eq!(s, 1, Tag, "the_second_tag");
assert_column_eq!(s, 2, Field(Float), "the_field");
assert_column_eq!(s, 3, Timestamp, "time");
assert_eq!(s.measurement().unwrap(), "the_measurement");
assert_eq!(s.len(), 4);
}
#[test]
fn test_lp_builder_no_measurement() {
let res = InfluxSchemaBuilder::new()
.saw_tag("the_tag")
.saw_influx_field("the_field", Float)
.build();
assert_eq!(res.unwrap_err().to_string(), "No measurement provided");
}
#[test]
fn test_lp_builder_different_measurement() {
let res = InfluxSchemaBuilder::new()
.saw_measurement("m1")
.unwrap()
.saw_measurement("m2");
assert_eq!(res.unwrap_err().to_string(), "Multiple measurement names not supported. Old measurement \'m1\', new measurement \'m2\'");
}
#[test]
fn test_lp_changed_field_type() {
let s = InfluxSchemaBuilder::new()
.saw_measurement("the_measurement")
.unwrap()
.saw_influx_field("the_field", Float)
// same field name seen again as a different type
.saw_influx_field("the_field", Integer)
.build()
.unwrap();
assert_column_eq!(s, 0, Field(Float), "the_field");
assert_column_eq!(s, 1, Timestamp, "time");
assert_eq!(s.len(), 2);
}
#[test]
fn test_merge_schema_empty() {
let merged_schema_error = SchemaMerger::new().build().unwrap_err();


@@ -1,319 +0,0 @@
use influxdb_line_protocol::parse_lines;
use ingest::{
parquet::writer::{CompressionLevel, Error as ParquetWriterError, IOxParquetTableWriter},
ConversionSettings, Error as IngestError, LineProtocolConverter, TsmFileConverter,
};
use internal_types::schema::Schema;
use observability_deps::tracing::{debug, info, warn};
use packers::{Error as TableError, IOxTableWriter, IOxTableWriterSource};
use snafu::{OptionExt, ResultExt, Snafu};
use std::{
convert::TryInto,
fs,
fs::File,
io::{BufReader, Read},
path::{Path, PathBuf},
};
use crate::commands::input::{FileType, InputReader};
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Error reading {} ({})", name.display(), source))]
UnableToReadInput {
name: PathBuf,
source: std::io::Error,
},
#[snafu(display(
"Cannot write multiple measurements to a single file. Saw new measurement named {}",
new_measurement_name
))]
MultipleMeasurementsToSingleFile { new_measurement_name: String },
#[snafu(display("Internal error: measurement name not specified in schema",))]
InternalMeasurementNotSpecified {},
#[snafu(display("Error creating a parquet table writer {}", source))]
UnableToCreateParquetTableWriter { source: ParquetWriterError },
#[snafu(display("Conversion from Parquet format is not implemented"))]
ParquetNotImplemented,
#[snafu(display("Error writing remaining lines {}", source))]
UnableToWriteGoodLines { source: IngestError },
#[snafu(display("Error opening input {}", source))]
OpenInput { source: super::input::Error },
#[snafu(display("Error while closing the table writer {}", source))]
UnableToCloseTableWriter { source: IngestError },
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
impl From<Error> for TableError {
fn from(source: Error) -> Self {
Self::from_other(source)
}
}
/// Creates an `IOxParquetTableWriter` suitable for writing to a single file
#[derive(Debug)]
struct ParquetFileWriterSource {
output_filename: String,
compression_level: CompressionLevel,
// This creator only supports a single filename at this time
// so track if it has already been made, for errors
made_file: bool,
}
impl IOxTableWriterSource for ParquetFileWriterSource {
// Returns an `IOxTableWriter` suitable for writing data from packers.
fn next_writer(&mut self, schema: &Schema) -> Result<Box<dyn IOxTableWriter>, TableError> {
let measurement = schema
.measurement()
.cloned()
.context(InternalMeasurementNotSpecified)?;
if self.made_file {
return MultipleMeasurementsToSingleFile {
new_measurement_name: measurement,
}
.fail()?;
}
let output_file = fs::File::create(&self.output_filename).map_err(|e| {
TableError::from_io(
e,
format!("Error creating output file {}", self.output_filename),
)
})?;
info!(
"Writing output for measurement {} to {} ...",
measurement, self.output_filename
);
let writer = IOxParquetTableWriter::new(schema, self.compression_level, output_file)
.context(UnableToCreateParquetTableWriter)?;
self.made_file = true;
Ok(Box::new(writer))
}
}
/// Creates `IOxParquetTableWriter` for each measurement by
/// writing each to a separate file (measurement1.parquet,
/// measurement2.parquet, etc)
#[derive(Debug)]
struct ParquetDirectoryWriterSource {
compression_level: CompressionLevel,
output_dir_path: PathBuf,
}
impl IOxTableWriterSource for ParquetDirectoryWriterSource {
/// Returns an `IOxTableWriter` suitable for writing data from packers,
/// writing to a file named in the template of `<measurement>.parquet`
fn next_writer(&mut self, schema: &Schema) -> Result<Box<dyn IOxTableWriter>, TableError> {
let mut output_file_path: PathBuf = self.output_dir_path.clone();
let measurement = schema
.measurement()
.context(InternalMeasurementNotSpecified)?;
output_file_path.push(measurement);
output_file_path.set_extension("parquet");
let output_file = fs::File::create(&output_file_path).map_err(|e| {
TableError::from_io(
e,
format!("Error creating output file {:?}", output_file_path),
)
})?;
info!(
"Writing output for measurement {} to {:?} ...",
measurement, output_file_path
);
let writer = IOxParquetTableWriter::new(schema, self.compression_level, output_file)
.context(UnableToCreateParquetTableWriter)
.map_err(TableError::from_other)?;
Ok(Box::new(writer))
}
}
pub fn is_directory(p: impl AsRef<Path>) -> bool {
fs::metadata(p)
.map(|metadata| metadata.is_dir())
.unwrap_or(false)
}
pub fn convert(
input_path: &str,
output_path: &str,
compression_level: CompressionLevel,
) -> Result<()> {
info!("convert starting");
debug!("Reading from input path {}", input_path);
if is_directory(input_path) {
let mut files: Vec<_> = fs::read_dir(input_path)
.unwrap()
.filter_map(Result::ok)
.filter(|filename| filename.path().extension().map_or(false, |x| x == "tsm"))
.collect();
if files.is_empty() {
warn!("No TSM files found");
return Ok(());
}
// Sort files by their TSM generation to ensure any duplicate block
// data is appropriately de-duplicated.
files.sort_by_key(|a| a.file_name());
let mut index_readers = Vec::with_capacity(files.len());
let mut block_readers = Vec::with_capacity(files.len());
for file in &files {
let index_handle = File::open(file.path()).unwrap();
let index_size = index_handle.metadata().unwrap().len();
let block_handle = File::open(file.path()).unwrap();
index_readers.push((BufReader::new(index_handle), index_size as usize));
block_readers.push(BufReader::new(block_handle));
}
// setup writing
let writer_source: Box<dyn IOxTableWriterSource> = if is_directory(&output_path) {
info!("Writing to output directory {:?}", output_path);
Box::new(ParquetDirectoryWriterSource {
compression_level,
output_dir_path: PathBuf::from(output_path),
})
} else {
info!("Writing to output file {}", output_path);
Box::new(ParquetFileWriterSource {
compression_level,
output_filename: String::from(output_path),
made_file: false,
})
};
let mut converter = TsmFileConverter::new(writer_source);
return converter
.convert(index_readers, block_readers)
.context(UnableToCloseTableWriter);
}
let input_reader = InputReader::new(input_path).context(OpenInput)?;
info!(
"Preparing to convert {} bytes from {}",
input_reader.len(),
input_path
);
match input_reader.file_type() {
FileType::LineProtocol => convert_line_protocol_to_parquet(
input_path,
input_reader,
compression_level,
output_path,
),
FileType::Tsm => {
// TODO(edd): we can remove this when I figure out the best way to share
// the reader between the TSM index reader and the Block decoder.
let input_block_reader = InputReader::new(input_path).context(OpenInput)?;
let len = input_reader.len() as usize;
convert_tsm_to_parquet(
input_reader,
len,
compression_level,
input_block_reader,
output_path,
)
}
FileType::Parquet => ParquetNotImplemented.fail(),
}
}
fn convert_line_protocol_to_parquet(
input_filename: &str,
mut input_reader: InputReader,
compression_level: CompressionLevel,
output_name: &str,
) -> Result<()> {
// TODO: make a streaming parser that you can stream data through in blocks.
// for now, just read the whole input at once into a string
let mut buf = String::with_capacity(
input_reader
.len()
.try_into()
.expect("Cannot allocate buffer"),
);
input_reader
.read_to_string(&mut buf)
.context(UnableToReadInput {
name: input_filename,
})?;
// FIXME: Design something sensible to do with lines that don't
// parse rather than just dropping them on the floor
let only_good_lines = parse_lines(&buf).filter_map(|r| match r {
Ok(line) => Some(line),
Err(e) => {
warn!("Ignoring line with parse error: {}", e);
None
}
});
let writer_source: Box<dyn IOxTableWriterSource> = if is_directory(&output_name) {
info!("Writing to output directory {:?}", output_name);
Box::new(ParquetDirectoryWriterSource {
compression_level,
output_dir_path: PathBuf::from(output_name),
})
} else {
info!("Writing to output file {}", output_name);
Box::new(ParquetFileWriterSource {
output_filename: String::from(output_name),
compression_level,
made_file: false,
})
};
let settings = ConversionSettings::default();
let mut converter = LineProtocolConverter::new(settings, writer_source);
converter
.convert(only_good_lines)
.context(UnableToWriteGoodLines)?;
converter.finalize().context(UnableToCloseTableWriter)?;
info!("Completed writing to {} successfully", output_name);
Ok(())
}
fn convert_tsm_to_parquet(
index_stream: InputReader,
index_stream_size: usize,
compression_level: CompressionLevel,
block_stream: InputReader,
output_name: &str,
) -> Result<()> {
// setup writing
let writer_source: Box<dyn IOxTableWriterSource> = if is_directory(&output_name) {
info!("Writing to output directory {:?}", output_name);
Box::new(ParquetDirectoryWriterSource {
compression_level,
output_dir_path: PathBuf::from(output_name),
})
} else {
info!("Writing to output file {}", output_name);
Box::new(ParquetFileWriterSource {
compression_level,
output_filename: String::from(output_name),
made_file: false,
})
};
let mut converter = TsmFileConverter::new(writer_source);
converter
.convert(vec![(index_stream, index_stream_size)], vec![block_stream])
.context(UnableToCloseTableWriter)
}
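// A minimal usage sketch of the removed `convert` entry point, with
// hypothetical paths; the CLI wiring that actually invoked it is not part of
// this diff.
fn convert_example() -> Result<()> {
    // Convert a line protocol file into per-measurement parquet files in an
    // existing output directory, using the interoperability-friendly encodings.
    convert("fixtures/metrics.lp", "out/", CompressionLevel::Compatibility)
}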


@@ -1,356 +0,0 @@
use std::{
borrow::Cow,
collections::VecDeque,
fs,
fs::File,
io,
io::{BufReader, Read, Seek, SeekFrom},
path::{Path, PathBuf},
};
use snafu::{ResultExt, Snafu};
/// Module to handle input files (and maybe urls?)
use packers::Name;
use parquet::{
self,
file::{
reader::ChunkReader,
serialized_reader::{FileSource, SliceableCursor},
writer::TryClone,
},
};
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Error opening {} ({})", input_name.display(), source))]
UnableToOpenInput {
input_name: PathBuf,
source: io::Error,
},
#[snafu(display("Error reading directory {} ({})", input_name.display(), source))]
UnableToReadDirectory {
input_name: PathBuf,
source: io::Error,
},
#[snafu(display("Error calculating the size of {} ({})", input_name.display(), source))]
UnableToCalculateSize {
input_name: PathBuf,
source: io::Error,
},
#[snafu(display("Unknown input type: {} has an unknown input extension before .gz", input_name.display()))]
UnknownInputTypeGzip { input_name: PathBuf },
#[snafu(display("Unknown input type: {} has an unknown input extension", input_name.display()))]
UnknownInputType { input_name: PathBuf },
#[snafu(display("Can't read GZip data: {}", input_name.display()))]
ReadingGzip {
input_name: PathBuf,
source: std::io::Error,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
#[derive(Debug, Clone, Copy)]
pub enum FileType {
LineProtocol,
Tsm,
Parquet,
}
/// Represents an input path and can produce InputReaders for each
/// file seen while recursively traversing the path
pub struct InputPath {
// All files left to traverse. Elements of files were actual files
// when this InputPath was constructed.
files: Vec<PathBuf>,
}
/// Interface for interacting with streams
#[derive(Debug)]
pub enum InputReader {
FileInputType(FileInputReader),
MemoryInputType(MemoryInputReader),
}
/// A (file backed) reader to read raw uncompressed bytes
#[derive(Debug)]
pub struct FileInputReader {
file_type: FileType,
file_size: u64,
path: PathBuf,
reader: BufReader<std::fs::File>,
}
/// An in-memory reader
#[derive(Debug)]
pub struct MemoryInputReader {
file_type: FileType,
file_size: u64,
path: PathBuf,
cursor: SliceableCursor,
}
impl FileInputReader {
fn new(file_type: FileType, input_name: &str) -> Result<Self> {
let path = PathBuf::from(input_name);
let file = File::open(&path).context(UnableToOpenInput { input_name })?;
let file_size = file
.metadata()
.context(UnableToCalculateSize { input_name })?
.len();
Ok(Self {
file_type,
file_size,
path,
reader: BufReader::new(file),
})
}
}
impl MemoryInputReader {
fn new(file_type: FileType, path: PathBuf, buffer: Vec<u8>) -> Self {
let len = buffer.len();
Self {
file_type,
file_size: len as u64,
path,
cursor: SliceableCursor::new(buffer),
}
}
}
impl Seek for InputReader {
fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
match self {
Self::FileInputType(file_input_reader) => file_input_reader.reader.seek(pos),
Self::MemoryInputType(memory_input_reader) => memory_input_reader.cursor.seek(pos),
}
}
}
impl Read for InputReader {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
match self {
Self::FileInputType(file_input_reader) => file_input_reader.reader.read(buf),
Self::MemoryInputType(memory_input_reader) => memory_input_reader.cursor.read(buf),
}
}
}
impl parquet::file::reader::Length for InputReader {
fn len(&self) -> u64 {
match self {
Self::FileInputType(file_input_reader) => file_input_reader.file_size,
Self::MemoryInputType(memory_input_reader) => memory_input_reader.file_size,
}
}
}
impl ChunkReader for InputReader {
type T = InputSlice;
fn get_read(&self, start: u64, length: usize) -> parquet::errors::Result<Self::T> {
match self {
Self::FileInputType(file_input_reader) => Ok(InputSlice::FileSlice(FileSource::new(
file_input_reader.reader.get_ref(),
start,
length,
))),
Self::MemoryInputType(memory_input_reader) => Ok(InputSlice::Memory(
memory_input_reader.cursor.get_read(start, length)?,
)),
}
}
}
impl TryClone for InputReader {
fn try_clone(&self) -> std::result::Result<Self, std::io::Error> {
Err(io::Error::new(
io::ErrorKind::Other,
"TryClone for input reader not supported",
))
}
}
impl Name for InputReader {
/// Returns a user understandable identifier of this thing
fn name(&self) -> Cow<'_, str> {
self.path().to_string_lossy()
}
}
impl InputReader {
pub fn file_type(&self) -> &FileType {
match self {
Self::FileInputType(file_input_reader) => &file_input_reader.file_type,
Self::MemoryInputType(memory_input_reader) => &memory_input_reader.file_type,
}
}
pub fn len(&self) -> u64 {
match self {
Self::FileInputType(file_input_reader) => file_input_reader.file_size,
Self::MemoryInputType(memory_input_reader) => memory_input_reader.file_size,
}
}
pub fn path(&self) -> &Path {
match self {
Self::FileInputType(file_input_reader) => &file_input_reader.path,
Self::MemoryInputType(memory_input_reader) => &memory_input_reader.path,
}
}
// Create a new input reader suitable for reading from
// `input_name`, figuring out the file input type based on
// heuristics (ahem, the filename extension)
pub fn new(input_name: &str) -> Result<Self> {
let path = Path::new(input_name);
// Initially simply use the file name's extension to determine
// the type; Maybe in the future we can be more clever and
// inspect contents.
let ext = path.extension().and_then(|p| p.to_str());
match ext {
Some("tsm") => Ok(Self::FileInputType(FileInputReader::new(
FileType::Tsm,
input_name,
)?)),
Some("lp") => Ok(Self::FileInputType(FileInputReader::new(
FileType::LineProtocol,
input_name,
)?)),
Some("parquet") => Ok(Self::FileInputType(FileInputReader::new(
FileType::Parquet,
input_name,
)?)),
Some("gz") => {
let buffer = || {
let file = File::open(input_name).context(UnableToOpenInput { input_name })?;
let mut decoder = flate2::read::GzDecoder::new(file);
let mut buffer = Vec::new();
decoder
.read_to_end(&mut buffer)
.context(ReadingGzip { input_name })?;
Ok(buffer)
};
let path = PathBuf::from(input_name);
let stem = Path::new(path.file_stem().unwrap());
let stem_ext = stem.extension().and_then(|p| p.to_str());
match stem_ext {
Some("tsm") => Ok(Self::MemoryInputType(MemoryInputReader::new(
FileType::Tsm,
path,
buffer()?,
))),
Some("lp") => Ok(Self::MemoryInputType(MemoryInputReader::new(
FileType::LineProtocol,
path,
buffer()?,
))),
Some("parquet") => Ok(Self::MemoryInputType(MemoryInputReader::new(
FileType::Parquet,
path,
buffer()?,
))),
_ => UnknownInputTypeGzip { input_name }.fail(),
}
}
_ => UnknownInputType { input_name }.fail(),
}
}
}
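// --- Illustrative usage sketch (not part of the original module) ---
// Shows how `InputReader::new` dispatches on the filename extension: plain
// `.tsm`/`.lp`/`.parquet` files are opened lazily from disk, while `.gz`
// inputs are decompressed fully into memory and typed by the extension
// before `.gz`. The fixture paths below are assumptions.
#[allow(dead_code)]
fn input_reader_usage_sketch() -> Result<()> {
    // Opened as a file-backed reader with FileType::Tsm
    let tsm = InputReader::new("tests/fixtures/cpu_usage.tsm")?;
    assert!(matches!(tsm.file_type(), FileType::Tsm));
    // Decompressed into memory; still reported as FileType::Tsm
    let gz = InputReader::new("tests/fixtures/cpu_usage.tsm.gz")?;
    assert!(matches!(gz.file_type(), FileType::Tsm));
    assert!(gz.len() > 0);
    // Unrecognized extensions fail with `UnknownInputType`
    assert!(InputReader::new("notes.txt").is_err());
    Ok(())
}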
impl InputPath {
// Creates a new InputPath with a snapshot of all the files in the
// directory tree rooted at `root_path` that pass the predicate `pred`
pub fn new<P>(root_path: impl Into<PathBuf>, mut pred: P) -> Result<Self>
where
P: FnMut(&Path) -> bool,
{
struct PathAndType {
path: PathBuf,
file_type: fs::FileType,
}
let mut paths = VecDeque::new();
let root_path = root_path.into();
let root_meta = fs::metadata(&root_path).context(UnableToOpenInput {
input_name: root_path.clone(),
})?;
paths.push_back(PathAndType {
path: root_path,
file_type: root_meta.file_type(),
});
let mut files = Vec::with_capacity(100);
while let Some(PathAndType { path, file_type }) = paths.pop_front() {
if file_type.is_dir() {
let dir = fs::read_dir(&path).context(UnableToReadDirectory {
input_name: path.clone(),
})?;
for entry in dir {
let entry = entry.context(UnableToReadDirectory {
input_name: path.clone(),
})?;
let file_type = entry.file_type().context(UnableToOpenInput {
input_name: entry.path(),
})?;
paths.push_back(PathAndType {
path: entry.path(),
file_type,
});
}
} else if file_type.is_file() {
if pred(&path) {
files.push(path)
}
} else {
unimplemented!(
"Unknown file type {:?} while recursing {:?}",
file_type,
path
);
}
}
// sort filenames in order to ensure repeatability between runs
files.sort();
Ok(Self { files })
}
pub fn input_readers(&self) -> impl Iterator<Item = Result<InputReader>> + '_ {
self.files
.iter()
.rev()
.map(|p| InputReader::new(&p.to_string_lossy()))
}
}
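// --- Illustrative usage sketch (not part of the original module) ---
// Snapshots every `.parquet` file below a directory and opens a reader for
// each; this mirrors how the `stats` command drives `InputPath`. The
// directory path is an assumption.
#[allow(dead_code)]
fn input_path_usage_sketch() -> Result<()> {
    let input_path = InputPath::new("tests/fixtures/parquet", |p| {
        p.extension() == Some(std::ffi::OsStr::new("parquet"))
    })?;
    for reader in input_path.input_readers() {
        let reader = reader?;
        println!("{}: {} bytes", reader.path().display(), reader.len());
    }
    Ok(())
}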
pub enum InputSlice {
FileSlice(FileSource<File>),
Memory(SliceableCursor),
}
impl Read for InputSlice {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
match self {
Self::FileSlice(src) => src.read(buf),
Self::Memory(src) => src.read(buf),
}
}
}

View File

@ -1,165 +0,0 @@
use influxdb_tsm::{reader::IndexEntry, reader::TsmIndexReader, InfluxId, TsmError};
use ingest::parquet::metadata::print_parquet_metadata;
use observability_deps::tracing::{debug, info};
use snafu::{ResultExt, Snafu};
use std::{
collections::{BTreeMap, BTreeSet},
convert::TryInto,
};
use crate::commands::input::{FileType, InputReader};
pub type Result<T, E = Error> = std::result::Result<T, E>;
pub fn dump_meta(input_filename: &str) -> Result<()> {
info!("meta starting");
debug!("Reading from input file {}", input_filename);
let input_reader = InputReader::new(input_filename).context(OpenInput)?;
match input_reader.file_type() {
FileType::LineProtocol => LineProtocolNotImplemented.fail(),
FileType::Tsm => {
let len = input_reader
.len()
.try_into()
.expect("File size more than usize");
let reader = TsmIndexReader::try_new(input_reader, len).context(CreateTsm)?;
let mut stats_builder = TsmMetadataBuilder::new();
for entry in reader {
let entry = entry.context(UnableToReadTsmEntry)?;
stats_builder.process_entry(entry)?;
}
stats_builder.print_report();
Ok(())
}
FileType::Parquet => {
print_parquet_metadata(input_reader).context(UnableDumpToParquetMetadata)
}
}
}
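// Illustrative calls (not part of the original file): the same entry point
// handles both TSM and parquet inputs; these fixture paths are the ones used
// by the CLI tests.
#[allow(dead_code)]
fn dump_meta_sketch() -> Result<()> {
    // Prints a "TSM Metadata Report" with per-bucket and per-measurement stats
    dump_meta("tests/fixtures/000000000000005-000000002.tsm.gz")?;
    // Prints the parquet footer metadata instead
    dump_meta("tests/fixtures/parquet/temperature.parquet")
}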
#[derive(Debug, Default)]
struct MeasurementMetadata {
/// tag name --> set of seen tag values
tags: BTreeMap<String, BTreeSet<String>>,
/// List of field names seen
fields: BTreeSet<String>,
}
impl MeasurementMetadata {
fn update_for_entry(&mut self, index_entry: &mut IndexEntry) -> Result<()> {
let key = index_entry.parse_key().context(UnableToParseTsmKey)?;
for (tag_name, tag_value) in key.tagset {
let tag_entry = self.tags.entry(tag_name).or_default();
tag_entry.insert(tag_value);
}
self.fields.insert(key.field_key);
Ok(())
}
fn print_report(&self, prefix: &str) {
for (tag_name, tag_values) in &self.tags {
println!("{} tag {} = {:?}", prefix, tag_name, tag_values);
}
for field_name in &self.fields {
println!("{} field {}", prefix, field_name);
}
}
}
/// Represents stats for a single bucket
#[derive(Debug, Default)]
struct BucketMetadata {
/// How many index entries have been seen
count: u64,
/// Total 'records' (aka sum of the lengths of all timeseries)
total_records: u64,
/// measurement name --> metadata for that measurement
measurements: BTreeMap<String, MeasurementMetadata>,
}
impl BucketMetadata {
fn update_for_entry(&mut self, index_entry: &mut IndexEntry) -> Result<()> {
self.count += 1;
self.total_records += u64::from(index_entry.count);
let key = index_entry.parse_key().context(UnableToParseTsmKey)?;
let meta = self.measurements.entry(key.measurement).or_default();
meta.update_for_entry(index_entry)?;
Ok(())
}
fn print_report(&self, prefix: &str) {
for (measurement, meta) in &self.measurements {
println!("{}{}", prefix, measurement);
let indent = format!("{} ", prefix);
meta.print_report(&indent);
}
}
}
#[derive(Debug, Default)]
struct TsmMetadataBuilder {
num_entries: u32,
/// (org_id, bucket_id) --> bucket metadata
bucket_stats: BTreeMap<(InfluxId, InfluxId), BucketMetadata>,
}
impl TsmMetadataBuilder {
fn new() -> Self {
Self::default()
}
fn process_entry(&mut self, mut index_entry: IndexEntry) -> Result<()> {
self.num_entries += 1;
let key = (index_entry.org_id(), index_entry.bucket_id());
let stats = self.bucket_stats.entry(key).or_default();
stats.update_for_entry(&mut index_entry)?;
Ok(())
}
fn print_report(&self) {
println!("TSM Metadata Report:");
println!(" Valid Index Entries: {}", self.num_entries);
println!(" Organizations/Bucket Stats:");
for (k, stats) in &self.bucket_stats {
let (org_id, bucket_id) = k;
println!(
" ({}, {}) {} index entries, {} total records",
org_id, bucket_id, stats.count, stats.total_records
);
println!(" Measurements:");
stats.print_report(" ");
}
}
}
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Error opening input {}", source))]
OpenInput { source: super::input::Error },
#[snafu(display("Line protocol metadata dump is not implemented"))]
LineProtocolNotImplemented,
#[snafu(display("Unable to dump parquet file metadata: {}", source))]
UnableDumpToParquetMetadata {
source: ingest::parquet::error::IOxParquetError,
},
#[snafu(display(r#"Unable to create TSM reader: {}"#, source))]
CreateTsm { source: TsmError },
#[snafu(display(r#"Unable to parse TSM key: {}"#, source))]
UnableToParseTsmKey { source: TsmError },
#[snafu(display(r#"Unable to read TSM entry: {}"#, source))]
UnableToReadTsmEntry { source: TsmError },
}

View File

@ -1,116 +0,0 @@
//! This module contains code to report compression statistics for storage files
use observability_deps::tracing::info;
use snafu::{ResultExt, Snafu};
use structopt::StructOpt;
use ingest::parquet::{error::IOxParquetError, stats as parquet_stats};
use packers::{
stats::{FileSetStatsBuilder, FileStats},
Name,
};
use crate::commands::input::{FileType, InputPath, InputReader};
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Not implemented: {}", operation_name))]
NotImplemented { operation_name: String },
#[snafu(display("Error opening input {}", source))]
OpenInput { source: super::input::Error },
#[snafu(display("Unable to dump parquet file metadata: {}", source))]
UnableDumpToParquetMetadata { source: IOxParquetError },
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Print out storage statistics information to stdout.
///
/// If a directory is specified, checks all files recursively
#[derive(Debug, StructOpt)]
pub struct Config {
/// The input filename or directory to read from
input: String,
/// Include detailed information per column
#[structopt(long)]
per_column: bool,
/// Include detailed information per file
#[structopt(long)]
per_file: bool,
}
/// Print statistics about all the files rooted at input_path
pub async fn stats(config: &Config) -> Result<()> {
info!("stats starting for {:?}", config);
let input_path = InputPath::new(&config.input, |p| {
p.extension() == Some(std::ffi::OsStr::new("parquet"))
})
.context(OpenInput)?;
println!("Storage statistics:");
let mut builder = FileSetStatsBuilder::default();
for input_reader in input_path.input_readers() {
let input_reader = input_reader.context(OpenInput)?;
let file_stats = stats_for_file(config, input_reader).await?;
if config.per_file {
println!("{}", file_stats);
}
builder = builder.accumulate(&file_stats);
}
let overall_stats = builder.build();
if config.per_column {
println!("-------------------------------");
println!("Overall Per Column Stats");
println!("-------------------------------");
for c in &overall_stats.col_stats {
println!("{}", c);
println!();
}
}
println!("{}", overall_stats);
Ok(())
}
/// Print statistics about the file name in input_filename to stdout
pub async fn stats_for_file(config: &Config, input_reader: InputReader) -> Result<FileStats> {
let input_name = String::from(input_reader.name());
let file_stats = match input_reader.file_type() {
FileType::LineProtocol => {
return NotImplemented {
operation_name: "Line protocol storage statistics",
}
.fail()
}
FileType::Tsm => {
return NotImplemented {
operation_name: "TSM storage statistics",
}
.fail()
}
FileType::Parquet => {
parquet_stats::file_stats(input_reader).context(UnableDumpToParquetMetadata)?
}
};
if config.per_column && config.per_file {
println!("-------------------------------");
println!("Column Stats for {}", input_name);
println!("-------------------------------");
for c in &file_stats.col_stats {
println!("{}", c);
println!();
}
}
Ok(file_stats)
}
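// Illustrative usage (not part of the original file): builds a `Config` by
// hand instead of via StructOpt and prints per-file and per-column detail
// for every `.parquet` file under the input path (the path is an assumption).
#[allow(dead_code)]
async fn stats_usage_sketch() -> Result<()> {
    let config = Config {
        input: "tests/fixtures/parquet".to_string(),
        per_column: true,
        per_file: true,
    };
    stats(&config).await
}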

View File

@ -7,31 +7,24 @@
clippy::clone_on_ref_ptr
)]
use std::str::FromStr;
use dotenv::dotenv;
use structopt::StructOpt;
use tokio::runtime::Runtime;
use commands::tracing::{init_logs_and_tracing, init_simple_logs};
use ingest::parquet::writer::CompressionLevel;
use observability_deps::tracing::{debug, warn};
use observability_deps::tracing::warn;
use crate::commands::tracing::TracingGuard;
use observability_deps::tracing::dispatcher::SetGlobalDefaultError;
use tikv_jemallocator::Jemalloc;
mod commands {
pub mod convert;
pub mod database;
mod input;
pub mod meta;
pub mod operations;
pub mod run;
pub mod server;
pub mod server_remote;
pub mod sql;
pub mod stats;
pub mod tracing;
}
@ -125,30 +118,9 @@ struct Config {
#[derive(Debug, StructOpt)]
enum Command {
/// Convert one storage format to another
Convert {
/// The input files to read from
input: String,
/// The filename or directory to write the output
output: String,
/// How much to compress the output data. 'max' compresses the most;
/// 'compatibility' compresses in a manner more likely to be readable by
/// other tools.
#[structopt(
short, long, default_value = "compatibility",
possible_values = & ["max", "compatibility"])]
compression_level: String,
},
/// Print out metadata information about a storage file
Meta {
/// The input filename to read from
input: String,
},
Database(commands::database::Config),
// Clippy recommended boxing this variant because it's much larger than the others
Run(Box<commands::run::Config>),
Stats(commands::stats::Config),
Server(commands::server::Config),
Operation(commands::operations::Config),
Sql(commands::sql::Config),
@ -176,41 +148,6 @@ fn main() -> Result<(), std::io::Error> {
}
match config.command {
Command::Convert {
input,
output,
compression_level,
} => {
let _tracing_guard = handle_init_logs(init_simple_logs(log_verbose_count));
let compression_level = CompressionLevel::from_str(&compression_level).unwrap();
match commands::convert::convert(&input, &output, compression_level) {
Ok(()) => debug!("Conversion completed successfully"),
Err(e) => {
eprintln!("Conversion failed: {}", e);
std::process::exit(ReturnCode::Failure as _)
}
}
}
Command::Meta { input } => {
let _tracing_guard = handle_init_logs(init_simple_logs(log_verbose_count));
match commands::meta::dump_meta(&input) {
Ok(()) => debug!("Metadata dump completed successfully"),
Err(e) => {
eprintln!("Metadata dump failed: {}", e);
std::process::exit(ReturnCode::Failure as _)
}
}
}
Command::Stats(config) => {
let _tracing_guard = handle_init_logs(init_simple_logs(log_verbose_count));
match commands::stats::stats(&config).await {
Ok(()) => debug!("Storage statistics dump completed successfully"),
Err(e) => {
eprintln!("Stats dump failed: {}", e);
std::process::exit(ReturnCode::Failure as _)
}
}
}
Command::Database(config) => {
let _tracing_guard = handle_init_logs(init_simple_logs(log_verbose_count));
if let Err(e) = commands::database::command(host, config).await {

View File

@ -1,396 +0,0 @@
use std::fs;
use std::fs::File;
use std::io::{BufReader, Read};
use std::path::Path;
use assert_cmd::assert::Assert;
use assert_cmd::Command;
use predicates::prelude::*;
/// Validates that p is a valid parquet file
fn validate_parquet_file(p: &Path) {
// Verify file extension is parquet
let file_extension = p
.extension()
.map_or(String::from(""), |ext| ext.to_string_lossy().to_string());
assert_eq!(file_extension, "parquet");
// TODO: verify we can decode the contents of the parquet file and
// it is as expected. For now, just use a check against the magic
// `PAR1` bytes all parquet files start with.
let mut reader = BufReader::new(File::open(p).expect("Error reading file"));
let mut first_four = [0; 4];
reader
.read_exact(&mut first_four)
.expect("Error reading first four bytes");
assert_eq!(&first_four, b"PAR1");
}
#[test]
fn convert_bad_input_filename() {
let mut cmd = Command::cargo_bin("influxdb_iox").unwrap();
let assert = cmd
.arg("-v")
.arg("convert")
.arg("non_existent_input.lp")
.arg("non_existent_output")
.assert();
assert
.failure()
.code(1)
.stderr(predicate::str::contains(
"Error opening non_existent_input.lp",
))
.stderr(predicate::str::contains("No such file or directory"));
}
#[test]
fn convert_bad_compression_level() {
let mut cmd = Command::cargo_bin("influxdb_iox").unwrap();
let assert = cmd
.arg("-v")
.arg("convert")
.arg("--compression-level")
.arg("maxxx")
.arg("tests/fixtures/lineproto/temperature.lp")
.arg("/tmp")
.assert();
assert.failure().code(1).stderr(predicate::str::contains(
"error: 'maxxx' isn't a valid value for '--compression-level <compression-level>",
));
}
#[test]
fn convert_line_protocol_good_input_filename() {
let mut cmd = Command::cargo_bin("influxdb_iox").unwrap();
let parquet_path = test_helpers::tempfile::Builder::new()
.prefix("convert_e2e")
.suffix(".parquet")
.tempfile()
.expect("error creating temp file")
.into_temp_path();
let parquet_filename_string = parquet_path.to_string_lossy().to_string();
let assert = cmd
.arg("-v")
.arg("convert")
.arg("--compression-level")
.arg("compatibility")
.arg("tests/fixtures/lineproto/temperature.lp")
.arg(&parquet_filename_string)
.assert();
let expected_success_string = format!(
"Completing writing to {} successfully",
parquet_filename_string
);
assert
.success()
.stdout(predicate::str::contains("convert starting"))
.stdout(predicate::str::contains(
"Writing output for measurement h2o_temperature",
))
.stdout(predicate::str::contains(expected_success_string));
validate_parquet_file(&parquet_path);
}
#[test]
fn convert_tsm_good_input_filename() {
//
// TODO: this needs to work for a temp directory...
//
// let mut cmd = Command::cargo_bin("influxdb_iox").unwrap();
// let tmp_dir = test_helpers::tmp_dir();
// let parquet_path = tmp_dir.unwrap().into_path().to_str().unwrap();
// // ::Builder::new()
// // .prefix("dstool_e2e_tsm")
// // .suffix(".parquet")
// // .tempfile()
// // .expect("error creating temp file")
// // .into_temp_path();
// // let parquet_filename_string = parquet_path.to_string_lossy().to_string();
// let assert = cmd
// .arg("convert")
// .arg("tests/fixtures/cpu_usage.tsm")
// .arg(&parquet_path)
// .assert();
// // TODO this should succeed when TSM -> parquet conversion is implemented.
// // assert
// // .failure()
// // .code(1)
// // .stderr(predicate::str::contains("Conversion failed"))
// // .stderr(predicate::str::contains(
// // "Not implemented: TSM Conversion not supported yet",
// // ));
// // TODO add better success expectations
// // let expected_success_string = format!(
// // "Completing writing to {} successfully",
// // parquet_filename_string
// // );
// // assert
// // .success()
// // .stderr(predicate::str::contains("dstool convert starting"))
// // .stderr(predicate::str::contains(
// // "Writing output for measurement h2o_temperature",
// // ))
// // .stderr(predicate::str::contains(expected_success_string));
// // validate_parquet_file(&parquet_path);
}
#[test]
fn convert_multiple_measurements() {
let mut cmd = Command::cargo_bin("influxdb_iox").unwrap();
// Create a directory
let parquet_output_path = test_helpers::tempfile::Builder::new()
.prefix("convert_multiple_e2e")
.tempdir()
.expect("error creating temp directory");
let parquet_output_dir_path = parquet_output_path.path().to_string_lossy().to_string();
let assert = cmd
.arg("-v")
.arg("convert")
.arg("tests/fixtures/lineproto/air_and_water.lp")
.arg(&parquet_output_dir_path)
.assert();
let expected_success_string = format!(
"Completing writing to {} successfully",
parquet_output_dir_path
);
assert
.success()
.stdout(predicate::str::contains("convert starting"))
.stdout(predicate::str::contains("Writing to output directory"))
.stdout(predicate::str::contains(
"Writing output for measurement h2o_temperature",
))
.stdout(predicate::str::contains(
"Writing output for measurement air_temperature",
))
.stdout(predicate::str::contains(expected_success_string));
// check that the two files have been written successfully
let mut output_files: Vec<_> = fs::read_dir(parquet_output_path.path())
.expect("reading directory")
.map(|dir_ent| {
let dir_ent = dir_ent.expect("error reading dir entry");
validate_parquet_file(&dir_ent.path());
dir_ent.file_name().to_string_lossy().to_string()
})
.collect();
// Ensure the order is consistent before comparing them
output_files.sort();
assert_eq!(
output_files,
vec!["air_temperature.parquet", "h2o_temperature.parquet"]
);
}
#[test]
fn meta_bad_input_filename() {
let mut cmd = Command::cargo_bin("influxdb_iox").unwrap();
let assert = cmd.arg("meta").arg("non_existent_input").assert();
assert
.failure()
.code(1)
.stderr(predicate::str::contains("Metadata dump failed"))
.stderr(predicate::str::contains(
"Metadata dump failed: Error opening input Unknown input type: non_existent_input has an unknown input extension",
));
}
#[test]
fn meta_non_existent_input_filename() {
let mut cmd = Command::cargo_bin("influxdb_iox").unwrap();
let assert = cmd.arg("meta").arg("non_existent_input.tsm").assert();
assert
.failure()
.code(1)
.stderr(predicate::str::contains("Metadata dump failed"))
.stderr(predicate::str::contains(
"Metadata dump failed: Error opening input Error opening non_existent_input.tsm",
));
}
#[test]
fn meta_bad_input_filename_gz() {
let mut cmd = Command::cargo_bin("influxdb_iox").unwrap();
let assert = cmd.arg("meta").arg("non_existent_input.gz").assert();
assert
.failure()
.code(1)
.stderr(predicate::str::contains("Metadata dump failed"))
.stderr(predicate::str::contains(
"Metadata dump failed: Error opening input Unknown input type: non_existent_input.gz has an unknown input extension before .gz",
));
}
// Gunzips the contents of the file at `input_path` into a temporary path
fn uncompress_gz(input_path: &str, output_extension: &str) -> test_helpers::tempfile::TempPath {
let gz_file = File::open(input_path).expect("Error opening input");
let output_path = test_helpers::tempfile::Builder::new()
.prefix("decompressed_e2e")
.suffix(output_extension)
.tempfile()
.expect("error creating temp file")
.into_temp_path();
let mut output_file = File::create(&output_path).expect("error opening output");
let mut decoder = flate2::read::GzDecoder::new(gz_file);
std::io::copy(&mut decoder, &mut output_file).expect("error copying stream");
output_path
}
/// Validates some of the metadata output content for this tsm file
fn assert_meta_000000000000005_000000002_tsm(assert: Assert) {
assert
.success()
.stdout(predicate::str::contains("TSM Metadata Report"))
.stdout(predicate::str::contains(
"(05c19117091a1000, 05c19117091a1001) 2159 index entries, 2159 total records",
))
.stdout(predicate::str::contains(
"task_scheduler_total_schedule_fails",
));
}
#[test]
fn meta_000000000000005_000000002_tsm() {
let input_tsm = uncompress_gz("tests/fixtures/000000000000005-000000002.tsm.gz", ".tsm");
let mut cmd = Command::cargo_bin("influxdb_iox").unwrap();
let assert = cmd
.arg("meta")
.arg(input_tsm.to_string_lossy().to_string())
.assert();
assert_meta_000000000000005_000000002_tsm(assert);
}
#[test]
fn meta_000000000000005_000000002_tsm_gz() {
let mut cmd = Command::cargo_bin("influxdb_iox").unwrap();
let assert = cmd
.arg("meta")
.arg("tests/fixtures/000000000000005-000000002.tsm.gz")
.assert();
assert_meta_000000000000005_000000002_tsm(assert);
}
/// Validates some of the metadata output content for this tsm file
fn assert_meta_cpu_usage_tsm(assert: Assert) {
assert
.success()
.stdout(predicate::str::contains("TSM Metadata Report"))
.stdout(predicate::str::contains(
"(05b4927b3fe38000, 05b4927b3fe38001) 2735 index entries, 2735 total records",
))
.stdout(predicate::str::contains(
"task_scheduler_total_schedule_fails",
));
}
#[test]
fn meta_cpu_usage_tsm() {
let input_tsm = uncompress_gz("tests/fixtures/cpu_usage.tsm.gz", ".tsm");
let mut cmd = Command::cargo_bin("influxdb_iox").unwrap();
let assert = cmd
.arg("meta")
.arg(input_tsm.to_string_lossy().to_string())
.assert();
assert_meta_cpu_usage_tsm(assert);
}
#[test]
fn meta_cpu_usage_tsm_gz() {
let mut cmd = Command::cargo_bin("influxdb_iox").unwrap();
let assert = cmd
.arg("meta")
.arg("tests/fixtures/cpu_usage.tsm.gz")
.assert();
assert_meta_cpu_usage_tsm(assert);
}
/// Validates some of the metadata output content for this tsm file
fn assert_meta_temperature_parquet(assert: Assert) {
assert
.success()
.stdout(predicate::str::contains("Parquet file metadata:"))
.stdout(predicate::str::contains(r#"created by: "Delorean""#))
.stdout(predicate::str::contains(
r#"Column Chunk [3]:
file_offset: 595
column_type: DOUBLE
column_path: bottom_degrees
num_values: 6
encodings: [PLAIN, RLE_DICTIONARY, RLE]
compression: GZIP
compressed_size: 125
uncompressed_size: 90
data_page_offset: 547
has_index_page: false
has_dictionary_page: true
dictionary_page_offset: 470
NO STATISTICS"#,
));
}
#[test]
fn meta_temperature_parquet() {
let mut cmd = Command::cargo_bin("influxdb_iox").unwrap();
let assert = cmd
.arg("meta")
.arg("tests/fixtures/parquet/temperature.parquet")
.assert();
assert_meta_temperature_parquet(assert);
}
#[test]
fn stats_temperature_parquet() {
let mut cmd = Command::cargo_bin("influxdb_iox").unwrap();
let assert = cmd
.arg("stats")
.arg("--per-column")
.arg("--per-file")
.arg("tests/fixtures/parquet/temperature.parquet")
.assert();
assert
.success()
.stdout(predicate::str::contains("Storage statistics:"))
.stdout(predicate::str::contains(
r#"Column Stats 'state' [1]
Total rows: 6.00 (6), DataType: Utf8, Compression: {"Enc: Dictionary, Comp: GZIP"}
Compressed/Uncompressed Bytes : 90.00 / 52.00 ( 90 / 52) 120.0000 bits per row
"#))
.stdout(predicate::str::contains(
"temperature.parquet: total columns ( 5), rows: 6.00 ( 6), size: 1.13 k ( 1128), bits per row: 1504.0000"
));
}