From 5749a2c119b566d189647584ed4c43d202e2cdbb Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> Date: Mon, 7 Jun 2021 13:59:33 +0100 Subject: [PATCH] chore: cleanup legacy TSM -> parquet code (#1639) * chore: cleanup legacy parquet code * chore: remove tests of removed functionality Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com> --- Cargo.lock | 18 - Cargo.toml | 6 - benches/line_protocol_to_parquet.rs | 88 -- ingest/Cargo.toml | 20 - ingest/src/lib.rs | 2168 -------------------------- ingest/src/parquet.rs | 12 - ingest/src/parquet/error.rs | 16 - ingest/src/parquet/metadata.rs | 110 -- ingest/src/parquet/stats.rs | 131 -- ingest/src/parquet/writer.rs | 709 --------- ingest/tests/read_write.rs | 97 -- internal_types/src/schema/builder.rs | 179 +-- src/commands/convert.rs | 319 ---- src/commands/input.rs | 356 ----- src/commands/meta.rs | 165 -- src/commands/stats.rs | 116 -- src/main.rs | 65 +- tests/commands.rs | 396 ----- 18 files changed, 2 insertions(+), 4969 deletions(-) delete mode 100644 benches/line_protocol_to_parquet.rs delete mode 100644 ingest/Cargo.toml delete mode 100644 ingest/src/lib.rs delete mode 100644 ingest/src/parquet.rs delete mode 100644 ingest/src/parquet/error.rs delete mode 100644 ingest/src/parquet/metadata.rs delete mode 100644 ingest/src/parquet/stats.rs delete mode 100644 ingest/src/parquet/writer.rs delete mode 100644 ingest/tests/read_write.rs delete mode 100644 src/commands/convert.rs delete mode 100644 src/commands/input.rs delete mode 100644 src/commands/meta.rs delete mode 100644 src/commands/stats.rs delete mode 100644 tests/commands.rs diff --git a/Cargo.lock b/Cargo.lock index a03b78b6c0..34d48a6f60 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1672,7 +1672,6 @@ dependencies = [ "influxdb_iox_client", "influxdb_line_protocol", "influxdb_tsm", - "ingest", "internal_types", "itertools 0.9.0", "logfmt", @@ -1763,23 +1762,6 @@ dependencies = [ "test_helpers", ] -[[package]] -name = "ingest" -version = "0.1.0" -dependencies = [ - "arrow", - "flate2", - "influxdb_line_protocol", - "influxdb_tsm", - "internal_types", - "observability_deps", - "packers", - "parking_lot", - "parquet", - "snafu", - "test_helpers", -] - [[package]] name = "instant" version = "0.1.9" diff --git a/Cargo.toml b/Cargo.toml index c5d0ac048c..30b1b28031 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,7 +19,6 @@ members = [ "influxdb_iox_client", "influxdb_line_protocol", "influxdb_tsm", - "ingest", "internal_types", "logfmt", "mem_qe", @@ -53,7 +52,6 @@ influxdb_iox_client = { path = "influxdb_iox_client", features = ["format"] } influxdb_line_protocol = { path = "influxdb_line_protocol" } influxdb_tsm = { path = "influxdb_tsm" } internal_types = { path = "internal_types" } -ingest = { path = "ingest" } logfmt = { path = "logfmt" } metrics = { path = "metrics" } mutable_buffer = { path = "mutable_buffer" } @@ -141,10 +139,6 @@ harness = false name = "mapper" harness = false -[[bench]] -name = "line_protocol_to_parquet" -harness = false - [[bench]] name = "packers" harness = false diff --git a/benches/line_protocol_to_parquet.rs b/benches/line_protocol_to_parquet.rs deleted file mode 100644 index 88e905dc4b..0000000000 --- a/benches/line_protocol_to_parquet.rs +++ /dev/null @@ -1,88 +0,0 @@ -use criterion::{criterion_group, criterion_main, Criterion, Throughput}; -use influxdb_line_protocol::parse_lines; -use ingest::{ - parquet::writer::{CompressionLevel, IOxParquetTableWriter}, - ConversionSettings, 
LineProtocolConverter, -}; -use internal_types::schema::Schema; -use packers::{Error as TableError, IOxTableWriter, IOxTableWriterSource}; -use std::time::Duration; - -use parquet::file::writer::TryClone; -use std::io::{Seek, SeekFrom, Write}; - -static LINES: &str = include_str!("../tests/fixtures/lineproto/metrics.lp"); - -/// Stream that throws away all output -struct IgnoringWriteStream {} - -impl Write for IgnoringWriteStream { - fn write(&mut self, buf: &[u8]) -> std::io::Result { - Ok(buf.len()) - } - fn flush(&mut self) -> std::io::Result<()> { - Ok(()) - } -} - -impl Seek for IgnoringWriteStream { - fn seek(&mut self, _pos: SeekFrom) -> std::io::Result { - Ok(0) - } -} - -impl TryClone for IgnoringWriteStream { - fn try_clone(&self) -> std::result::Result { - Ok(IgnoringWriteStream {}) - } -} - -#[derive(Debug)] -/// Creates parquet writers that write to /dev/null -struct IgnoringParquetDirectoryWriterSource {} - -impl IOxTableWriterSource for IgnoringParquetDirectoryWriterSource { - fn next_writer(&mut self, schema: &Schema) -> Result, TableError> { - let dev_null = IgnoringWriteStream {}; - let writer = IOxParquetTableWriter::new(schema, CompressionLevel::Compatibility, dev_null) - .expect("Creating table writer"); - Ok(Box::new(writer)) - } -} - -fn line_parser(c: &mut Criterion) { - let mut group = c.benchmark_group("line_protocol_to_parquet"); - - // Expand dataset by 10x to amortize writer setup and column writer overhead - const NUM_COPIES: usize = 10; - let mut input = String::with_capacity(NUM_COPIES * (LINES.len() + 1)); - for _ in 0..NUM_COPIES { - input.push_str(LINES); - input.push('\n'); - } - - group.throughput(Throughput::Bytes(input.len() as u64)); - group.measurement_time(Duration::from_secs(30)); - group.sample_size(20); - - group.bench_function("all lines", |b| { - b.iter(|| { - let settings = ConversionSettings::default(); - let only_good_lines = parse_lines(&input).map(|r| r.expect("Unexpected parse error")); - - let writer_source = Box::new(IgnoringParquetDirectoryWriterSource {}); - - let mut converter = LineProtocolConverter::new(settings, writer_source); - converter - .convert(only_good_lines) - .expect("Converting all lines"); - converter.finalize().expect("Finalizing writer"); - }) - }); - - group.finish(); -} - -criterion_group!(benches, line_parser); - -criterion_main!(benches); diff --git a/ingest/Cargo.toml b/ingest/Cargo.toml deleted file mode 100644 index 5753297369..0000000000 --- a/ingest/Cargo.toml +++ /dev/null @@ -1,20 +0,0 @@ -[package] -name = "ingest" -version = "0.1.0" -authors = ["Andrew Lamb "] -edition = "2018" - -[dependencies] # In alphabetical order -arrow = { version = "4.0", features = ["prettyprint"] } -influxdb_line_protocol = { path = "../influxdb_line_protocol" } -influxdb_tsm = { path = "../influxdb_tsm" } -internal_types = { path = "../internal_types" } -packers = { path = "../packers" } -parquet = "4.0" -snafu = "0.6.2" -observability_deps = { path = "../observability_deps" } - -[dev-dependencies] # In alphabetical order -flate2 = "1.0" -parking_lot = "0.11.1" -test_helpers = { path = "../test_helpers" } diff --git a/ingest/src/lib.rs b/ingest/src/lib.rs deleted file mode 100644 index d1d80e4ec4..0000000000 --- a/ingest/src/lib.rs +++ /dev/null @@ -1,2168 +0,0 @@ -//! Library with code for (aspirationally) ingesting various data -//! formats into InfluxDB IOx -//! -//! 
Currently supports converting LineProtocol -#![deny(broken_intra_doc_links, rust_2018_idioms)] -#![warn( - missing_copy_implementations, - missing_debug_implementations, - clippy::explicit_iter_loop, - clippy::use_self, - clippy::clone_on_ref_ptr -)] - -use influxdb_line_protocol::{FieldValue, ParsedLine}; -use influxdb_tsm::{ - mapper::{ColumnData, MeasurementTable, TsmMeasurementMapper}, - reader::{BlockDecoder, TsmBlockReader, TsmIndexReader}, - BlockType, TsmError, -}; -use internal_types::schema::{ - builder::InfluxSchemaBuilder, InfluxFieldType, Schema, TIME_COLUMN_NAME, -}; -use observability_deps::tracing::debug; -use packers::{ - ByteArray, Error as TableError, IOxTableWriter, IOxTableWriterSource, Packer, Packers, -}; -use snafu::{ensure, OptionExt, ResultExt, Snafu}; -use std::{ - collections::{BTreeMap, BTreeSet}, - io::{Read, Seek}, -}; - -pub mod parquet; - -#[derive(Debug, Clone, Copy)] -pub struct ConversionSettings { - /// How many `ParsedLine` structures to buffer before determining the schema - sample_size: usize, - // Buffer up tp this many ParsedLines per measurement before writing them - measurement_write_buffer_size: usize, -} - -impl ConversionSettings {} - -impl Default for ConversionSettings { - /// Reasonable default settings - fn default() -> Self { - Self { - sample_size: 5, - measurement_write_buffer_size: 8000, - } - } -} - -/// Converts `ParsedLines` into the packers internal columnar -/// data format and then passes that converted data to a -/// `IOxTableWriter` -pub struct LineProtocolConverter<'a> { - settings: ConversionSettings, - - // The converters for each measurement. - // Key: measurement_name - // - // NB Use owned strings as key so we can look up by str - converters: BTreeMap>, - - table_writer_source: Box, -} - -#[derive(Snafu, Debug)] -pub enum Error { - #[snafu(display(r#"Conversion needs at least one line of data"#))] - NeedsAtLeastOneLine, - - #[snafu(display(r#"Error building schema: {}"#, source))] - BuildingSchema { - source: internal_types::schema::builder::Error, - }, - - #[snafu(display(r#"Error writing to TableWriter: {}"#, source))] - Writing { source: TableError }, - - #[snafu(display(r#"Error creating TableWriter: {}"#, source))] - WriterCreation { source: TableError }, - - #[snafu(display(r#"Error processing TSM File: {}"#, source))] - TsmProcessing { source: TsmError }, - - // TODO clean this error up - #[snafu(display(r#"could not find ts column"#))] - CouldNotFindTsColumn, - - // TODO clean this error up - #[snafu(display(r#"could not find column"#))] - CouldNotFindColumn, -} - -impl From for Error { - fn from(source: internal_types::schema::builder::Error) -> Self { - Self::BuildingSchema { source } - } -} - -/// Handles buffering `ParsedLine` objects and deducing a schema from that -/// sample -#[derive(Debug)] -struct MeasurementSampler<'a> { - settings: ConversionSettings, - - /// The buffered lines to use as a sample - schema_sample: Vec>, -} - -/// Handles actually packing (copy/reformat) of ParsedLines and -/// writing them to a table writer. -struct MeasurementWriter<'a> { - settings: ConversionSettings, - - /// Schema which describes the lines being written - schema: Schema, - - /// The sink to which tables are being written - table_writer: Box, - - /// lines buffered - write_buffer: Vec>, -} - -/// Tracks the conversation state for each measurement: either in -/// "UnknownSchema" mode when the schema is still unknown or "KnownSchema" mode -/// once the schema is known. 
-#[derive(Debug)] -enum MeasurementConverter<'a> { - UnknownSchema(MeasurementSampler<'a>), - KnownSchema(MeasurementWriter<'a>), -} - -impl std::fmt::Debug for MeasurementWriter<'_> { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("MeasurementWriter") - .field("settings", &self.settings) - .field("schema", &self.schema) - .field("table_writer", &"DYNAMIC") - .field("write_buffer.size", &self.write_buffer.len()) - .finish() - } -} - -impl std::fmt::Debug for LineProtocolConverter<'_> { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("LineProtocolConverter") - .field("settings", &self.settings) - .field("converters", &self.converters) - .field("table_writer_source", &"DYNAMIC") - .finish() - } -} - -impl<'a> MeasurementConverter<'a> { - /// Changes from `MeasurementSampler` -> `MeasurementWriter` if - /// not yet in writing mode and the schema sample is full. The - /// conversion can be forced even if there are not enough samples - /// by specifing `force=true` (e.g. at the end of the input stream). - fn prepare_for_writing( - &mut self, - table_writer_source: &mut dyn IOxTableWriterSource, - force: bool, - ) -> Result<(), Error> { - match self { - MeasurementConverter::UnknownSchema(sampler) => { - if force || sampler.sample_full() { - debug!( - "Preparing for write, deducing schema (sample_full={}, force={})", - sampler.sample_full(), - force - ); - - let schema = sampler.deduce_schema_from_sample()?; - debug!("Deduced line protocol schema: {:#?}", schema); - let table_writer = table_writer_source - .next_writer(&schema) - .context(WriterCreation)?; - - let mut writer = MeasurementWriter::new(sampler.settings, schema, table_writer); - - debug!("Completed change to writing mode"); - for line in sampler.schema_sample.drain(..) { - writer.buffer_line(line)?; - } - *self = MeasurementConverter::KnownSchema(writer); - } else { - debug!("Schema sample not yet full, waiting for more lines"); - } - } - // Already in writing mode - MeasurementConverter::KnownSchema(_) => {} - }; - Ok(()) - } -} - -impl<'a> LineProtocolConverter<'a> { - /// Construct a converter. All converted data will be written to - /// the respective IOxTableWriter returned by - /// `table_writer_source`. - pub fn new( - settings: ConversionSettings, - table_writer_source: Box, - ) -> Self { - LineProtocolConverter { - settings, - table_writer_source, - converters: BTreeMap::new(), - } - } - - /// Converts `ParesdLine`s from any number of measurements and - /// writes them out to `IOxTableWriters`. Note that data is - /// internally buffered and may not be written until a call to - /// `finalize`. - pub fn convert( - &mut self, - lines: impl IntoIterator>, - ) -> Result<&mut Self, Error> { - for line in lines { - let series = &line.series; - - let series_measurement = series.measurement.as_str(); - - // do not use entry API to avoid copying the key unless it is not present - let mut converter = match self.converters.get_mut(series_measurement) { - Some(converter) => converter, - None => { - self.converters.insert( - series_measurement.into(), - MeasurementConverter::UnknownSchema(MeasurementSampler::new(self.settings)), - ); - self.converters.get_mut(series_measurement).unwrap() - } - }; - - // This currently dispatches row by row. It might help - // group `ParsedLines` by measurement first. 
- match &mut converter { - MeasurementConverter::UnknownSchema(sampler) => { - sampler.add_sample(line); - } - MeasurementConverter::KnownSchema(writer) => { - writer.buffer_line(line)?; - } - } - converter.prepare_for_writing(&mut self.table_writer_source, false)?; - } - Ok(self) - } - - /// Finalizes all work of this converter and calls `close()` on the - /// underlying writer. - pub fn finalize(&mut self) -> Result<&mut Self, Error> { - // If we haven't yet switched to writing mode, do so now - for converter in self.converters.values_mut() { - converter.prepare_for_writing(&mut self.table_writer_source, true)?; - - match converter { - MeasurementConverter::UnknownSchema(_) => { - unreachable!("Should be prepared for writing"); - } - MeasurementConverter::KnownSchema(writer) => writer.finalize()?, - } - } - Ok(self) - } -} - -impl<'a> MeasurementSampler<'a> { - fn new(settings: ConversionSettings) -> Self { - let schema_sample = Vec::with_capacity(settings.sample_size); - MeasurementSampler { - settings, - schema_sample, - } - } - - fn sample_full(&self) -> bool { - self.schema_sample.len() >= self.settings.sample_size - } - - fn add_sample(&mut self, line: ParsedLine<'a>) { - self.schema_sample.push(line); - } - - /// Use the contents of self.schema_sample to deduce the Schema of - /// `ParsedLine`s and return the deduced schema - fn deduce_schema_from_sample(&mut self) -> Result { - ensure!(!self.schema_sample.is_empty(), NeedsAtLeastOneLine); - - let mut builder = InfluxSchemaBuilder::new(); - - for line in &self.schema_sample { - let series = &line.series; - builder = builder.saw_measurement(&series.measurement)?; - - if let Some(tag_set) = &series.tag_set { - for (tag_name, _) in tag_set { - builder = builder.saw_tag(tag_name.as_str()); - } - } - for (field_name, field_value) in &line.field_set { - let field_type = match field_value { - FieldValue::F64(_) => InfluxFieldType::Float, - FieldValue::I64(_) => InfluxFieldType::Integer, - FieldValue::U64(_) => InfluxFieldType::UInteger, - FieldValue::String(_) => InfluxFieldType::String, - FieldValue::Boolean(_) => InfluxFieldType::Boolean, - }; - builder = builder.saw_influx_field(field_name.as_str(), field_type); - } - } - - builder.build().context(BuildingSchema) - } -} - -impl<'a> MeasurementWriter<'a> { - /// Create a new measurement writer which will buffer up to - /// the number of samples specified in settings before writing to the output - pub fn new( - settings: ConversionSettings, - schema: Schema, - table_writer: Box, - ) -> Self { - let write_buffer = Vec::with_capacity(settings.measurement_write_buffer_size); - - MeasurementWriter { - settings, - schema, - table_writer, - write_buffer, - } - } - - fn buffer_full(&self) -> bool { - self.write_buffer.len() >= self.settings.measurement_write_buffer_size - } - - /// Buffers a `ParsedLine`s (which are row-based) in preparation for column - /// packing and writing - pub fn buffer_line(&mut self, line: ParsedLine<'a>) -> Result<(), Error> { - if self.buffer_full() { - self.flush_buffer()?; - } - self.write_buffer.push(line); - Ok(()) - } - - /// Flushes all ParsedLines and writes them to the underlying - /// table writer in a single chunk - fn flush_buffer(&mut self) -> Result<(), Error> { - debug!("Flushing buffer {} rows", self.write_buffer.len()); - let packers = pack_lines(&self.schema, &self.write_buffer); - self.table_writer.write_batch(&packers).context(Writing)?; - self.write_buffer.clear(); - Ok(()) - } - - /// Finalizes all work of this converter and closes the underlying 
writer. - pub fn finalize(&mut self) -> Result<(), Error> { - self.flush_buffer()?; - self.table_writer.close().context(Writing) - } -} - -/// Keeps track of if we have written a value to a particular row -struct PackersForRow<'a> { - packer: &'a mut Packers, - wrote_value_for_row: bool, -} - -impl<'a> PackersForRow<'a> { - fn new(packer: &'a mut Packers) -> Self { - PackersForRow { - packer, - wrote_value_for_row: false, - } - } - /// Retrieve the packer and note that we have written to this packer - fn packer(&mut self) -> &mut Packers { - assert!( - !self.wrote_value_for_row, - "Should only write one value to each column per row" - ); - self.wrote_value_for_row = true; - self.packer - } - /// Finish writing a row and prepare for the next. If no value has - /// been written, write a NULL - fn finish_row(&mut self) { - if !self.wrote_value_for_row { - self.packer.push_none(); - } - self.wrote_value_for_row = false; - } -} - -/// Internal implementation: packs the `ParsedLine` structures for a -/// single measurement into a format suitable for writing -/// -/// # Panics -/// -/// The caller is responsible for ensuring that all `ParsedLines` come -/// from the same measurement. This function will panic if that is -/// not true. -/// -/// -/// TODO: improve performance by reusing the the Vec rather -/// than always making new ones -fn pack_lines<'a>(schema: &Schema, lines: &[ParsedLine<'a>]) -> Vec { - let mut packers: Vec<_> = schema - .iter() - .enumerate() - .map(|(idx, (influxdb_column_type, _))| { - debug!(" Column definition [{}] = {:?}", idx, influxdb_column_type); - - // Initialise a Packer for the matching data type wrapped in a - // Packers enum variant to allow it to live in a vector. - let mut packer = Packers::from(influxdb_column_type.unwrap()); - packer.reserve_exact(lines.len()); - packer - }) - .collect(); - - // map col_name -> PackerForRow; - // Use a String as a key (rather than &String) so we can look up via str - let mut packer_map: BTreeMap<_, _> = packers - .iter_mut() - .enumerate() - .map(|(i, packer)| (schema.field(i).1.name().clone(), PackersForRow::new(packer))) - .collect(); - - for line in lines { - let series = &line.series; - - assert_eq!( - Some(series.measurement.as_str()), - schema.measurement().map(|s| s.as_str()), - "Different measurements detected. 
Expected {:?} found {}", - schema.measurement(), - series.measurement - ); - - if let Some(tag_set) = &series.tag_set { - for (tag_name, tag_value) in tag_set { - if let Some(packer_for_row) = packer_map.get_mut(tag_name.as_str()) { - packer_for_row - .packer() - .bytes_packer_mut() - .push(ByteArray::from(tag_value.as_str())); - } else { - panic!( - "tag {} seen in input that has no matching column in schema", - tag_name - ) - } - } - } - - for (field_name, field_value) in &line.field_set { - if let Some(packer_for_row) = packer_map.get_mut(field_name.as_str()) { - let packer = packer_for_row.packer(); - match *field_value { - FieldValue::F64(f) => { - packer.f64_packer_mut().push(f); - } - FieldValue::I64(i) => { - packer.i64_packer_mut().push(i); - } - FieldValue::U64(i) => { - packer.u64_packer_mut().push(i); - } - FieldValue::String(ref s) => { - packer.bytes_packer_mut().push(ByteArray::from(s.as_str())); - } - FieldValue::Boolean(b) => { - packer.bool_packer_mut().push(b); - } - } - } else { - panic!( - "field {} seen in input that has no matching column in schema", - field_name - ) - } - } - - if let Some(packer_for_row) = packer_map.get_mut(TIME_COLUMN_NAME) { - // TODO(edd) why would line _not_ have a timestamp??? We should always have them - packer_for_row - .packer() - .i64_packer_mut() - .push_option(line.timestamp) - } else { - panic!("No {} field present in schema...", TIME_COLUMN_NAME); - } - - // Now, go over all packers and add missing values if needed - for packer_for_row in packer_map.values_mut() { - packer_for_row.finish_row(); - } - } - packers -} - -// use arrow::array; -// use arrow::datatypes; -// use arrow::ipc::writer; -// use arrow::record_batch; -// use std::fs::File; -// use std::sync::Arc; - -// fn arrow_datatype(datatype: DataType) -> datatypes::DataType { -// match datatype { -// DataType::Float => datatypes::DataType::Float64, -// DataType::Integer => datatypes::DataType::Int64, -// DataType::String => datatypes::DataType::Utf8, -// // DataType::String => datatypes::DataType::Dictionary( -// // std::boxed::Box::new(datatypes::DataType::Int16), -// // std::boxed::Box::new(datatypes::DataType::Utf8), -// // ), -// DataType::Boolean => datatypes::DataType::Boolean, -// DataType::Timestamp => datatypes::DataType::Int64, -// } -// } - -// fn write_arrow_file(parquet_schema: Schema, packers: Vec) -> -// Result<(), Error> { let file = -// File::create("/tmp/http_api_requests_total.arrow").unwrap(); - -// let mut record_batch_fields: Vec = vec![]; -// // no default() on Field... 
-// record_batch_fields.resize( -// parquet_schema.get_col_defs().len(), -// datatypes::Field::new("foo", datatypes::DataType::Int64, false), -// ); - -// for col_def in parquet_schema.get_col_defs() { -// let nullable = col_def.data_type != DataType::Timestamp; -// // if col_def.data_type == DataType::Timestamp { -// // nullable = false; -// // } else { -// // nullable = true; -// // } - -// record_batch_fields[col_def.index as usize] = datatypes::Field::new( -// col_def.name.as_str(), -// arrow_datatype(col_def.data_type), -// nullable, -// ); -// } -// println!("{:?}", record_batch_fields); -// println!("{:?}", parquet_schema.get_col_defs()); -// let schema = datatypes::Schema::new(record_batch_fields); - -// let mut writer = writer::StreamWriter::try_new(file, &schema).unwrap(); - -// // let num_rows = packers[0].num_rows(); -// let batch_size = 60_000; - -// let mut packer_chunkers: Vec> = vec![]; -// for packer in &packers { -// packer_chunkers.push(packer.chunk_values(batch_size)); -// } - -// loop { -// let mut chunked_packers: Vec = -// Vec::with_capacity(packers.len()); for chunker in &mut -// packer_chunkers { match chunker { -// PackerChunker::Float(c) => { -// if let Some(chunk) = c.next() { -// -// chunked_packers.push(Packers::Float(Packer::from(chunk))); -// } } -// PackerChunker::Integer(c) => { -// if let Some(chunk) = c.next() { -// -// chunked_packers.push(Packers::Integer(Packer::from(chunk))); -// } } -// PackerChunker::String(c) => { -// if let Some(chunk) = c.next() { -// -// chunked_packers.push(Packers::String(Packer::from(chunk))); -// } } -// PackerChunker::Boolean(c) => { -// if let Some(chunk) = c.next() { -// -// chunked_packers.push(Packers::Boolean(Packer::from(chunk))); -// } } -// } -// } - -// if chunked_packers.is_empty() { -// break; -// } - -// // let sort = [0, 7, 6, 12]; -// // let sort = [8, 4, 9, 0, 1, 7, 10, 6, 5, 2, 3, 12]; -// let sort = [3, 2, 5, 6, 10, 7, 1, 0, 9, 4, 8, 12]; -// packers::sorter::sort(&mut chunked_packers, &sort).unwrap(); - -// println!( -// "Writing {:?} packers with size: {:?}", -// chunked_packers.len(), -// chunked_packers[0].num_rows() -// ); -// write_arrow_batch(&mut writer, Arc::new(schema.clone()), -// chunked_packers); } - -// writer.finish().unwrap(); -// Ok(()) -// } - -// fn write_arrow_batch( -// w: &mut writer::StreamWriter, -// schema: Arc, -// packers: Vec, -// ) { -// let mut record_batch_arrays: Vec = vec![]; - -// for packer in packers { -// match packer { -// Packers::Float(p) => { -// -// record_batch_arrays.push(Arc::new(array::Float64Array::from(p.values(). -// to_vec()))); } -// Packers::Integer(p) => { -// -// record_batch_arrays.push(Arc::new(array::Int64Array::from(p.values(). -// to_vec()))); } -// Packers::String(p) => { -// let mut builder = array::StringBuilder::new(p.num_rows()); -// for v in p.values() { -// match v { -// Some(v) => { -// -// builder.append_value(v.as_utf8().unwrap()).unwrap(); -// } None => { -// builder.append_null().unwrap(); -// } -// } -// } -// let array = builder.finish(); -// record_batch_arrays.push(Arc::new(array)); -// } -// Packers::Boolean(p) => { -// let array = array::BooleanArray::from(p.values().to_vec()); -// record_batch_arrays.push(Arc::new(array)); -// } -// } -// } - -// let record_batch = record_batch::RecordBatch::try_new(schema, -// record_batch_arrays).unwrap(); w.write(&record_batch).unwrap(); -// } - -/// Converts one or more TSM files into the packers internal columnar -/// data format and then passes that converted data to a `IOxTableWriter`. 
-pub struct TsmFileConverter { - table_writer_source: Box, -} - -impl TsmFileConverter { - pub fn new(table_writer_source: Box) -> Self { - Self { - table_writer_source, - } - } - - /// Given one or more sets of readers, converts the underlying TSM data into - /// a set of Parquet files segmented by measurement name. - /// - /// It is the caller's responsibility to order the input readers such that - /// duplicate block data will be overwritten by later readers. - pub fn convert( - &mut self, - index_readers: Vec<(R, usize)>, - mut block_readers: Vec, - ) -> Result<(), Error> - where - R: Read + Seek, - { - if index_readers.is_empty() { - return Err(Error::TsmProcessing { - source: TsmError { - description: "at least one reader required".to_string(), - }, - }); - } else if index_readers.len() != block_readers.len() { - return Err(Error::TsmProcessing { - source: TsmError { - description: "different number of readers".to_string(), - }, - }); - } - - let mut dst = vec![None; index_readers.len()]; - let mut mappers = Vec::with_capacity(index_readers.len()); - - for (i, (reader, size)) in index_readers.into_iter().enumerate() { - let index_reader = TsmIndexReader::try_new(reader, size).context(TsmProcessing)?; - mappers.push(TsmMeasurementMapper::new(index_reader.peekable(), i)); - } - - // track all the block readers for each file, so that the correct reader - // can be used to decode each block - let mut block_reader = TsmBlockReader::new(block_readers.remove(0)); - for reader in block_readers.into_iter() { - block_reader.add_reader(reader); - } - - loop { - dst = Self::refill_input_tables(&mut mappers, dst)?; - let res = Self::merge_input_tables(dst)?; - let next_measurement = res.0; - dst = res.1; - - match next_measurement { - Some(mut table) => { - // convert (potentially merged) measurement.. - let (schema, packed_columns) = - Self::process_measurement_table(&mut block_reader, &mut table)?; - let mut table_writer = self - .table_writer_source - .next_writer(&schema) - .context(WriterCreation)?; - - table_writer - .write_batch(&packed_columns) - .context(WriterCreation)?; - table_writer.close().context(WriterCreation)?; - } - None => break, - } - } - Ok(()) - } - - // Given a set of input tables, identifies the next table (lexicographically) - // and then merges any identical tables into the first one. - // - // Returns the merged table and the remaining set of inputs to be - // subsequently processed. - fn merge_input_tables( - mut inputs: Vec>, - ) -> Result<(Option, Vec>), Error> { - // track each table's position in the input vector. If tables are chosen - // they will be moved out of the input vector later on. - let mut input_map: BTreeMap> = BTreeMap::new(); - for (i, table) in inputs.iter().enumerate() { - if let Some(table) = table { - let v = input_map.entry(table.name.clone()).or_default(); - v.push(i); - } - } - - // tables are now organised together by the table's measurement name and - // ordered lexicographically. The first item in the sorted map is the - // next measurement that needs to be merged and processed. - if let Some((_, table_indexes)) = input_map.into_iter().next() { - let mut iter = table_indexes.into_iter(); // merge each of these into first one - - // unwrap is safe because all hashmap entries have at least one table index. - let first_table_index = iter.next().unwrap(); - // unwrap is safe because the indexes in iter all point to non-none - // tables. 
- let mut first_table = inputs[first_table_index].take().unwrap(); - - // if there are multiple tables for this measurement merge them - // into the first one. - for idx in iter { - // TODO(edd): perf - could calculate a more efficient merge - // order. - let mut next_table = inputs[idx].take().unwrap(); - first_table.merge(&mut next_table).context(TsmProcessing)?; - } - return Ok((Some(first_table), inputs)); - } - - // no measurements to process; all inputs drained - Ok((None, inputs)) - } - - // Ensures that the destination vector has the next - // measurement table for each input iterator. - fn refill_input_tables( - inputs: &mut Vec>>, - mut dst: Vec>, - ) -> Result>, Error> { - for (input, dst) in inputs.iter_mut().zip(dst.iter_mut()) { - if dst.is_none() { - match input.next() { - Some(res) => { - let table = res.context(TsmProcessing)?; - *dst = Some(table); - } - None => continue, - } - } - } - Ok(dst) - } - - // Given a measurement table `process_measurement_table` produces an - // appropriate schema and set of Packers. - fn process_measurement_table( - mut block_reader: impl BlockDecoder, - m: &mut MeasurementTable, - ) -> Result<(Schema, Vec), Error> { - let mut builder = - internal_types::schema::builder::SchemaBuilder::new().measurement(&m.name); - let mut packed_columns: Vec = Vec::new(); - - let mut tks = Vec::new(); - for tag in m.tag_columns() { - builder = builder.tag(tag); - tks.push(tag.clone()); - packed_columns.push(Packers::Bytes(Packer::new())); - } - - let mut fks = Vec::new(); - for (field_key, block_type) in m.field_columns().to_owned() { - builder = builder.influx_field(&field_key, to_data_type(&block_type)); - fks.push((field_key.clone(), block_type)); - packed_columns.push(Packers::from(block_type)); - } - - // Account for timestamp - let builder = builder.timestamp(); - packed_columns.push(Packers::Integer(Packer::new())); - - let schema = builder.build()?; - - // get mapping between named columns and packer indexes. - let name_packer = schema - .iter() - .enumerate() - .map(|(idx, (_, arrow_field))| (arrow_field.name().clone(), idx)) - .collect::>(); - - // Process the measurement to build out a table. - // - // The processing function we supply to `process` does the following: - // - // - Append the timestamp column to the packer timestamp column - // - Materialise the same tag value for any tag key columns where the emitted - // section has a none-null value for that column. - // - Materialise NULL values for any tag key columns that we don't have data - // for in the emitted section. - // - Append the field columns to the packer field columns. The emitted section - // will already have fully materialised the data for these columns, including - // any NULL entries. - // - Materialise NULL values for any field columns that the emitted section - // does not have any data for. - // - m.process( - &mut block_reader, - |section: influxdb_tsm::mapper::TableSection| -> Result<(), TsmError> { - // number of rows in each column in this table section. - let col_len = section.len(); - - // if this is the first section of the table then we can avoid - // extending slices and just move the slice over to the packer - // vector. - // - // TODO(edd): will the compiler just figure this out for us w.r.t - // `extend_from_slice`?? - let first_table_section = section.is_first(); - - // Process the timestamp column. 
- let ts_idx = name_packer - .get(TIME_COLUMN_NAME) - .context(CouldNotFindTsColumn) - .map_err(|e| TsmError { - description: e.to_string(), - })?; - - if section.is_first() { - packed_columns[*ts_idx] = Packers::from(section.ts); - } else { - packed_columns[*ts_idx] - .i64_packer_mut() - .extend_from_slice(§ion.ts); - } - - // Process any tag columns that this section has values for. - // We have to materialise the values for the column, which are - // guaranteed to be the same. - for (tag_key, tag_value) in §ion.tag_cols { - let idx = name_packer - .get(tag_key) - .context(CouldNotFindColumn) - .map_err(|e| TsmError { - description: e.to_string(), - })?; - - // this will create a column of repeated values. - if first_table_section { - packed_columns[*idx] = Packers::from_elem_str(tag_value, col_len); - } else { - packed_columns[*idx] - .bytes_packer_mut() - .extend_from_slice(&vec![ByteArray::from(tag_value.as_ref()); col_len]); - } - } - - // Not all tag columns may be present in the section. For those - // that are not present we need to materialise NULL values for - // every row. - let tag_keys = section - .tag_cols - .iter() - .map(|pair| pair.0.clone()) - .collect::>(); - for key in &tks { - if tag_keys.contains(key) { - continue; - } - - let idx = name_packer - .get(key) - .context(CouldNotFindColumn) - .map_err(|e| TsmError { - description: e.to_string(), - })?; - - if first_table_section { - // creates a column of repeated None values. - let col: Vec>> = vec![None; col_len]; - packed_columns[*idx] = Packers::from(col); - } else { - // pad out column with None values because we don't have a - // value for it. - packed_columns[*idx] - .bytes_packer_mut() - .fill_with_null(col_len); - } - } - - // Next we will write out all of the field columns for this - // section. 
- let mut got_field_cols = Vec::new(); - for (field_key, field_values) in section.field_cols { - let idx = name_packer - .get(&field_key) - .context(CouldNotFindColumn) - .map_err(|e| TsmError { - description: e.to_string(), - })?; - - if first_table_section { - match field_values { - ColumnData::Float(v) => packed_columns[*idx] = Packers::from(v), - ColumnData::Integer(v) => packed_columns[*idx] = Packers::from(v), - ColumnData::Str(v) => packed_columns[*idx] = Packers::from(v), - ColumnData::Bool(v) => packed_columns[*idx] = Packers::from(v), - ColumnData::Unsigned(v) => packed_columns[*idx] = Packers::from(v), - } - } else { - match field_values { - ColumnData::Float(v) => packed_columns[*idx] - .f64_packer_mut() - .extend_from_option_slice(&v), - ColumnData::Integer(v) => packed_columns[*idx] - .i64_packer_mut() - .extend_from_option_slice(&v), - ColumnData::Str(values) => { - let col = packed_columns[*idx].bytes_packer_mut(); - for value in values { - match value { - Some(v) => col.push(ByteArray::from(v)), - None => col.push_option(None), - } - } - } - ColumnData::Bool(v) => packed_columns[*idx] - .bool_packer_mut() - .extend_from_option_slice(&v), - ColumnData::Unsigned(values) => { - let col = packed_columns[*idx].i64_packer_mut(); - for value in values { - match value { - Some(v) => col.push(v as i64), - None => col.push_option(None), - } - } - } - } - } - got_field_cols.push(field_key); - } - - // Finally, materialise NULL values for all of the field columns - // that this section does not have any values for - for (key, field_type) in &fks { - if got_field_cols.contains(key) { - continue; - } - - let idx = name_packer - .get(key) - .context(CouldNotFindColumn) - .map_err(|e| TsmError { - description: e.to_string(), - })?; - - // this will create a column of repeated None values. 
- if first_table_section { - match field_type { - BlockType::Float => { - let col: Vec> = vec![None; col_len]; - packed_columns[*idx] = Packers::from(col); - } - BlockType::Integer => { - let col: Vec> = vec![None; col_len]; - packed_columns[*idx] = Packers::from(col); - } - BlockType::Bool => { - let col: Vec> = vec![None; col_len]; - packed_columns[*idx] = Packers::from(col); - } - BlockType::Str => { - let col: Vec>> = vec![None; col_len]; - packed_columns[*idx] = Packers::from(col); - } - BlockType::Unsigned => { - let col: Vec> = vec![None; col_len]; - packed_columns[*idx] = Packers::from(col); - } - } - } else { - match field_type { - BlockType::Float => { - packed_columns[*idx] - .f64_packer_mut() - .fill_with_null(col_len); - } - BlockType::Integer => { - packed_columns[*idx] - .i64_packer_mut() - .fill_with_null(col_len); - } - BlockType::Bool => { - packed_columns[*idx] - .bool_packer_mut() - .fill_with_null(col_len); - } - BlockType::Str => { - packed_columns[*idx] - .bytes_packer_mut() - .fill_with_null(col_len); - } - BlockType::Unsigned => { - packed_columns[*idx] - .i64_packer_mut() - .fill_with_null(col_len); - } - } - } - } - Ok(()) - }, - ) - .context(TsmProcessing)?; - Ok((schema, packed_columns)) - } -} - -fn to_data_type(value: &BlockType) -> InfluxFieldType { - match value { - BlockType::Float => InfluxFieldType::Float, - BlockType::Integer => InfluxFieldType::Integer, - BlockType::Bool => InfluxFieldType::Boolean, - BlockType::Str => InfluxFieldType::String, - BlockType::Unsigned => InfluxFieldType::Integer, - } -} - -impl std::fmt::Debug for TsmFileConverter { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("TSMFileConverter") - .field("table_writer_source", &"DYNAMIC") - .finish() - } -} - -#[cfg(test)] -mod tests { - use super::*; - use influxdb_tsm::{ - reader::{BlockData, MockBlockDecoder}, - Block, - }; - use internal_types::{assert_column_eq, schema::InfluxColumnType}; - use packers::{Error as TableError, IOxTableWriter, IOxTableWriterSource, Packers}; - use test_helpers::approximately_equal; - - use flate2::read::GzDecoder; - use parking_lot::Mutex; - use std::fs::File; - use std::io::{BufReader, Cursor, Read}; - use std::sync::Arc; - - /// Record what happens when the writer is created so we can - /// inspect it as part of the tests. It uses string manipulation - /// for quick test writing and easy debugging - struct WriterLog { - events: Vec, - } - - impl WriterLog { - fn new() -> Self { - Self { events: Vec::new() } - } - } - - /// copies the events out of the shared log - fn get_events(log: &Arc>) -> Vec { - let log_mut = log.lock(); - log_mut.events.to_vec() - } - - /// Adds a new event to the log - fn log_event(log: &Arc>, event: String) { - let mut mut_log = log.lock(); - mut_log.events.push(event); - } - - struct NoOpWriter { - /// use a ptr and mutex so we can inspect the shared value of the - /// log during tests. 
Could probably use an Rc instead, but Arc may - /// be useful when implementing this multi threaded - log: Arc>, - measurement_name: String, - } - impl NoOpWriter { - fn new(log: Arc>, measurement_name: String) -> Self { - Self { - log, - measurement_name, - } - } - } - - impl IOxTableWriter for NoOpWriter { - fn write_batch(&mut self, packers: &[Packers]) -> Result<(), TableError> { - if packers.is_empty() { - log_event( - &self.log, - format!( - "[{}] Wrote no data; no packers passed", - self.measurement_name - ), - ); - } - - let rows_written = packers - .iter() - .fold(packers[0].num_rows(), |cur_len, packer| { - assert_eq!( - packer.num_rows(), - cur_len, - "Some packer had a different number of rows" - ); - cur_len - }); - - log_event( - &self.log, - format!( - "[{}] Wrote batch of {} cols, {} rows", - self.measurement_name, - packers.len(), - rows_written - ), - ); - Ok(()) - } - - fn close(&mut self) -> Result<(), TableError> { - log_event(&self.log, format!("[{}] Closed", self.measurement_name)); - Ok(()) - } - } - - /// Constructs NoOpWriters - struct NoOpWriterSource { - log: Arc>, - } - - impl NoOpWriterSource { - fn new(log: Arc>) -> Box { - Box::new(Self { log }) - } - } - - impl IOxTableWriterSource for NoOpWriterSource { - fn next_writer(&mut self, schema: &Schema) -> Result, TableError> { - let measurement_name = schema.measurement().unwrap(); - log_event( - &self.log, - format!("Created writer for measurement {}", measurement_name), - ); - Ok(Box::new(NoOpWriter::new( - Arc::clone(&self.log), - measurement_name.to_string(), - ))) - } - } - - fn only_good_lines(data: &str) -> Vec> { - influxdb_line_protocol::parse_lines(data) - .filter_map(|r| { - assert!(r.is_ok()); - r.ok() - }) - .collect() - } - - fn get_sampler_settings() -> ConversionSettings { - ConversionSettings { - sample_size: 2, - ..Default::default() - } - } - - #[test] - fn measurement_sampler_add_sample() { - let mut parsed_lines = only_good_lines( - r#" - cpu usage_system=64i 1590488773254420000 - cpu usage_system=67i 1590488773254430000 - cpu usage_system=68i 1590488773254440000"#, - ) - .into_iter(); - - let mut sampler = MeasurementSampler::new(get_sampler_settings()); - assert_eq!(sampler.sample_full(), false); - - sampler.add_sample(parsed_lines.next().unwrap()); - assert_eq!(sampler.sample_full(), false); - - sampler.add_sample(parsed_lines.next().unwrap()); - assert_eq!(sampler.sample_full(), true); - - // note it is ok to put more lines in than sample - sampler.add_sample(parsed_lines.next().unwrap()); - assert_eq!(sampler.sample_full(), true); - - assert_eq!(sampler.schema_sample.len(), 3); - } - - #[test] - fn measurement_sampler_deduce_schema_no_lines() { - let mut sampler = MeasurementSampler::new(get_sampler_settings()); - let schema_result = sampler.deduce_schema_from_sample(); - assert!(matches!(schema_result, Err(Error::NeedsAtLeastOneLine))); - } - - /// Creates a sampler and feeds all the lines found in data into it - fn make_sampler_from_data(data: &str) -> MeasurementSampler<'_> { - let parsed_lines = only_good_lines(data); - let mut sampler = MeasurementSampler::new(get_sampler_settings()); - for line in parsed_lines { - sampler.add_sample(line) - } - sampler - } - - #[test] - fn measurement_sampler_deduce_schema_one_line() { - let mut sampler = - make_sampler_from_data("cpu,host=A,region=west usage_system=64i 1590488773254420000"); - - let schema = sampler - .deduce_schema_from_sample() - .expect("Successful schema conversion"); - - assert_eq!(schema.measurement().unwrap(), "cpu"); - - 
println!("Converted to {:#?}", schema); - assert_column_eq!(schema, 0, InfluxColumnType::Tag, "host"); - assert_column_eq!(schema, 1, InfluxColumnType::Tag, "region"); - assert_column_eq!( - schema, - 2, - InfluxColumnType::Field(InfluxFieldType::Integer), - "usage_system" - ); - assert_column_eq!(schema, 3, InfluxColumnType::Timestamp, "time"); - } - - #[test] - fn measurement_sampler_deduce_schema_multi_line_same_schema() { - let mut sampler = make_sampler_from_data( - r#" - cpu,host=A,region=west usage_system=64i 1590488773254420000 - cpu,host=A,region=east usage_system=67i 1590488773254430000"#, - ); - - let schema = sampler - .deduce_schema_from_sample() - .expect("Successful schema conversion"); - assert_eq!(schema.measurement().unwrap(), "cpu"); - - println!("Converted to {:#?}", schema); - assert_column_eq!(schema, 0, InfluxColumnType::Tag, "host"); - assert_column_eq!(schema, 1, InfluxColumnType::Tag, "region"); - assert_column_eq!( - schema, - 2, - InfluxColumnType::Field(InfluxFieldType::Integer), - "usage_system" - ); - assert_column_eq!(schema, 3, InfluxColumnType::Timestamp, "time"); - } - - #[test] - fn measurement_sampler_deduce_schema_multi_line_new_field() { - // given two lines of protocol data that have different field names - let mut sampler = make_sampler_from_data( - r#" - cpu,host=A,region=west usage_system=64i 1590488773254420000 - cpu,host=A,region=east usage_user=61.32 1590488773254430000"#, - ); - - // when we extract the schema - let schema = sampler - .deduce_schema_from_sample() - .expect("Successful schema conversion"); - assert_eq!(schema.measurement().unwrap(), "cpu"); - - // then both field names appear in the resulting schema - println!("Converted to {:#?}", schema); - assert_column_eq!(schema, 0, InfluxColumnType::Tag, "host"); - assert_column_eq!(schema, 1, InfluxColumnType::Tag, "region"); - assert_column_eq!( - schema, - 2, - InfluxColumnType::Field(InfluxFieldType::Integer), - "usage_system" - ); - assert_column_eq!( - schema, - 3, - InfluxColumnType::Field(InfluxFieldType::Float), - "usage_user" - ); - assert_column_eq!(schema, 4, InfluxColumnType::Timestamp, "time"); - } - - #[test] - fn measurement_sampler_deduce_schema_multi_line_new_tags() { - // given two lines of protocol data that have different tags - let mut sampler = make_sampler_from_data( - r#" - cpu,host=A usage_system=64i 1590488773254420000 - cpu,host=A,fail_group=Z usage_system=61i 1590488773254430000"#, - ); - - // when we extract the schema - let schema = sampler - .deduce_schema_from_sample() - .expect("Successful schema conversion"); - assert_eq!(schema.measurement().unwrap(), "cpu"); - - // Then both tag names appear in the resulting schema - println!("Converted to {:#?}", schema); - assert_column_eq!(schema, 0, InfluxColumnType::Tag, "host"); - assert_column_eq!(schema, 1, InfluxColumnType::Tag, "fail_group"); - assert_column_eq!( - schema, - 2, - InfluxColumnType::Field(InfluxFieldType::Integer), - "usage_system" - ); - assert_column_eq!(schema, 3, InfluxColumnType::Timestamp, "time"); - } - - #[test] - fn measurement_sampler_deduce_schema_multi_line_field_changed() { - // given two lines of protocol data that have apparently different data types - // for the field: - let mut sampler = make_sampler_from_data( - r#" - cpu,host=A usage_system=64i 1590488773254420000 - cpu,host=A usage_system=61.1 1590488773254430000"#, - ); - - // when we extract the schema - let schema = sampler - .deduce_schema_from_sample() - .expect("Successful schema conversion"); - 
assert_eq!(schema.measurement().unwrap(), "cpu"); - - // Then the first field type appears in the resulting schema (TBD is this what - // we want??) - println!("Converted to {:#?}", schema); - - assert_column_eq!(schema, 0, InfluxColumnType::Tag, "host"); - assert_column_eq!( - schema, - 1, - InfluxColumnType::Field(InfluxFieldType::Integer), - "usage_system" - ); - assert_column_eq!(schema, 2, InfluxColumnType::Timestamp, "time"); - } - - #[test] - fn measurement_sampler_deduce_schema_multi_line_measurement_changed() { - // given two lines of protocol data for two different measurements - let mut sampler = make_sampler_from_data( - r#" - cpu,host=A usage_system=64i 1590488773254420000 - vcpu,host=A usage_system=61i 1590488773254430000"#, - ); - - // when we extract the schema - let schema_result = sampler.deduce_schema_from_sample(); - - // Then the converter does not support it - assert_eq!(schema_result.unwrap_err().to_string(), "Error building schema: Multiple measurement names not supported. Old measurement 'cpu', new measurement 'vcpu'"); - } - - // --- Tests for MeasurementWriter - fn get_writer_settings() -> ConversionSettings { - ConversionSettings { - measurement_write_buffer_size: 2, - ..Default::default() - } - } - - #[test] - fn measurement_writer_buffering() { - let log = Arc::new(Mutex::new(WriterLog::new())); - let table_writer = Box::new(NoOpWriter::new(Arc::clone(&log), String::from("cpu"))); - - let schema = InfluxSchemaBuilder::new() - .saw_measurement("cpu") - .unwrap() - .saw_influx_field("usage_system", InfluxFieldType::Integer) - .build() - .unwrap(); - - let mut writer = MeasurementWriter::new(get_writer_settings(), schema, table_writer); - assert_eq!(writer.write_buffer.capacity(), 2); - - let mut parsed_lines = only_good_lines( - r#" - cpu usage_system=64i 1590488773254420000 - cpu usage_system=67i 1590488773254430000 - cpu usage_system=68i 1590488773254440000"#, - ) - .into_iter(); - - // no rows should have been written - assert_eq!(get_events(&log).len(), 0); - - // buffer size is 2 we don't expect any writes until three rows are pushed - writer - .buffer_line(parsed_lines.next().expect("parse success")) - .unwrap(); - assert_eq!(get_events(&log).len(), 0); - writer - .buffer_line(parsed_lines.next().expect("parse success")) - .unwrap(); - assert_eq!(get_events(&log).len(), 0); - - // this should cause a flush and write - writer - .buffer_line(parsed_lines.next().expect("parse success")) - .unwrap(); - assert_eq!( - get_events(&log), - vec!["[cpu] Wrote batch of 2 cols, 2 rows"] - ); - - // finalize should write out the last line - writer.finalize().unwrap(); - assert_eq!( - get_events(&log), - vec![ - "[cpu] Wrote batch of 2 cols, 2 rows", - "[cpu] Wrote batch of 2 cols, 1 rows", - "[cpu] Closed", - ] - ); - } - - // ----- Tests for pack_data ----- - - // given protocol data for each datatype, ensure it is packed - // as expected. 
- // - // Note this table has a row with a tag and each field type and - // then a row where the tag, fields and timestamps each hold a null - static LP_DATA: &str = r#" - cpu,tag1=A int_field=64i,float_field=100.0,str_field="foo1",bool_field=t 1590488773254420000 - cpu,tag1=B int_field=65i,float_field=101.0,str_field="foo2",bool_field=t 1590488773254430000 - cpu int_field=66i,float_field=102.0,str_field="foo3",bool_field=t 1590488773254440000 - cpu,tag1=C float_field=103.0,str_field="foo4",bool_field=t 1590488773254450000 - cpu,tag1=D int_field=67i,str_field="foo5",bool_field=t 1590488773254460000 - cpu,tag1=E int_field=68i,float_field=104.0,bool_field=t 1590488773254470000 - cpu,tag1=F int_field=69i,float_field=105.0,str_field="foo6" 1590488773254480000 - cpu,tag1=G int_field=70i,float_field=106.0,str_field="foo7",bool_field=t - cpu,tag1=H int_field=71i,float_field=107.0,str_field="foo8",bool_field=t 1590488773254490000 - "#; - static EXPECTED_NUM_LINES: usize = 9; - - fn parse_data_into_sampler() -> MeasurementSampler<'static> { - let mut sampler = MeasurementSampler::new(get_sampler_settings()); - - for line in only_good_lines(LP_DATA) { - sampler.add_sample(line); - } - sampler - } - - #[test] - fn pack_data_schema() { - let schema = parse_data_into_sampler() - .deduce_schema_from_sample() - .unwrap(); - - // Then the correct schema is extracted - println!("Converted to {:#?}", schema); - - assert_column_eq!(schema, 0, InfluxColumnType::Tag, "tag1"); - assert_column_eq!( - schema, - 1, - InfluxColumnType::Field(InfluxFieldType::Integer), - "int_field" - ); - assert_column_eq!( - schema, - 2, - InfluxColumnType::Field(InfluxFieldType::Float), - "float_field" - ); - assert_column_eq!( - schema, - 3, - InfluxColumnType::Field(InfluxFieldType::String), - "str_field" - ); - assert_column_eq!( - schema, - 4, - InfluxColumnType::Field(InfluxFieldType::Boolean), - "bool_field" - ); - assert_column_eq!(schema, 5, InfluxColumnType::Timestamp, "time"); - } - - #[test] - fn pack_data_value() { - let mut sampler = parse_data_into_sampler(); - let schema = sampler.deduce_schema_from_sample().unwrap(); - - let packers = pack_lines(&schema, &sampler.schema_sample); - - // 6 columns so 6 packers - assert_eq!(packers.len(), 6); - - // all packers should have packed all lines - for p in &packers { - assert_eq!(p.num_rows(), EXPECTED_NUM_LINES); - } - - // Tag values - let tag_packer = packers[0].bytes_packer(); - assert_eq!(tag_packer.get(0).unwrap(), &ByteArray::from("A")); - assert_eq!(tag_packer.get(1).unwrap(), &ByteArray::from("B")); - assert!(packers[0].is_null(2)); - assert_eq!(tag_packer.get(3).unwrap(), &ByteArray::from("C")); - assert_eq!(tag_packer.get(4).unwrap(), &ByteArray::from("D")); - assert_eq!(tag_packer.get(5).unwrap(), &ByteArray::from("E")); - assert_eq!(tag_packer.get(6).unwrap(), &ByteArray::from("F")); - assert_eq!(tag_packer.get(7).unwrap(), &ByteArray::from("G")); - assert_eq!(tag_packer.get(8).unwrap(), &ByteArray::from("H")); - - // int_field values - let int_field_packer = &packers[1].i64_packer(); - assert_eq!(int_field_packer.get(0).unwrap(), &64); - assert_eq!(int_field_packer.get(1).unwrap(), &65); - assert_eq!(int_field_packer.get(2).unwrap(), &66); - assert!(int_field_packer.is_null(3)); - assert_eq!(int_field_packer.get(4).unwrap(), &67); - assert_eq!(int_field_packer.get(5).unwrap(), &68); - assert_eq!(int_field_packer.get(6).unwrap(), &69); - assert_eq!(int_field_packer.get(7).unwrap(), &70); - assert_eq!(int_field_packer.get(8).unwrap(), &71); - - // float_field 
values - let float_field_packer = &packers[2].f64_packer(); - assert!(approximately_equal( - *float_field_packer.get(0).unwrap(), - 100.0 - )); - assert!(approximately_equal( - *float_field_packer.get(1).unwrap(), - 101.0 - )); - assert!(approximately_equal( - *float_field_packer.get(2).unwrap(), - 102.0 - )); - assert!(approximately_equal( - *float_field_packer.get(3).unwrap(), - 103.0 - )); - assert!(float_field_packer.is_null(4)); - assert!(approximately_equal( - *float_field_packer.get(5).unwrap(), - 104.0 - )); - assert!(approximately_equal( - *float_field_packer.get(6).unwrap(), - 105.0 - )); - assert!(approximately_equal( - *float_field_packer.get(7).unwrap(), - 106.0 - )); - assert!(approximately_equal( - *float_field_packer.get(8).unwrap(), - 107.0 - )); - - // str_field values - let str_field_packer = &packers[3].bytes_packer(); - assert_eq!(str_field_packer.get(0).unwrap(), &ByteArray::from("foo1")); - assert_eq!(str_field_packer.get(1).unwrap(), &ByteArray::from("foo2")); - assert_eq!(str_field_packer.get(2).unwrap(), &ByteArray::from("foo3")); - assert_eq!(str_field_packer.get(3).unwrap(), &ByteArray::from("foo4")); - assert_eq!(str_field_packer.get(4).unwrap(), &ByteArray::from("foo5")); - assert!(str_field_packer.is_null(5)); - assert_eq!(str_field_packer.get(6).unwrap(), &ByteArray::from("foo6")); - assert_eq!(str_field_packer.get(7).unwrap(), &ByteArray::from("foo7")); - assert_eq!(str_field_packer.get(8).unwrap(), &ByteArray::from("foo8")); - - // bool_field values - let bool_field_packer = &packers[4].bool_packer(); - assert_eq!(bool_field_packer.get(0).unwrap(), &true); - assert_eq!(bool_field_packer.get(1).unwrap(), &true); - assert_eq!(bool_field_packer.get(2).unwrap(), &true); - assert_eq!(bool_field_packer.get(3).unwrap(), &true); - assert_eq!(bool_field_packer.get(4).unwrap(), &true); - assert_eq!(bool_field_packer.get(5).unwrap(), &true); - assert!(bool_field_packer.is_null(6)); - assert_eq!(bool_field_packer.get(7).unwrap(), &true); - assert_eq!(bool_field_packer.get(8).unwrap(), &true); - - // timestamp valuess - let timestamp_packer = &packers[5].i64_packer(); - assert_eq!(timestamp_packer.get(0).unwrap(), &1_590_488_773_254_420_000); - assert_eq!(timestamp_packer.get(1).unwrap(), &1_590_488_773_254_430_000); - assert_eq!(timestamp_packer.get(2).unwrap(), &1_590_488_773_254_440_000); - assert_eq!(timestamp_packer.get(3).unwrap(), &1_590_488_773_254_450_000); - assert_eq!(timestamp_packer.get(4).unwrap(), &1_590_488_773_254_460_000); - assert_eq!(timestamp_packer.get(5).unwrap(), &1_590_488_773_254_470_000); - assert_eq!(timestamp_packer.get(6).unwrap(), &1_590_488_773_254_480_000); - assert!(timestamp_packer.is_null(7)); - assert_eq!(timestamp_packer.get(8).unwrap(), &1_590_488_773_254_490_000); - } - - // ----- Tests for LineProtocolConverter ----- - - #[test] - fn conversion_of_no_lines() { - let parsed_lines = only_good_lines(""); - let log = Arc::new(Mutex::new(WriterLog::new())); - - let settings = ConversionSettings::default(); - let mut converter = - LineProtocolConverter::new(settings, NoOpWriterSource::new(Arc::clone(&log))); - converter - .convert(parsed_lines) - .expect("conversion ok") - .finalize() - .expect("finalize"); - - // no rows should have been written - assert_eq!(get_events(&log).len(), 0); - } - - #[test] - fn conversion_with_multiple_measurements() { - // These lines have interleaved measurements to force the - // state machine in LineProtocolConverter::convert through all - // the branches - let parsed_lines = only_good_lines( - 
r#"h2o_temperature,location=santa_monica surface_degrees=65.2,bottom_degrees=50.4 1568756160 - air_temperature,location=santa_monica sea_level_degrees=77.3,tenk_feet_feet_degrees=40.0 1568756160 - h2o_temperature,location=santa_monica surface_degrees=63.6,bottom_degrees=49.2 1600756160 - air_temperature,location=santa_monica sea_level_degrees=77.6,tenk_feet_feet_degrees=40.9 1600756160 - h2o_temperature,location=coyote_creek surface_degrees=55.1,bottom_degrees=51.3 1568756160 - air_temperature,location=coyote_creek sea_level_degrees=77.2,tenk_feet_feet_degrees=40.8 1568756160 - air_temperature,location=puget_sound sea_level_degrees=77.5,tenk_feet_feet_degrees=41.1 1568756160 - h2o_temperature,location=coyote_creek surface_degrees=50.2,bottom_degrees=50.9 1600756160 - h2o_temperature,location=puget_sound surface_degrees=55.8,bottom_degrees=40.2 1568756160 -"#, - ); - let log = Arc::new(Mutex::new(WriterLog::new())); - - let settings = ConversionSettings { - sample_size: 2, - measurement_write_buffer_size: 3, - }; - - let mut converter = - LineProtocolConverter::new(settings, NoOpWriterSource::new(Arc::clone(&log))); - - converter - .convert(parsed_lines) - .expect("conversion ok") - .finalize() - .expect("finalize"); - - assert_eq!( - get_events(&log), - vec![ - "Created writer for measurement h2o_temperature", - "Created writer for measurement air_temperature", - "[air_temperature] Wrote batch of 4 cols, 3 rows", - "[h2o_temperature] Wrote batch of 4 cols, 3 rows", - "[air_temperature] Wrote batch of 4 cols, 1 rows", - "[air_temperature] Closed", - "[h2o_temperature] Wrote batch of 4 cols, 2 rows", - "[h2o_temperature] Closed", - ] - ); - } - - // ----- Tests for TSM Data ----- - - #[test] - fn process_measurement_table() { - // Input data - in line protocol format - // - // cpu,region=east temp=1.2 0 - // cpu,region=east voltage=10.2 0 - // - // cpu,region=east temp=1.2 1000 - // cpu,region=east voltage=10.2 1000 - // - // cpu,region=east temp=1.4 2000 - // cpu,region=east voltage=10.4 2000 - // cpu,region=west,server=a temp=100.2 2000 - // - // cpu,az=b watts=1000 3000 - // cpu,region=west,server=a temp=99.5 3000 - // - // cpu,az=b watts=2000 4000 - // cpu,region=west,server=a temp=100.3 4000 - // - // cpu,az=b watts=3000 5000 - - #[rustfmt::skip] - // Expected output table - // - // | az | region | server | temp | voltage | watts | time | - // |------|--------|--------|-------|---------|---------|------| - // | b | NULL | NULL | NULL | NULL | 1000 | 3000 | - // | b | NULL | NULL | NULL | NULL | 2000 | 4000 | - // | b | NULL | NULL | NULL | NULL | 3000 | 5000 | - // | NULL | east | NULL | 1.2 | 10.2 | NULL | 0000 | <-- notice series joined on ts column - // | NULL | east | NULL | 1.2 | 10.2 | NULL | 1000 | <-- notice series joined on ts column - // | NULL | east | NULL | 1.4 | 10.4 | NULL | 2000 | <-- notice series joined on ts column - // | NULL | west | a | 100.2 | NULL | NULL | 2000 | - // | NULL | west | a | 99.5 | NULL | NULL | 3000 | - // | NULL | west | a | 100.3 | NULL | NULL | 4000 | - - let mut table = MeasurementTable::new("cpu".to_string(), 0); - // cpu region=east temp= - table - .add_series_data( - vec![("region".to_string(), "east".to_string())], - "temp".to_string(), - Block { - min_time: 0, - max_time: 0, - offset: 0, - size: 0, - typ: BlockType::Float, - reader_idx: 0, - }, - ) - .unwrap(); - - // cpu region=east voltage= - table - .add_series_data( - vec![("region".to_string(), "east".to_string())], - "voltage".to_string(), - Block { - min_time: 1, - max_time: 0, - 
offset: 0, - size: 0, - typ: BlockType::Float, - reader_idx: 0, - }, - ) - .unwrap(); - - // cpu region=west,server=a temp= - table - .add_series_data( - vec![ - ("region".to_string(), "west".to_string()), - ("server".to_string(), "a".to_string()), - ], - "temp".to_string(), - Block { - min_time: 2, - max_time: 0, - offset: 0, - size: 0, - typ: BlockType::Float, - reader_idx: 0, - }, - ) - .unwrap(); - - // cpu az=b watts= - table - .add_series_data( - vec![("az".to_string(), "b".to_string())], - "watts".to_string(), - Block { - min_time: 3, - max_time: 0, - offset: 0, - size: 0, - typ: BlockType::Unsigned, - reader_idx: 0, - }, - ) - .unwrap(); - - let mut block_map = BTreeMap::new(); - block_map.insert( - 0, - BlockData::Float { - i: 0, - ts: vec![0, 1000, 2000], - values: vec![1.2, 1.2, 1.4], - }, - ); - block_map.insert( - 1, - BlockData::Float { - i: 0, - ts: vec![0, 1000, 2000], - values: vec![10.2, 10.2, 10.4], - }, - ); - block_map.insert( - 2, - BlockData::Float { - i: 0, - ts: vec![2000, 3000, 4000], - values: vec![100.2, 99.5, 100.3], - }, - ); - block_map.insert( - 3, - BlockData::Unsigned { - i: 0, - ts: vec![3000, 4000, 5000], - values: vec![1000, 2000, 3000], - }, - ); - - let decoder = MockBlockDecoder::new(block_map); - let (schema, packers) = - TsmFileConverter::process_measurement_table(decoder, &mut table).unwrap(); - - assert_column_eq!(schema, 0, InfluxColumnType::Tag, "az"); - assert_column_eq!(schema, 1, InfluxColumnType::Tag, "region"); - assert_column_eq!(schema, 2, InfluxColumnType::Tag, "server"); - assert_column_eq!( - schema, - 3, - InfluxColumnType::Field(InfluxFieldType::Float), - "temp" - ); - assert_column_eq!( - schema, - 4, - InfluxColumnType::Field(InfluxFieldType::Float), - "voltage" - ); - assert_column_eq!( - schema, - 5, - InfluxColumnType::Field(InfluxFieldType::Integer), - "watts" - ); - assert_column_eq!(schema, 6, InfluxColumnType::Timestamp, "time"); - - // az column - assert_eq!( - packers[0], - Packers::Bytes(Packer::from(vec![ - Some(ByteArray::from("b")), - Some(ByteArray::from("b")), - Some(ByteArray::from("b")), - None, - None, - None, - None, - None, - None, - ])) - ); - // region column - assert_eq!( - packers[1], - Packers::Bytes(Packer::from(vec![ - None, - None, - None, - Some(ByteArray::from("east")), - Some(ByteArray::from("east")), - Some(ByteArray::from("east")), - Some(ByteArray::from("west")), - Some(ByteArray::from("west")), - Some(ByteArray::from("west")), - ])) - ); - // server column - assert_eq!( - packers[2], - Packers::Bytes(Packer::from(vec![ - None, - None, - None, - None, - None, - None, - Some(ByteArray::from("a")), - Some(ByteArray::from("a")), - Some(ByteArray::from("a")), - ])) - ); - // temp column - assert_eq!( - packers[3], - Packers::Float(Packer::from(vec![ - None, - None, - None, - Some(1.2), - Some(1.2), - Some(1.4), - Some(100.2), - Some(99.5), - Some(100.3), - ])) - ); - // voltage column - assert_eq!( - packers[4], - Packers::Float(Packer::from(vec![ - None, - None, - None, - Some(10.2), - Some(10.2), - Some(10.4), - None, - None, - None, - ])) - ); - // watts column - assert_eq!( - packers[5], - Packers::Integer(Packer::from(vec![ - Some(1000), - Some(2000), - Some(3000), - None, - None, - None, - None, - None, - None, - ])) - ); - // timestamp column - assert_eq!( - packers[6], - Packers::Integer(Packer::from(vec![ - Some(3000), - Some(4000), - Some(5000), - Some(0), - Some(1000), - Some(2000), - Some(2000), - Some(3000), - Some(4000), - ])) - ); - } - - fn empty_block() -> Block { - Block { - 
min_time: 0, - max_time: 0, - offset: 0, - size: 0, - typ: BlockType::Float, - reader_idx: 0, - } - } - - #[test] - fn merge_input_tables() { - let mut inputs = vec![]; - let mut table = MeasurementTable::new("cpu".to_string(), 0); - table - .add_series_data( - vec![("region".to_string(), "east".to_string())], - "temp".to_string(), - empty_block(), - ) - .unwrap(); - inputs.push(Some(table.clone())); - - table = MeasurementTable::new("cpu".to_string(), 1); - table - .add_series_data( - vec![("server".to_string(), "a".to_string())], - "temp".to_string(), - empty_block(), - ) - .unwrap(); - inputs.push(Some(table.clone())); - - table = MeasurementTable::new("disk".to_string(), 2); - table - .add_series_data( - vec![("region".to_string(), "west".to_string())], - "temp".to_string(), - empty_block(), - ) - .unwrap(); - inputs.push(Some(table)); - - let mut res = TsmFileConverter::merge_input_tables(inputs).unwrap(); - let mut merged = res.0.unwrap(); - inputs = res.1; - assert_eq!(merged.name, "cpu".to_string()); - assert_eq!(merged.tag_columns(), vec!["region", "server"]); - assert_eq!(inputs[0], None); - assert_eq!(inputs[1], None); - - res = TsmFileConverter::merge_input_tables(inputs).unwrap(); - merged = res.0.unwrap(); - assert_eq!(merged.name, "disk".to_string()); - assert_eq!(res.1, vec![None, None, None]); - } - - #[test] - fn conversion_tsm_file_single() { - let file = File::open("../tests/fixtures/merge-tsm/merge_a.tsm.gz"); - let mut decoder = GzDecoder::new(file.unwrap()); - let mut buf = Vec::new(); - decoder.read_to_end(&mut buf).unwrap(); - - let log = Arc::new(Mutex::new(WriterLog::new())); - let mut converter = TsmFileConverter::new(NoOpWriterSource::new(Arc::clone(&log))); - let index_steam = BufReader::new(Cursor::new(&buf)); - let block_stream = BufReader::new(Cursor::new(&buf)); - converter - .convert(vec![(index_steam, 39475)], vec![block_stream]) - .unwrap(); - - // CPU columns: - tags: cpu, host. 
(2) - // fields: usage_guest, usage_guest_nice - // usage_idle, usage_iowait, usage_irq, - // usage_nice, usage_softirq, usage_steal, - // usage_system, usage_user (10) - // timestamp (1) - // - // disk columns: - tags: device, fstype, host, mode, path (5) - // fields: free, inodes_free, inodes_total, inodes_used, - // total, used, used_percent (7) - // timestamp (1) - assert_eq!( - get_events(&log), - vec![ - "Created writer for measurement cpu", - "[cpu] Wrote batch of 13 cols, 85 rows", - "[cpu] Closed", - "Created writer for measurement disk", - "[disk] Wrote batch of 13 cols, 36 rows", - "[disk] Closed" - ], - ); - } - - #[test] - fn conversion_tsm_files_none_overlapping() { - let mut index_streams = Vec::new(); - let mut block_streams = Vec::new(); - - let file_a = File::open("../tests/fixtures/merge-tsm/merge_a.tsm.gz"); - let mut decoder_a = GzDecoder::new(file_a.unwrap()); - let mut buf_a = Vec::new(); - decoder_a.read_to_end(&mut buf_a).unwrap(); - index_streams.push((BufReader::new(Cursor::new(&buf_a)), 39475)); - block_streams.push(BufReader::new(Cursor::new(&buf_a))); - - let file_b = File::open("../tests/fixtures/merge-tsm/merge_b.tsm.gz"); - let mut decoder_b = GzDecoder::new(file_b.unwrap()); - let mut buf_b = Vec::new(); - decoder_b.read_to_end(&mut buf_b).unwrap(); - index_streams.push((BufReader::new(Cursor::new(&buf_b)), 45501)); - block_streams.push(BufReader::new(Cursor::new(&buf_b))); - - let log = Arc::new(Mutex::new(WriterLog::new())); - let mut converter = TsmFileConverter::new(NoOpWriterSource::new(Arc::clone(&log))); - - converter.convert(index_streams, block_streams).unwrap(); - - // CPU columns: - tags: cpu, host. (2) - // fields: usage_guest, usage_guest_nice - // usage_idle, usage_iowait, usage_irq, - // usage_nice, usage_softirq, usage_steal, - // usage_system, usage_user (10) - // timestamp (1) - // - // disk columns: - tags: device, fstype, host, mode, path (5) - // fields: free, inodes_free, inodes_total, inodes_used, - // total, used, used_percent (7) - // timestamp (1) - // - // In this case merge_a.tsm has 85 rows of data for cpu measurement and - // merge_b.tsm 340. - // - // For the disk measurement merge_a.tsm has 36 rows and merge_b.tsm 126 - assert_eq!( - get_events(&log), - vec![ - "Created writer for measurement cpu", - "[cpu] Wrote batch of 13 cols, 425 rows", - "[cpu] Closed", - "Created writer for measurement disk", - "[disk] Wrote batch of 13 cols, 162 rows", - "[disk] Closed" - ], - ); - } -} diff --git a/ingest/src/parquet.rs b/ingest/src/parquet.rs deleted file mode 100644 index 70ad7e6ff6..0000000000 --- a/ingest/src/parquet.rs +++ /dev/null @@ -1,12 +0,0 @@ -//! This module contains code for writing / reading data to parquet. 
-#![warn( - missing_copy_implementations, - missing_debug_implementations, - clippy::explicit_iter_loop, - clippy::use_self -)] - -pub mod error; -pub mod metadata; -pub mod stats; -pub mod writer; diff --git a/ingest/src/parquet/error.rs b/ingest/src/parquet/error.rs deleted file mode 100644 index d6dabec224..0000000000 --- a/ingest/src/parquet/error.rs +++ /dev/null @@ -1,16 +0,0 @@ -use snafu::Snafu; - -use parquet::errors::ParquetError; - -#[derive(Debug, Snafu)] -pub enum IOxParquetError { - #[snafu(display(r#"{}, underlying parquet error {}"#, message, source))] - #[snafu(visibility(pub(crate)))] - ParquetLibraryError { - message: String, - source: ParquetError, - }, - Unsupported, -} - -pub type Result = std::result::Result; diff --git a/ingest/src/parquet/metadata.rs b/ingest/src/parquet/metadata.rs deleted file mode 100644 index 453c9379f5..0000000000 --- a/ingest/src/parquet/metadata.rs +++ /dev/null @@ -1,110 +0,0 @@ -//! Provide storage statistics for parquet files -use super::error::{ParquetLibraryError, Result}; -use arrow::datatypes::DataType; -use parquet::{ - file::reader::{ChunkReader, FileReader, SerializedFileReader}, - schema, -}; -use snafu::ResultExt; - -pub fn parquet_schema_as_string(parquet_schema: &schema::types::Type) -> String { - let mut parquet_schema_string = Vec::new(); - schema::printer::print_schema(&mut parquet_schema_string, parquet_schema); - String::from_utf8_lossy(&parquet_schema_string).to_string() -} - -/// Maps from parquet types to table schema types -pub fn data_type_from_parquet_type(parquet_type: parquet::basic::Type) -> DataType { - use parquet::basic::Type::*; - - match parquet_type { - BOOLEAN => DataType::Boolean, - INT64 => DataType::Int64, - DOUBLE => DataType::Float64, - BYTE_ARRAY => DataType::Utf8, - _ => { - unimplemented!("Unsupported parquet datatype: {:?}", parquet_type); - } - } -} - -/// Print parquet metadata that can be read from `input`, with a total -/// size of `input_size` byes -pub fn print_parquet_metadata(input: R) -> Result<()> -where - R: ChunkReader, -{ - let input_len = input.len(); - - let reader = SerializedFileReader::new(input).context(ParquetLibraryError { - message: "Creating parquet reader", - })?; - - let parquet_metadata = reader.metadata(); - let file_metadata = parquet_metadata.file_metadata(); - let num_columns = file_metadata.schema_descr().num_columns(); - - println!("Parquet file size: {} bytes", input_len); - println!( - "Parquet file Schema: {}", - parquet_schema_as_string(file_metadata.schema()).trim_end() - ); - println!("Parquet file metadata:"); - println!(" row groups: {}", parquet_metadata.num_row_groups()); - if let Some(created_by) = file_metadata.created_by() { - println!(" created by: {:?}", created_by); - } - println!(" version: {}", file_metadata.version()); - println!(" num_rows: {}", file_metadata.num_rows()); - println!(" num_columns: {}", num_columns); - let kv_meta_len = match file_metadata.key_value_metadata() { - Some(v) => v.len(), - None => 0, - }; - println!(" key_value_meta.len: {:?}", kv_meta_len); - if let Some(column_orders) = file_metadata.column_orders() { - println!(" column_orders:"); - for (idx, col) in column_orders.iter().enumerate() { - println!(" column_order[{}]: {:?}", idx, col.sort_order()); - } - } - - println!("Row groups:"); - for (rg_idx, rg_metadata) in parquet_metadata.row_groups().iter().enumerate() { - println!(" Row Group [{}]:", rg_idx); - println!( - " total uncompressed byte size: {}", - rg_metadata.total_byte_size() - ); - for (cc_idx, cc_metadata) 
in rg_metadata.columns().iter().enumerate() { - println!(" Column Chunk [{}]:", cc_idx); - println!(" file_offset: {}", cc_metadata.file_offset()); - println!(" column_type: {:?}", cc_metadata.column_type()); - println!(" column_path: {}", cc_metadata.column_path().string()); - println!(" num_values: {}", cc_metadata.num_values()); - println!(" encodings: {:?}", cc_metadata.encodings()); - println!(" compression: {:?}", cc_metadata.compression()); - println!(" compressed_size: {}", cc_metadata.compressed_size()); - println!(" uncompressed_size: {}", cc_metadata.uncompressed_size()); - println!(" data_page_offset: {}", cc_metadata.data_page_offset()); - println!(" has_index_page: {}", cc_metadata.has_index_page()); - if let Some(index_page_offset) = cc_metadata.index_page_offset() { - println!(" index_page_offset: {}", index_page_offset); - } - println!( - " has_dictionary_page: {}", - cc_metadata.has_dictionary_page() - ); - if let Some(dictionary_page_offset) = cc_metadata.dictionary_page_offset() { - println!(" dictionary_page_offset: {}", dictionary_page_offset); - } - if let Some(statistics) = cc_metadata.statistics() { - println!(" statistics: {:?}", statistics); - } else { - println!(" NO STATISTICS"); - } - } - } - - Ok(()) -} diff --git a/ingest/src/parquet/stats.rs b/ingest/src/parquet/stats.rs deleted file mode 100644 index edf6aee385..0000000000 --- a/ingest/src/parquet/stats.rs +++ /dev/null @@ -1,131 +0,0 @@ -//! Provide storage statistics for parquet files -use observability_deps::tracing::debug; -use packers::{ - stats::{ColumnStatsBuilder, FileStats, FileStatsBuilder}, - Name, -}; -use parquet::{ - basic::{Compression, Encoding}, - file::reader::{ChunkReader, FileReader, SerializedFileReader}, -}; -use snafu::ResultExt; -use std::{collections::BTreeMap, convert::TryInto}; - -use super::{ - error::{ParquetLibraryError, Result}, - metadata::data_type_from_parquet_type, -}; - -/// Calculate storage statistics for a particular parquet "file" that can -/// be read from `input`, with a total size of `input_size` byes -/// -/// Returns a `FileStats` object representing statistics for all -/// columns across all column chunks. 
-pub fn file_stats(input: R) -> Result -where - R: ChunkReader + Name, -{ - let mut file_stats_builder = FileStatsBuilder::new(&input.name(), input.len()); - let reader = SerializedFileReader::new(input).context(ParquetLibraryError { - message: "Creating parquet reader", - })?; - - let mut stats_builders: BTreeMap = BTreeMap::new(); - - let parquet_metadata = reader.metadata(); - for (rg_idx, rg_metadata) in parquet_metadata.row_groups().iter().enumerate() { - debug!( - "Looking at Row Group [{}] (total uncompressed byte size {})", - rg_idx, - rg_metadata.total_byte_size() - ); - - for (cc_idx, cc_metadata) in rg_metadata.columns().iter().enumerate() { - let col_path = cc_metadata.column_path(); - let builder = stats_builders - .remove(&col_path.string()) - .unwrap_or_else(|| { - let data_type = data_type_from_parquet_type(cc_metadata.column_type()); - ColumnStatsBuilder::new(col_path.string(), cc_idx, data_type) - }) - .compression(&format!( - "Enc: {}, Comp: {}", - encoding_display(&cc_metadata.encodings()), - compression_display(&cc_metadata.compression()), - )) - .add_rows( - cc_metadata - .num_values() - .try_into() - .expect("positive number of values"), - ) - .add_compressed_bytes( - cc_metadata - .compressed_size() - .try_into() - .expect("positive compressed size"), - ) - .add_uncompressed_bytes( - cc_metadata - .uncompressed_size() - .try_into() - .expect("positive uncompressed size"), - ); - - // put the builder back in the map - stats_builders.insert(col_path.string(), builder); - } - } - - // now, marshal up all the results - for (_k, b) in stats_builders { - file_stats_builder = file_stats_builder.add_column(b.build()); - } - - Ok(file_stats_builder.build()) -} - -/// Create a more user friendly display of the encodings -fn encoding_display(encodings: &[Encoding]) -> String { - if encodings.iter().any(|&e| e == Encoding::RLE_DICTIONARY) { - // parquet represents "dictionary" encoding as [PLAIN, - // RLE_DICTIONARY, RLE] , which is somewhat confusing -- it means - // to point out that the dictionary page uses plain encoding, - // whereas the data page uses RLE encoding. - "Dictionary".into() - } else { - return format!("{:?}", encodings); - } -} - -/// Create a user friendly display of the encodings -fn compression_display(compression: &Compression) -> String { - return format!("{}", compression); -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_encoding_display() { - assert_eq!(&encoding_display(&[Encoding::PLAIN]), "[PLAIN]"); - assert_eq!( - &encoding_display(&[Encoding::PLAIN, Encoding::RLE]), - "[PLAIN, RLE]" - ); - assert_eq!( - &encoding_display(&[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY]), - "Dictionary" - ); - assert_eq!( - &encoding_display(&[Encoding::DELTA_BYTE_ARRAY, Encoding::RLE_DICTIONARY]), - "Dictionary" - ); - } - - #[test] - fn test_compression_display() { - assert_eq!(&compression_display(&Compression::GZIP), "GZIP"); - } -} diff --git a/ingest/src/parquet/writer.rs b/ingest/src/parquet/writer.rs deleted file mode 100644 index df98aec7e6..0000000000 --- a/ingest/src/parquet/writer.rs +++ /dev/null @@ -1,709 +0,0 @@ -//! 
This module contains the code to write table data to parquet -use internal_types::schema::{InfluxColumnType, InfluxFieldType, Schema}; -use observability_deps::tracing::{debug, log::warn}; -use parquet::file::writer::ParquetWriter; -use parquet::{ - self, - basic::{ - Compression, Encoding, IntType, LogicalType, Repetition, TimeUnit, TimestampType, - Type as PhysicalType, - }, - errors::ParquetError, - file::{ - properties::{WriterProperties, WriterPropertiesBuilder}, - writer::{FileWriter, SerializedFileWriter, TryClone}, - }, - schema::types::{ColumnPath, Type}, -}; -use snafu::{OptionExt, ResultExt, Snafu}; -use std::{ - fmt, - io::{Seek, Write}, - str::FromStr, - sync::Arc, -}; - -use super::metadata::parquet_schema_as_string; -use packers::{Error as TableError, IOxTableWriter, Packers}; - -#[derive(Debug, Snafu)] -pub enum Error { - #[snafu(display(r#"{}, underlying parqet error {}"#, message, source))] - ParquetLibraryError { - message: String, - source: ParquetError, - }, - - #[snafu(display(r#"Could not get packer for column {}"#, column_number))] - MismatchedColumns { column_number: usize }, - - #[snafu(display( - r#"Unknown compression level '{}'. Valid options 'max' or 'compatibility'"#, - compression_level - ))] - UnknownCompressionLevel { compression_level: String }, - - #[snafu(display(r#"Unsupported datatype for parquet writing: {:?}"#, data_type,))] - UnsupportedDataType { data_type: String }, -} - -pub type Result = std::result::Result; - -impl From for TableError { - fn from(other: Error) -> Self { - Self::Data { - source: Box::new(other), - } - } -} - -/// Specify the desired compression level when writing parquet files -#[derive(Debug, Clone, Copy, PartialEq)] -pub enum CompressionLevel { - /// Minimize the size of the written parquet file - Maximum, - - // Attempt to maximize interoperability with other ecosystem tools. - // - // See https://github.com/influxdata/influxdb_iox/issues/184 - Compatibility, -} - -impl FromStr for CompressionLevel { - type Err = Error; - - fn from_str(compression_level: &str) -> Result { - match compression_level { - "max" => Ok(Self::Maximum), - "compatibility" => Ok(Self::Compatibility), - _ => UnknownCompressionLevel { compression_level }.fail(), - } - } -} - -/// A `IOxParquetTableWriter` is used for writing batches of rows -/// parquet files. -pub struct IOxParquetTableWriter -where - W: ParquetWriter, -{ - parquet_schema: Arc, - file_writer: SerializedFileWriter, -} - -impl IOxParquetTableWriter -where - W: Write + Seek + TryClone, -{ - /// Create a new TableWriter that writes its rows to something - /// that implements the trait (e.g. std::File). 
For example: - /// - /// ``` - /// # use std::fs; - /// # use internal_types::schema::{builder::SchemaBuilder, InfluxFieldType}; - /// # use packers::IOxTableWriter; - /// # use packers::{Packer, Packers}; - /// # use ingest::parquet::writer::{IOxParquetTableWriter, CompressionLevel}; - /// # use parquet::data_type::ByteArray; - /// - /// let schema = SchemaBuilder::new() - /// .measurement("measurement_name") - /// .tag("tag1") - /// .influx_field("field1", InfluxFieldType::Integer) - /// .timestamp() - /// .build() - /// .unwrap(); - /// - /// let mut packers: Vec = vec![ - /// Packers::Bytes(Packer::new()), // 0: tag1 - /// Packers::Integer(Packer::new()), // 1: field1 - /// Packers::Integer(Packer::new()), // 2: timestamp - /// ]; - /// - /// packers[0].bytes_packer_mut().push(ByteArray::from("tag1")); // tag1 val - /// packers[1].i64_packer_mut().push(100); // field1 val - /// packers[2].push_none(); // no timestamp - /// - /// // Write to '/tmp/example.parquet' - /// let mut output_file_name = std::env::temp_dir(); - /// output_file_name.push("example.parquet"); - /// let output_file = fs::File::create(output_file_name.as_path()).unwrap(); - /// - /// let compression_level = CompressionLevel::Compatibility; - /// - /// let mut parquet_writer = IOxParquetTableWriter::new( - /// &schema, compression_level, output_file) - /// .unwrap(); - /// - /// // write the actual data to parquet - /// parquet_writer.write_batch(&packers).unwrap(); - /// - /// // Closing the writer closes the data and the file - /// parquet_writer.close().unwrap(); - /// - /// # std::fs::remove_file(output_file_name); - /// ``` - pub fn new( - schema: &Schema, - compression_level: CompressionLevel, - writer: W, - ) -> Result { - let writer_props = create_writer_props(&schema, compression_level); - let parquet_schema = convert_to_parquet_schema(&schema)?; - - let file_writer = - SerializedFileWriter::new(writer, Arc::clone(&parquet_schema), writer_props).context( - ParquetLibraryError { - message: String::from("Error trying to create a SerializedFileWriter"), - }, - )?; - - let parquet_writer = Self { - parquet_schema, - file_writer, - }; - debug!( - "ParqutWriter created for schema: {}", - parquet_schema_as_string(&parquet_writer.parquet_schema) - ); - Ok(parquet_writer) - } -} -impl IOxTableWriter for IOxParquetTableWriter -where - W: Write + Seek + TryClone, -{ - /// Writes a batch of packed data to the output file in a single - /// column chunk - /// - /// TODO: better control of column chunks - fn write_batch(&mut self, packers: &[Packers]) -> Result<(), TableError> { - // now write out the data - let mut row_group_writer = - self.file_writer - .next_row_group() - .context(ParquetLibraryError { - message: String::from("Error creating next row group writer"), - })?; - - use parquet::column::writer::ColumnWriter::*; - let mut column_number = 0; - while let Some(mut col_writer) = - row_group_writer - .next_column() - .context(ParquetLibraryError { - message: String::from("Can't create the next row_group_writer"), - })? - { - let packer = packers - .get(column_number) - .context(MismatchedColumns { column_number }) - .map_err(TableError::from_other)?; - - // TODO(edd) This seems super awkward and not the right way to do it... - // We know we have a direct mapping between a col_writer (ColumnWriter) - // type and a Packers variant. We also know that we do exactly the same - // work for each variant (we just dispatch to the writ_batch method) - // on the column write. 
- // - // I think this match could be so much shorter but not sure how yet. - match col_writer { - BoolColumnWriter(ref mut w) => { - let p = packer.bool_packer(); - let n = w - .write_batch(&p.some_values(), Some(&p.def_levels()), None) - .context(ParquetLibraryError { - message: String::from("Can't write_batch with bool values"), - })?; - debug!("Wrote {} rows of bool data", n); - } - Int32ColumnWriter(_) => unreachable!("ParquetWriter does not support INT32 data"), - Int64ColumnWriter(ref mut w) => { - let p = packer.i64_packer(); - let n = w - .write_batch(&p.some_values(), Some(&p.def_levels()), None) - .context(ParquetLibraryError { - message: String::from("Can't write_batch with int64 values"), - })?; - debug!("Wrote {} rows of int64 data", n); - } - Int96ColumnWriter(_) => unreachable!("ParquetWriter does not support INT96 data"), - FloatColumnWriter(_) => { - unreachable!("ParquetWriter does not support FLOAT (32-bit float) data") - } - DoubleColumnWriter(ref mut w) => { - let p = packer.f64_packer(); - let n = w - .write_batch(&p.some_values(), Some(&p.def_levels()), None) - .context(ParquetLibraryError { - message: String::from("Can't write_batch with f64 values"), - })?; - debug!("Wrote {} rows of f64 data", n); - } - ByteArrayColumnWriter(ref mut w) => { - let p = packer.bytes_packer(); - let n = w - .write_batch(&p.some_values(), Some(&p.def_levels()), None) - .context(ParquetLibraryError { - message: String::from("Can't write_batch with byte array values"), - })?; - debug!("Wrote {} rows of byte data", n); - } - FixedLenByteArrayColumnWriter(_) => { - unreachable!("ParquetWriter does not support FIXED_LEN_BYTE_ARRAY data"); - } - }; - debug!("Closing column writer for {}", column_number); - row_group_writer - .close_column(col_writer) - .context(ParquetLibraryError { - message: String::from("Can't close column writer"), - })?; - column_number += 1; - } - self.file_writer - .close_row_group(row_group_writer) - .context(ParquetLibraryError { - message: String::from("Can't close row group writer"), - })?; - Ok(()) - } - - /// Closes this writer, and finalizes the underlying parquet file - fn close(&mut self) -> Result<(), TableError> { - self.file_writer.close().context(ParquetLibraryError { - message: String::from("Can't close file writer"), - })?; - Ok(()) - } -} - -impl fmt::Debug for IOxParquetTableWriter -where - W: Write + Seek + TryClone, -{ - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("IOxParquetTableWriter") - .field("parquet_schema", &self.parquet_schema) - .field("file_writer", &"SerializedFileWriter") - .finish() - } -} - -// Converts from line protocol `Schema` to the equivalent parquet schema `Type`. 
-fn convert_to_parquet_schema(schema: &Schema) -> Result, Error> { - let mut parquet_columns = Vec::new(); - - for (i, (influxdb_column_type, field)) in schema.iter().enumerate() { - debug!( - "Determining parquet schema for column[{}] {:?} -> {:?}", - i, influxdb_column_type, field - ); - let (physical_type, logical_type) = match influxdb_column_type { - Some(InfluxColumnType::Tag) => ( - PhysicalType::BYTE_ARRAY, - Some(LogicalType::STRING(Default::default())), - ), - Some(InfluxColumnType::Field(InfluxFieldType::Boolean)) => { - (PhysicalType::BOOLEAN, None) - } - Some(InfluxColumnType::Field(InfluxFieldType::Float)) => (PhysicalType::DOUBLE, None), - Some(InfluxColumnType::Field(InfluxFieldType::Integer)) => (PhysicalType::INT64, None), - Some(InfluxColumnType::Field(InfluxFieldType::UInteger)) => ( - PhysicalType::INT64, - Some(LogicalType::INTEGER(IntType { - bit_width: 64, - is_signed: false, - })), - ), - Some(InfluxColumnType::Field(InfluxFieldType::String)) => ( - PhysicalType::BYTE_ARRAY, - Some(LogicalType::STRING(Default::default())), - ), - Some(InfluxColumnType::Timestamp) => { - ( - PhysicalType::INT64, - Some(LogicalType::TIMESTAMP(TimestampType { - unit: TimeUnit::NANOS(Default::default()), - // Indicates that the timestamp is stored as UTC values - is_adjusted_to_u_t_c: true, - })), - ) - } - None => { - return UnsupportedDataType { - data_type: format!("Arrow type: {:?}", field.data_type()), - } - .fail(); - } - }; - - // All fields are optional - let parquet_column_builder = Type::primitive_type_builder(field.name(), physical_type) - .with_repetition(Repetition::OPTIONAL) - .with_logical_type(logical_type); - - let parquet_column_type = parquet_column_builder - .build() - .context(ParquetLibraryError { - message: String::from("Can't create parquet column type"), - })?; - debug!( - "Using parquet type {} for column {:?}", - parquet_schema_as_string(&parquet_column_type), - field.name() - ); - - parquet_columns.push(Arc::new(parquet_column_type)); - } - - let measurement = schema.measurement().unwrap(); - let parquet_schema = Type::group_type_builder(measurement) - .with_fields(&mut parquet_columns) - .build() - .context(ParquetLibraryError { - message: String::from("Can't create top level parquet schema"), - })?; - - Ok(Arc::new(parquet_schema)) -} - -fn set_integer_encoding( - influxdb_column_type: InfluxColumnType, - compression_level: CompressionLevel, - col_path: ColumnPath, - builder: WriterPropertiesBuilder, -) -> WriterPropertiesBuilder { - match compression_level { - CompressionLevel::Maximum => { - debug!( - "Setting encoding of {:?} col {} to DELTA_BINARY_PACKED (Maximum)", - influxdb_column_type, col_path - ); - builder - .set_column_encoding(col_path.clone(), Encoding::DELTA_BINARY_PACKED) - .set_column_dictionary_enabled(col_path, false) - } - CompressionLevel::Compatibility => { - debug!( - "Setting encoding of {:?} col {} to PLAIN/RLE (Compatibility)", - influxdb_column_type, col_path - ); - builder - .set_column_encoding(col_path.clone(), Encoding::PLAIN) - .set_column_dictionary_enabled(col_path, true) - } - } -} - -/// Create the parquet writer properties (which defines the encoding -/// and compression for each column) for a given schema. -fn create_writer_props( - schema: &Schema, - compression_level: CompressionLevel, -) -> Arc { - let mut builder = WriterProperties::builder(); - - // TODO: Maybe tweak more of these settings for maximum performance. - - // start off with GZIP for maximum compression ratio (at expense of CPU - // performance...) 
- builder = builder.set_compression(Compression::GZIP); - - // Setup encoding as defined in - // https://github.com/influxdata/influxdb_iox/blob/alamb/encoding_thoughts/docs/encoding_thoughts.md - // - // Note: the property writer builder's default is to encode - // everything with dictionary encoding, and it turns out that - // dictionary encoding overrides all other encodings. Thus, we - // must explicitly disable dictionary encoding when another - // encoding is desired. - for (i, (influxdb_column_type, field)) in schema.iter().enumerate() { - let column_name = field.name().clone(); - let col_path: ColumnPath = column_name.into(); - - match influxdb_column_type { - Some(InfluxColumnType::Field(InfluxFieldType::Boolean)) => { - debug!( - "Setting encoding of {:?} col {} to RLE", - influxdb_column_type, i - ); - builder = builder - .set_column_encoding(col_path.clone(), Encoding::RLE) - .set_column_dictionary_enabled(col_path, false); - } - Some(InfluxColumnType::Field(InfluxFieldType::Integer)) => { - builder = set_integer_encoding( - influxdb_column_type.unwrap(), - compression_level, - col_path, - builder, - ) - } - Some(InfluxColumnType::Field(InfluxFieldType::UInteger)) => { - builder = set_integer_encoding( - influxdb_column_type.unwrap(), - compression_level, - col_path, - builder, - ) - } - Some(InfluxColumnType::Field(InfluxFieldType::Float)) => { - debug!( - "Setting encoding of {:?} col {} to PLAIN", - influxdb_column_type, col_path - ); - builder = builder - .set_column_encoding(col_path.clone(), Encoding::PLAIN) - .set_column_dictionary_enabled(col_path, false); - } - Some(InfluxColumnType::Field(InfluxFieldType::String)) => { - debug!( - "Setting encoding of non-tag val {:?} col {} to DELTA_LENGTH_BYTE_ARRAY", - influxdb_column_type, col_path - ); - builder = builder - .set_column_encoding(col_path.clone(), Encoding::DELTA_LENGTH_BYTE_ARRAY) - .set_column_dictionary_enabled(col_path, false); - } - // tag values are often very much repeated - Some(InfluxColumnType::Tag) => { - debug!( - "Setting encoding of tag val {:?} col {} to dictionary", - influxdb_column_type, col_path - ); - builder = builder.set_column_dictionary_enabled(col_path, true); - } - Some(InfluxColumnType::Timestamp) => { - builder = set_integer_encoding( - influxdb_column_type.unwrap(), - compression_level, - col_path, - builder, - ) - } - None => { - warn!( - "Using default parquet encoding for column {} which has no LP annotations", - field.name() - ) - } - }; - } - - // Even though the 'set_statistics_enabled()' method is called here, the - // resulting parquet file does not appear to have statistics enabled. - // - // This is due to the fact that the underlying rust parquet - // library does not support statistics generation at this time. 
- let props = builder - .set_statistics_enabled(true) - .set_created_by("InfluxDB IOx".to_string()) - .build(); - Arc::new(props) -} - -#[cfg(test)] -mod tests { - use internal_types::schema::builder::SchemaBuilder; - - use super::*; - - // Collapses multiple spaces into a single space, and removes trailing - // whitespace - fn normalize_spaces(s: &str) -> String { - // previous non space, if any - let mut prev: Option = None; - let no_double_spaces: String = s - .chars() - .filter(|c| { - if let Some(prev_c) = prev { - if prev_c == ' ' && *c == ' ' { - return false; - } - } - prev = Some(*c); - true - }) - .collect(); - no_double_spaces - .trim_end_matches(|c| c == ' ' || c == '\n') - .to_string() - } - - #[test] - fn test_convert_to_parquet_schema() { - let schema = SchemaBuilder::new() - .measurement("measurement_name") - .tag("tag1") - .influx_field("string_field", InfluxFieldType::String) - .influx_field("float_field", InfluxFieldType::Float) - .influx_field("int_field", InfluxFieldType::Integer) - .influx_field("uint_field", InfluxFieldType::UInteger) - .influx_field("bool_field", InfluxFieldType::Boolean) - .timestamp() - .build() - .unwrap(); - - let parquet_schema = convert_to_parquet_schema(&schema).expect("conversion successful"); - let parquet_schema_string = normalize_spaces(&parquet_schema_as_string(&parquet_schema)); - let expected_schema_string = normalize_spaces( - r#"message measurement_name { - OPTIONAL BYTE_ARRAY tag1 (STRING); - OPTIONAL BYTE_ARRAY string_field (STRING); - OPTIONAL DOUBLE float_field; - OPTIONAL INT64 int_field; - OPTIONAL INT64 uint_field (INTEGER(64,false)); - OPTIONAL BOOLEAN bool_field; - OPTIONAL INT64 time (TIMESTAMP(NANOS,true)); -}"#, - ); - - assert_eq!(parquet_schema_string, expected_schema_string); - } - - fn make_test_schema() -> Schema { - SchemaBuilder::new() - .measurement("measurement_name") - .tag("tag1") - .influx_field("string_field", InfluxFieldType::String) - .influx_field("float_field", InfluxFieldType::Float) - .influx_field("int_field", InfluxFieldType::Integer) - .influx_field("bool_field", InfluxFieldType::Boolean) - .timestamp() - .build() - .unwrap() - } - - #[test] - fn test_create_writer_props_maximum() { - do_test_create_writer_props(CompressionLevel::Maximum); - } - - #[test] - fn test_create_writer_props_compatibility() { - do_test_create_writer_props(CompressionLevel::Compatibility); - } - - fn do_test_create_writer_props(compression_level: CompressionLevel) { - let schema = make_test_schema(); - let writer_props = create_writer_props(&schema, compression_level); - - let tag1_colpath = ColumnPath::from("tag1"); - assert_eq!(writer_props.encoding(&tag1_colpath), None); - assert_eq!(writer_props.compression(&tag1_colpath), Compression::GZIP); - assert_eq!(writer_props.dictionary_enabled(&tag1_colpath), true); - assert_eq!(writer_props.statistics_enabled(&tag1_colpath), true); - - let string_field_colpath = ColumnPath::from("string_field"); - assert_eq!( - writer_props.encoding(&string_field_colpath), - Some(Encoding::DELTA_LENGTH_BYTE_ARRAY) - ); - assert_eq!( - writer_props.compression(&string_field_colpath), - Compression::GZIP - ); - assert_eq!( - writer_props.dictionary_enabled(&string_field_colpath), - false - ); - assert_eq!(writer_props.statistics_enabled(&string_field_colpath), true); - - let float_field_colpath = ColumnPath::from("float_field"); - assert_eq!( - writer_props.encoding(&float_field_colpath), - Some(Encoding::PLAIN) - ); - assert_eq!( - writer_props.compression(&float_field_colpath), - 
Compression::GZIP - ); - assert_eq!(writer_props.dictionary_enabled(&float_field_colpath), false); - assert_eq!(writer_props.statistics_enabled(&float_field_colpath), true); - - let int_field_colpath = ColumnPath::from("int_field"); - match compression_level { - CompressionLevel::Maximum => { - assert_eq!( - writer_props.encoding(&int_field_colpath), - Some(Encoding::DELTA_BINARY_PACKED) - ); - assert_eq!(writer_props.dictionary_enabled(&int_field_colpath), false); - } - CompressionLevel::Compatibility => { - assert_eq!( - writer_props.encoding(&int_field_colpath), - Some(Encoding::PLAIN) - ); - assert_eq!(writer_props.dictionary_enabled(&int_field_colpath), true); - } - } - assert_eq!( - writer_props.compression(&int_field_colpath), - Compression::GZIP - ); - assert_eq!(writer_props.statistics_enabled(&int_field_colpath), true); - - let bool_field_colpath = ColumnPath::from("bool_field"); - assert_eq!( - writer_props.encoding(&bool_field_colpath), - Some(Encoding::RLE) - ); - assert_eq!( - writer_props.compression(&bool_field_colpath), - Compression::GZIP - ); - assert_eq!(writer_props.dictionary_enabled(&bool_field_colpath), false); - assert_eq!(writer_props.statistics_enabled(&bool_field_colpath), true); - - let timestamp_field_colpath = ColumnPath::from("time"); - match compression_level { - CompressionLevel::Maximum => { - assert_eq!( - writer_props.encoding(×tamp_field_colpath), - Some(Encoding::DELTA_BINARY_PACKED) - ); - assert_eq!( - writer_props.dictionary_enabled(×tamp_field_colpath), - false - ); - } - CompressionLevel::Compatibility => { - assert_eq!( - writer_props.encoding(×tamp_field_colpath), - Some(Encoding::PLAIN) - ); - assert_eq!( - writer_props.dictionary_enabled(×tamp_field_colpath), - true - ); - } - } - - assert_eq!( - writer_props.compression(×tamp_field_colpath), - Compression::GZIP - ); - - assert_eq!( - writer_props.statistics_enabled(×tamp_field_colpath), - true - ); - } - - #[test] - fn compression_level() { - assert_eq!( - CompressionLevel::from_str("max").ok().unwrap(), - CompressionLevel::Maximum - ); - assert_eq!( - CompressionLevel::from_str("compatibility").ok().unwrap(), - CompressionLevel::Compatibility - ); - - let bad = CompressionLevel::from_str("madxxxx"); - assert!(bad.is_err()); - } -} diff --git a/ingest/tests/read_write.rs b/ingest/tests/read_write.rs deleted file mode 100644 index dd1f56d026..0000000000 --- a/ingest/tests/read_write.rs +++ /dev/null @@ -1,97 +0,0 @@ -use ingest::parquet::writer::{CompressionLevel, IOxParquetTableWriter}; -use internal_types::schema::{builder::SchemaBuilder, InfluxFieldType}; -use packers::{IOxTableWriter, Packer, Packers}; - -use parquet::data_type::ByteArray; -use std::fs; - -#[test] -fn test_write_parquet_data() { - let schema = SchemaBuilder::new() - .measurement("measurement_name") - .tag("tag1") - .influx_field("string_field", InfluxFieldType::String) - .influx_field("float_field", InfluxFieldType::Float) - .influx_field("int_field", InfluxFieldType::Integer) - .influx_field("bool_field", InfluxFieldType::Boolean) - .timestamp() - .build() - .unwrap(); - - assert_eq!(schema.len(), 6); - let mut packers = vec![ - Packers::Bytes(Packer::new()), // 0: tag1 - Packers::Bytes(Packer::new()), // 1: string_field - Packers::Float(Packer::new()), // 2: float_field - Packers::Integer(Packer::new()), // 3: int_field - Packers::Boolean(Packer::new()), // 4: bool_field - Packers::Integer(Packer::new()), // 5: timestamp - ]; - - // create this data: - - // row 0: "tag1_val0", "str_val0", 1.0, 100, true, 900000000000 - 
// row 1: null, null , null, null, null, null - // row 2: "tag1_val2", "str_val2", 2.0, 200, false, 9100000000000 - packers[0] - .bytes_packer_mut() - .push(ByteArray::from("tag1_val0")); - packers[1] - .bytes_packer_mut() - .push(ByteArray::from("str_val0")); - packers[2].f64_packer_mut().push(1.0); - packers[3].i64_packer_mut().push(100); - packers[4].bool_packer_mut().push(true); - packers[5].i64_packer_mut().push(900000000000); - - packers[0].push_none(); - packers[1].push_none(); - packers[2].push_none(); - packers[3].push_none(); - packers[4].push_none(); - packers[5].push_none(); - - packers[0] - .bytes_packer_mut() - .push(ByteArray::from("tag1_val2")); - packers[1] - .bytes_packer_mut() - .push(ByteArray::from("str_val2")); - packers[2].f64_packer_mut().push(2.0); - packers[3].i64_packer_mut().push(200); - packers[4].bool_packer_mut().push(true); - packers[5].i64_packer_mut().push(910000000000); - - // write the data out to the parquet file - let output_path = test_helpers::tempfile::Builder::new() - .prefix("iox_parquet_e2e") - .suffix(".parquet") - .tempfile() - .expect("error creating temp file") - .into_temp_path(); - let output_file = fs::File::create(&output_path).expect("can't open temp file for writing"); - - let mut parquet_writer = - IOxParquetTableWriter::new(&schema, CompressionLevel::Compatibility, output_file) - .expect("can't create parquet writer"); - parquet_writer - .write_batch(&packers) - .expect("can't write batch"); - parquet_writer.close().expect("can't close writer"); - - // verify the file has some non zero content - let file_meta = fs::metadata(&output_path).expect("Can't get file metadata"); - assert!(file_meta.is_file()); - assert!(file_meta.len() > 0, "Length was {}", file_meta.len()); - - // TODO: implement a parquet reader to read the file back in - // then read the data back in and verify it came out successfully - let output_path_string: String = output_path.to_string_lossy().to_string(); - output_path.close().expect("error deleting the temp file"); - - assert!( - fs::metadata(&output_path_string).is_err(), - "temp file was not cleaned up {:?}", - &output_path_string - ); -} diff --git a/internal_types/src/schema/builder.rs b/internal_types/src/schema/builder.rs index 40455db6eb..945c41672c 100644 --- a/internal_types/src/schema/builder.rs +++ b/internal_types/src/schema/builder.rs @@ -1,5 +1,4 @@ -use observability_deps::tracing::warn; -use snafu::{OptionExt, ResultExt, Snafu}; +use snafu::{ResultExt, Snafu}; use std::{ collections::{HashMap, HashSet}, convert::TryInto, @@ -12,19 +11,6 @@ use super::{InfluxColumnType, InfluxFieldType, Schema, TIME_COLUMN_NAME}; /// Database schema creation / validation errors. #[derive(Debug, Snafu)] pub enum Error { - #[snafu(display("No measurement provided",))] - NoMeasurement {}, - - #[snafu(display( - "Multiple measurement names not supported. Old measurement '{}', new measurement '{}'", - old_measurement, - new_measurement - ))] - MultipleMeasurementNames { - old_measurement: String, - new_measurement: String, - }, - #[snafu(display("Error validating schema: {}", source))] ValidatingSchema { source: super::Error }, @@ -207,107 +193,6 @@ impl SchemaBuilder { } } -/// Specialized Schema Builder for use building up am InfluxDB data -/// model Schema while streaming line protocol through it. 
-/// -/// Duplicated tag and field definitions are ignored, and the -/// resulting schema always has puts tags as the initial columns (in the -/// order of appearance) followed by fields (in order of appearance) -/// and then timestamp -#[derive(Debug, Default)] -pub struct InfluxSchemaBuilder { - /// What tag names we have seen so far - tag_set: HashSet, - /// What field names we have seen so far - field_set: HashMap, - - /// Keep track of the tag_columns in order they were added - tag_list: Vec, - /// Track the fields in order they were added - field_list: Vec, - - /// Keep The measurement name, if seen - measurement: Option, -} - -impl InfluxSchemaBuilder { - pub fn new() -> Self { - Self::default() - } - - /// Set optional measurement name, erroring if previously specified - pub fn saw_measurement(mut self, measurement: impl Into) -> Result { - let new_measurement = measurement.into(); - - if let Some(old_measurement) = &self.measurement { - if old_measurement != &new_measurement { - return MultipleMeasurementNames { - old_measurement, - new_measurement, - } - .fail(); - } - } else { - self.measurement = Some(new_measurement) - } - Ok(self) - } - - /// Add a new tag column to this schema, ignoring if the tag has already - /// been seen - pub fn saw_tag(mut self, column_name: &str) -> Self { - if !self.tag_set.contains(column_name) { - self.tag_set.insert(column_name.to_string()); - self.tag_list.push(column_name.to_string()); - }; - - self - } - - /// Add a new field column with the specified InfluxDB data model - /// type, ignoring if that field has been seen TODO error if the - /// field is a different type (old implementation produces warn! - /// in this condition) - pub fn saw_influx_field( - mut self, - column_name: &str, - influxdb_field_type: InfluxFieldType, - ) -> Self { - if let Some(existing_influxdb_field_type) = self.field_set.get(column_name) { - if &influxdb_field_type != existing_influxdb_field_type { - warn!("Ignoring new type for field '{}': Previously it had type {:?}, attempted to set type {:?}.", - column_name, existing_influxdb_field_type, influxdb_field_type); - } - } else { - self.field_set - .insert(column_name.to_string(), influxdb_field_type); - self.field_list.push(column_name.to_string()) - } - self - } - - /// Build a schema object from the collected schema - pub fn build(self) -> Result { - let builder = - SchemaBuilder::new().measurement(self.measurement.as_ref().context(NoMeasurement)?); - - // tags always first - let builder = self - .tag_list - .iter() - .fold(builder, |builder, tag_name| builder.tag(tag_name)); - - // then fields (in order they were added) - let builder = self.field_list.iter().fold(builder, |builder, field_name| { - let influxdb_field_type = self.field_set.get(field_name).unwrap(); - builder.influx_field(field_name, *influxdb_field_type) - }); - - // and now timestamp - builder.timestamp().build() - } -} - /// Schema Merger /// /// The usecase for merging schemas is when different chunks have @@ -522,68 +407,6 @@ mod test { assert_eq!(res.unwrap_err().to_string(), "Error validating schema: Duplicate column name: 'time' was specified to be Tag as well as timestamp"); } - #[test] - fn test_lp_builder_basic() { - let s = InfluxSchemaBuilder::new() - .saw_influx_field("the_field", Float) - .saw_tag("the_tag") - .saw_tag("the_tag") - .saw_influx_field("the_field", Float) - .saw_measurement("the_measurement") - .unwrap() - .saw_tag("the_tag") - .saw_tag("the_second_tag") - .saw_measurement("the_measurement") - .unwrap() - .build() - 
.unwrap(); - - assert_column_eq!(s, 0, Tag, "the_tag"); - assert_column_eq!(s, 1, Tag, "the_second_tag"); - assert_column_eq!(s, 2, Field(Float), "the_field"); - assert_column_eq!(s, 3, Timestamp, "time"); - - assert_eq!(s.measurement().unwrap(), "the_measurement"); - assert_eq!(s.len(), 4); - } - - #[test] - fn test_lp_builder_no_measurement() { - let res = InfluxSchemaBuilder::new() - .saw_tag("the_tag") - .saw_influx_field("the_field", Float) - .build(); - - assert_eq!(res.unwrap_err().to_string(), "No measurement provided"); - } - - #[test] - fn test_lp_builder_different_measurement() { - let res = InfluxSchemaBuilder::new() - .saw_measurement("m1") - .unwrap() - .saw_measurement("m2"); - - assert_eq!(res.unwrap_err().to_string(), "Multiple measurement names not supported. Old measurement \'m1\', new measurement \'m2\'"); - } - - #[test] - fn test_lp_changed_field_type() { - let s = InfluxSchemaBuilder::new() - .saw_measurement("the_measurement") - .unwrap() - .saw_influx_field("the_field", Float) - // same field name seen again as a different type - .saw_influx_field("the_field", Integer) - .build() - .unwrap(); - - assert_column_eq!(s, 0, Field(Float), "the_field"); - assert_column_eq!(s, 1, Timestamp, "time"); - - assert_eq!(s.len(), 2); - } - #[test] fn test_merge_schema_empty() { let merged_schema_error = SchemaMerger::new().build().unwrap_err(); diff --git a/src/commands/convert.rs b/src/commands/convert.rs deleted file mode 100644 index 4fc3a04e0f..0000000000 --- a/src/commands/convert.rs +++ /dev/null @@ -1,319 +0,0 @@ -use influxdb_line_protocol::parse_lines; -use ingest::{ - parquet::writer::{CompressionLevel, Error as ParquetWriterError, IOxParquetTableWriter}, - ConversionSettings, Error as IngestError, LineProtocolConverter, TsmFileConverter, -}; -use internal_types::schema::Schema; -use observability_deps::tracing::{debug, info, warn}; -use packers::{Error as TableError, IOxTableWriter, IOxTableWriterSource}; -use snafu::{OptionExt, ResultExt, Snafu}; -use std::{ - convert::TryInto, - fs, - fs::File, - io::{BufReader, Read}, - path::{Path, PathBuf}, -}; - -use crate::commands::input::{FileType, InputReader}; - -#[derive(Debug, Snafu)] -pub enum Error { - #[snafu(display("Error reading {} ({})", name.display(), source))] - UnableToReadInput { - name: PathBuf, - source: std::io::Error, - }, - - #[snafu(display( - "Cannot write multiple measurements to a single file. 
Saw new measurement named {}", - new_measurement_name - ))] - MultipleMeasurementsToSingleFile { new_measurement_name: String }, - - #[snafu(display("Internal error: measurement name not specified in schema",))] - InternalMeasurementNotSpecified {}, - - #[snafu(display("Error creating a parquet table writer {}", source))] - UnableToCreateParquetTableWriter { source: ParquetWriterError }, - - #[snafu(display("Conversion from Parquet format is not implemented"))] - ParquetNotImplemented, - - #[snafu(display("Error writing remaining lines {}", source))] - UnableToWriteGoodLines { source: IngestError }, - - #[snafu(display("Error opening input {}", source))] - OpenInput { source: super::input::Error }, - - #[snafu(display("Error while closing the table writer {}", source))] - UnableToCloseTableWriter { source: IngestError }, -} - -pub type Result = std::result::Result; - -impl From for TableError { - fn from(source: Error) -> Self { - Self::from_other(source) - } -} - -/// Creates `IOxParquetTableWriter` suitable for writing to a single file -#[derive(Debug)] -struct ParquetFileWriterSource { - output_filename: String, - compression_level: CompressionLevel, - // This creator only supports a single filename at this time - // so track if it has alread been made, for errors - made_file: bool, -} - -impl IOxTableWriterSource for ParquetFileWriterSource { - // Returns a `IOxTableWriter suitable for writing data from packers. - fn next_writer(&mut self, schema: &Schema) -> Result, TableError> { - let measurement = schema - .measurement() - .cloned() - .context(InternalMeasurementNotSpecified)?; - - if self.made_file { - return MultipleMeasurementsToSingleFile { - new_measurement_name: measurement, - } - .fail()?; - } - - let output_file = fs::File::create(&self.output_filename).map_err(|e| { - TableError::from_io( - e, - format!("Error creating output file {}", self.output_filename), - ) - })?; - info!( - "Writing output for measurement {} to {} ...", - measurement, self.output_filename - ); - - let writer = IOxParquetTableWriter::new(schema, self.compression_level, output_file) - .context(UnableToCreateParquetTableWriter)?; - self.made_file = true; - Ok(Box::new(writer)) - } -} - -/// Creates `IOxParquetTableWriter` for each measurement by -/// writing each to a separate file (measurement1.parquet, -/// measurement2.parquet, etc) -#[derive(Debug)] -struct ParquetDirectoryWriterSource { - compression_level: CompressionLevel, - output_dir_path: PathBuf, -} - -impl IOxTableWriterSource for ParquetDirectoryWriterSource { - /// Returns a `IOxTableWriter` suitable for writing data from packers. 
- /// named in the template of - fn next_writer(&mut self, schema: &Schema) -> Result, TableError> { - let mut output_file_path: PathBuf = self.output_dir_path.clone(); - - let measurement = schema - .measurement() - .context(InternalMeasurementNotSpecified)?; - output_file_path.push(measurement); - output_file_path.set_extension("parquet"); - - let output_file = fs::File::create(&output_file_path).map_err(|e| { - TableError::from_io( - e, - format!("Error creating output file {:?}", output_file_path), - ) - })?; - info!( - "Writing output for measurement {} to {:?} ...", - measurement, output_file_path - ); - - let writer = IOxParquetTableWriter::new(schema, self.compression_level, output_file) - .context(UnableToCreateParquetTableWriter) - .map_err(TableError::from_other)?; - Ok(Box::new(writer)) - } -} - -pub fn is_directory(p: impl AsRef) -> bool { - fs::metadata(p) - .map(|metadata| metadata.is_dir()) - .unwrap_or(false) -} - -pub fn convert( - input_path: &str, - output_path: &str, - compression_level: CompressionLevel, -) -> Result<()> { - info!("convert starting"); - debug!("Reading from input path {}", input_path); - - if is_directory(input_path) { - let mut files: Vec<_> = fs::read_dir(input_path) - .unwrap() - .filter_map(Result::ok) - .filter(|filename| filename.path().extension().map_or(false, |x| x == "tsm")) - .collect(); - - if files.is_empty() { - warn!("No TSM files found"); - return Ok(()); - } - - // Sort files by their TSM generation to ensure any duplicate block - // data is appropriately de-duplicated. - files.sort_by_key(|a| a.file_name()); - - let mut index_readers = Vec::with_capacity(files.len()); - let mut block_readers = Vec::with_capacity(files.len()); - for file in &files { - let index_handle = File::open(file.path()).unwrap(); - let index_size = index_handle.metadata().unwrap().len(); - let block_handle = File::open(file.path()).unwrap(); - - index_readers.push((BufReader::new(index_handle), index_size as usize)); - block_readers.push(BufReader::new(block_handle)); - } - - // setup writing - let writer_source: Box = if is_directory(&output_path) { - info!("Writing to output directory {:?}", output_path); - Box::new(ParquetDirectoryWriterSource { - compression_level, - output_dir_path: PathBuf::from(output_path), - }) - } else { - info!("Writing to output file {}", output_path); - Box::new(ParquetFileWriterSource { - compression_level, - output_filename: String::from(output_path), - made_file: false, - }) - }; - - let mut converter = TsmFileConverter::new(writer_source); - return converter - .convert(index_readers, block_readers) - .context(UnableToCloseTableWriter); - } - - let input_reader = InputReader::new(input_path).context(OpenInput)?; - info!( - "Preparing to convert {} bytes from {}", - input_reader.len(), - input_path - ); - - match input_reader.file_type() { - FileType::LineProtocol => convert_line_protocol_to_parquet( - input_path, - input_reader, - compression_level, - output_path, - ), - FileType::Tsm => { - // TODO(edd): we can remove this when I figure out the best way to share - // the reader between the TSM index reader and the Block decoder. 
- let input_block_reader = InputReader::new(input_path).context(OpenInput)?; - let len = input_reader.len() as usize; - convert_tsm_to_parquet( - input_reader, - len, - compression_level, - input_block_reader, - output_path, - ) - } - FileType::Parquet => ParquetNotImplemented.fail(), - } -} - -fn convert_line_protocol_to_parquet( - input_filename: &str, - mut input_reader: InputReader, - compression_level: CompressionLevel, - output_name: &str, -) -> Result<()> { - // TODO: make a streaming parser that you can stream data through in blocks. - // for now, just read the whole input at once into a string - let mut buf = String::with_capacity( - input_reader - .len() - .try_into() - .expect("Cannot allocate buffer"), - ); - input_reader - .read_to_string(&mut buf) - .context(UnableToReadInput { - name: input_filename, - })?; - - // FIXME: Design something sensible to do with lines that don't - // parse rather than just dropping them on the floor - let only_good_lines = parse_lines(&buf).filter_map(|r| match r { - Ok(line) => Some(line), - Err(e) => { - warn!("Ignorning line with parse error: {}", e); - None - } - }); - - let writer_source: Box = if is_directory(&output_name) { - info!("Writing to output directory {:?}", output_name); - Box::new(ParquetDirectoryWriterSource { - compression_level, - output_dir_path: PathBuf::from(output_name), - }) - } else { - info!("Writing to output file {}", output_name); - Box::new(ParquetFileWriterSource { - output_filename: String::from(output_name), - compression_level, - made_file: false, - }) - }; - - let settings = ConversionSettings::default(); - let mut converter = LineProtocolConverter::new(settings, writer_source); - converter - .convert(only_good_lines) - .context(UnableToWriteGoodLines)?; - converter.finalize().context(UnableToCloseTableWriter)?; - info!("Completing writing to {} successfully", output_name); - Ok(()) -} - -fn convert_tsm_to_parquet( - index_stream: InputReader, - index_stream_size: usize, - compression_level: CompressionLevel, - block_stream: InputReader, - output_name: &str, -) -> Result<()> { - // setup writing - let writer_source: Box = if is_directory(&output_name) { - info!("Writing to output directory {:?}", output_name); - Box::new(ParquetDirectoryWriterSource { - compression_level, - output_dir_path: PathBuf::from(output_name), - }) - } else { - info!("Writing to output file {}", output_name); - Box::new(ParquetFileWriterSource { - compression_level, - output_filename: String::from(output_name), - made_file: false, - }) - }; - - let mut converter = TsmFileConverter::new(writer_source); - converter - .convert(vec![(index_stream, index_stream_size)], vec![block_stream]) - .context(UnableToCloseTableWriter) -} diff --git a/src/commands/input.rs b/src/commands/input.rs deleted file mode 100644 index a075032108..0000000000 --- a/src/commands/input.rs +++ /dev/null @@ -1,356 +0,0 @@ -use std::{ - borrow::Cow, - collections::VecDeque, - fs, - fs::File, - io, - io::{BufReader, Read, Seek, SeekFrom}, - path::{Path, PathBuf}, -}; - -use snafu::{ResultExt, Snafu}; - -/// Module to handle input files (and maybe urls?) 
-use packers::Name; -use parquet::{ - self, - file::{ - reader::ChunkReader, - serialized_reader::{FileSource, SliceableCursor}, - writer::TryClone, - }, -}; - -#[derive(Debug, Snafu)] -pub enum Error { - #[snafu(display("Error opening {} ({})", input_name.display(), source))] - UnableToOpenInput { - input_name: PathBuf, - source: io::Error, - }, - - #[snafu(display("Error reading directory {} ({})", input_name.display(), source))] - UnableToReadDirectory { - input_name: PathBuf, - source: io::Error, - }, - - #[snafu(display("Error calculating the size of {} ({})", input_name.display(), source))] - UnableToCalculateSize { - input_name: PathBuf, - source: io::Error, - }, - - #[snafu(display("Unknown input type: {} has an unknown input extension before .gz", input_name.display()))] - UnknownInputTypeGzip { input_name: PathBuf }, - - #[snafu(display("Unknown input type: {} has an unknown input extension", input_name.display()))] - UnknownInputType { input_name: PathBuf }, - - #[snafu(display("Can't read GZip data: {}", input_name.display()))] - ReadingGzip { - input_name: PathBuf, - source: std::io::Error, - }, -} - -pub type Result = std::result::Result; - -#[derive(Debug, Clone, Copy)] -pub enum FileType { - LineProtocol, - Tsm, - Parquet, -} - -/// Represents an input path and can produce InputReaders for each -/// file seen while recursively traversing the path -pub struct InputPath { - // All files left to traverse. Elements of files were actual files - // when this InputPath was constructed. - files: Vec, -} - -/// Interface for interacting with streams -#[derive(Debug)] -pub enum InputReader { - FileInputType(FileInputReader), - MemoryInputType(MemoryInputReader), -} - -/// A (file backed) reader to read raw uncompressed bytes -#[derive(Debug)] -pub struct FileInputReader { - file_type: FileType, - file_size: u64, - path: PathBuf, - reader: BufReader, -} - -/// An in-memory reader -#[derive(Debug)] -pub struct MemoryInputReader { - file_type: FileType, - file_size: u64, - path: PathBuf, - cursor: SliceableCursor, -} - -impl FileInputReader { - fn new(file_type: FileType, input_name: &str) -> Result { - let path = PathBuf::from(input_name); - let file = File::open(&path).context(UnableToOpenInput { input_name })?; - - let file_size = file - .metadata() - .context(UnableToCalculateSize { input_name })? 
-            .len();
-
-        Ok(Self {
-            file_type,
-            file_size,
-            path,
-            reader: BufReader::new(file),
-        })
-    }
-}
-
-impl MemoryInputReader {
-    fn new(file_type: FileType, path: PathBuf, buffer: Vec<u8>) -> Self {
-        let len = buffer.len();
-        Self {
-            file_type,
-            file_size: len as u64,
-            path,
-            cursor: SliceableCursor::new(buffer),
-        }
-    }
-}
-
-impl Seek for InputReader {
-    fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
-        match self {
-            Self::FileInputType(file_input_reader) => file_input_reader.reader.seek(pos),
-            Self::MemoryInputType(memory_input_reader) => memory_input_reader.cursor.seek(pos),
-        }
-    }
-}
-
-impl Read for InputReader {
-    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
-        match self {
-            Self::FileInputType(file_input_reader) => file_input_reader.reader.read(buf),
-            Self::MemoryInputType(memory_input_reader) => memory_input_reader.cursor.read(buf),
-        }
-    }
-}
-
-impl parquet::file::reader::Length for InputReader {
-    fn len(&self) -> u64 {
-        match self {
-            Self::FileInputType(file_input_reader) => file_input_reader.file_size,
-            Self::MemoryInputType(memory_input_reader) => memory_input_reader.file_size,
-        }
-    }
-}
-
-impl ChunkReader for InputReader {
-    type T = InputSlice;
-    fn get_read(&self, start: u64, length: usize) -> parquet::errors::Result<Self::T> {
-        match self {
-            Self::FileInputType(file_input_reader) => Ok(InputSlice::FileSlice(FileSource::new(
-                file_input_reader.reader.get_ref(),
-                start,
-                length,
-            ))),
-            Self::MemoryInputType(memory_input_reader) => Ok(InputSlice::Memory(
-                memory_input_reader.cursor.get_read(start, length)?,
-            )),
-        }
-    }
-}
-
-impl TryClone for InputReader {
-    fn try_clone(&self) -> std::result::Result<Self, std::io::Error> {
-        Err(io::Error::new(
-            io::ErrorKind::Other,
-            "TryClone for input reader not supported",
-        ))
-    }
-}
-
-impl Name for InputReader {
-    /// Returns a user understandable identifier of this thing
-    fn name(&self) -> Cow<'_, str> {
-        self.path().to_string_lossy()
-    }
-}
-
-impl InputReader {
-    pub fn file_type(&self) -> &FileType {
-        match self {
-            Self::FileInputType(file_input_reader) => &file_input_reader.file_type,
-            Self::MemoryInputType(memory_input_reader) => &memory_input_reader.file_type,
-        }
-    }
-
-    pub fn len(&self) -> u64 {
-        match self {
-            Self::FileInputType(file_input_reader) => file_input_reader.file_size,
-            Self::MemoryInputType(memory_input_reader) => memory_input_reader.file_size,
-        }
-    }
-
-    pub fn path(&self) -> &Path {
-        match self {
-            Self::FileInputType(file_input_reader) => &file_input_reader.path,
-            Self::MemoryInputType(memory_input_reader) => &memory_input_reader.path,
-        }
-    }
-
-    // Create a new input reader suitable for reading from
-    // `input_name` and figures out the file input type based on
-    // heuristics (ahem, the filename extension)
-    pub fn new(input_name: &str) -> Result<Self> {
-        let path = Path::new(input_name);
-
-        // Initially simply use the file name's extension to determine
-        // the type; Maybe in the future we can be more clever and
-        // inspect contents.
- let ext = path.extension().and_then(|p| p.to_str()); - - match ext { - Some("tsm") => Ok(Self::FileInputType(FileInputReader::new( - FileType::Tsm, - input_name, - )?)), - Some("lp") => Ok(Self::FileInputType(FileInputReader::new( - FileType::LineProtocol, - input_name, - )?)), - Some("parquet") => Ok(Self::FileInputType(FileInputReader::new( - FileType::Parquet, - input_name, - )?)), - Some("gz") => { - let buffer = || { - let file = File::open(input_name).context(UnableToOpenInput { input_name })?; - let mut decoder = flate2::read::GzDecoder::new(file); - let mut buffer = Vec::new(); - decoder - .read_to_end(&mut buffer) - .context(ReadingGzip { input_name })?; - Ok(buffer) - }; - - let path = PathBuf::from(input_name); - let stem = Path::new(path.file_stem().unwrap()); - let stem_ext = stem.extension().and_then(|p| p.to_str()); - - match stem_ext { - Some("tsm") => Ok(Self::MemoryInputType(MemoryInputReader::new( - FileType::Tsm, - path, - buffer()?, - ))), - Some("lp") => Ok(Self::MemoryInputType(MemoryInputReader::new( - FileType::LineProtocol, - path, - buffer()?, - ))), - Some("parquet") => Ok(Self::MemoryInputType(MemoryInputReader::new( - FileType::Parquet, - path, - buffer()?, - ))), - _ => UnknownInputTypeGzip { input_name }.fail(), - } - } - _ => UnknownInputType { input_name }.fail(), - } - } -} - -impl InputPath { - // Create a new InputPath with a snapshot of all the files in the - // directory tree rooted at root_path that pass predicate P - pub fn new
<P>
(root_path: impl Into, mut pred: P) -> Result - where - P: FnMut(&Path) -> bool, - { - struct PathAndType { - path: PathBuf, - file_type: fs::FileType, - } - - let mut paths = VecDeque::new(); - let root_path = root_path.into(); - let root_meta = fs::metadata(&root_path).context(UnableToOpenInput { - input_name: root_path.clone(), - })?; - - paths.push_back(PathAndType { - path: root_path, - file_type: root_meta.file_type(), - }); - - let mut files = Vec::with_capacity(100); - while let Some(PathAndType { path, file_type }) = paths.pop_front() { - if file_type.is_dir() { - let dir = fs::read_dir(&path).context(UnableToReadDirectory { - input_name: path.clone(), - })?; - - for entry in dir { - let entry = entry.context(UnableToReadDirectory { - input_name: path.clone(), - })?; - - let file_type = entry.file_type().context(UnableToOpenInput { - input_name: entry.path(), - })?; - - paths.push_back(PathAndType { - path: entry.path(), - file_type, - }); - } - } else if file_type.is_file() { - if pred(&path) { - files.push(path) - } - } else { - unimplemented!( - "Unknown file type {:?} while recursing {:?}", - file_type, - path - ); - } - } - - // sort filenames in order to ensure repeatability between runs - files.sort(); - Ok(Self { files }) - } - - pub fn input_readers(&self) -> impl Iterator> + '_ { - self.files - .iter() - .rev() - .map(|p| InputReader::new(&p.to_string_lossy())) - } -} - -pub enum InputSlice { - FileSlice(FileSource), - Memory(SliceableCursor), -} - -impl Read for InputSlice { - fn read(&mut self, buf: &mut [u8]) -> io::Result { - match self { - Self::FileSlice(src) => src.read(buf), - Self::Memory(src) => src.read(buf), - } - } -} diff --git a/src/commands/meta.rs b/src/commands/meta.rs deleted file mode 100644 index 6e3b6556e3..0000000000 --- a/src/commands/meta.rs +++ /dev/null @@ -1,165 +0,0 @@ -use influxdb_tsm::{reader::IndexEntry, reader::TsmIndexReader, InfluxId, TsmError}; -use ingest::parquet::metadata::print_parquet_metadata; -use observability_deps::tracing::{debug, info}; -use snafu::{ResultExt, Snafu}; -use std::{ - collections::{BTreeMap, BTreeSet}, - convert::TryInto, -}; - -use crate::commands::input::{FileType, InputReader}; - -pub type Result = std::result::Result; - -pub fn dump_meta(input_filename: &str) -> Result<()> { - info!("meta starting"); - debug!("Reading from input file {}", input_filename); - - let input_reader = InputReader::new(input_filename).context(OpenInput)?; - - match input_reader.file_type() { - FileType::LineProtocol => LineProtocolNotImplemented.fail(), - FileType::Tsm => { - let len = input_reader - .len() - .try_into() - .expect("File size more than usize"); - let reader = TsmIndexReader::try_new(input_reader, len).context(CreateTsm)?; - - let mut stats_builder = TsmMetadataBuilder::new(); - - for entry in reader { - let entry = entry.context(UnableToReadTsmEntry)?; - stats_builder.process_entry(entry)?; - } - stats_builder.print_report(); - Ok(()) - } - FileType::Parquet => { - print_parquet_metadata(input_reader).context(UnableDumpToParquetMetadata) - } - } -} - -#[derive(Debug, Default)] -struct MeasurementMetadata { - /// tag name --> list of seen tag values - tags: BTreeMap>, - /// List of field names seen - fields: BTreeSet, -} - -impl MeasurementMetadata { - fn update_for_entry(&mut self, index_entry: &mut IndexEntry) -> Result<()> { - let key = index_entry.parse_key().context(UnableToParseTsmKey)?; - - for (tag_name, tag_value) in key.tagset { - let tag_entry = self.tags.entry(tag_name).or_default(); - 
tag_entry.insert(tag_value); - } - self.fields.insert(key.field_key); - Ok(()) - } - - fn print_report(&self, prefix: &str) { - for (tag_name, tag_values) in &self.tags { - println!("{} tag {} = {:?}", prefix, tag_name, tag_values); - } - for field_name in &self.fields { - println!("{} field {}", prefix, field_name); - } - } -} - -/// Represents stats for a single bucket -#[derive(Debug, Default)] -struct BucketMetadata { - /// How many index entries have been seen - count: u64, - - // Total 'records' (aka sum of the lengths of all timeseries) - total_records: u64, - - // measurement -> - measurements: BTreeMap, -} - -impl BucketMetadata { - fn update_for_entry(&mut self, index_entry: &mut IndexEntry) -> Result<()> { - self.count += 1; - self.total_records += u64::from(index_entry.count); - let key = index_entry.parse_key().context(UnableToParseTsmKey)?; - - let meta = self.measurements.entry(key.measurement).or_default(); - meta.update_for_entry(index_entry)?; - Ok(()) - } - - fn print_report(&self, prefix: &str) { - for (measurement, meta) in &self.measurements { - println!("{}{}", prefix, measurement); - let indent = format!("{} ", prefix); - meta.print_report(&indent); - } - } -} - -#[derive(Debug, Default)] -struct TsmMetadataBuilder { - num_entries: u32, - - // (org_id, bucket_id) --> Bucket Metadata - bucket_stats: BTreeMap<(InfluxId, InfluxId), BucketMetadata>, -} - -impl TsmMetadataBuilder { - fn new() -> Self { - Self::default() - } - - fn process_entry(&mut self, mut index_entry: IndexEntry) -> Result<()> { - self.num_entries += 1; - let key = (index_entry.org_id(), index_entry.bucket_id()); - let stats = self.bucket_stats.entry(key).or_default(); - stats.update_for_entry(&mut index_entry)?; - Ok(()) - } - - fn print_report(&self) { - println!("TSM Metadata Report:"); - println!(" Valid Index Entries: {}", self.num_entries); - println!(" Organizations/Bucket Stats:"); - for (k, stats) in &self.bucket_stats { - let (org_id, bucket_id) = k; - println!( - " ({}, {}) {} index entries, {} total records", - org_id, bucket_id, stats.count, stats.total_records - ); - println!(" Measurements:"); - stats.print_report(" "); - } - } -} - -#[derive(Debug, Snafu)] -pub enum Error { - #[snafu(display("Error opening input {}", source))] - OpenInput { source: super::input::Error }, - - #[snafu(display("Line protocol metadata dump is not implemented"))] - LineProtocolNotImplemented, - - #[snafu(display("Unable to dump parquet file metadata: {}", source))] - UnableDumpToParquetMetadata { - source: ingest::parquet::error::IOxParquetError, - }, - - #[snafu(display(r#"Unable to create TSM reader: {}"#, source))] - CreateTsm { source: TsmError }, - - #[snafu(display(r#"Unable to parse TSM key: {}"#, source))] - UnableToParseTsmKey { source: TsmError }, - - #[snafu(display(r#"Unable to read TSM entry: {}"#, source))] - UnableToReadTsmEntry { source: TsmError }, -} diff --git a/src/commands/stats.rs b/src/commands/stats.rs deleted file mode 100644 index f90f15a50f..0000000000 --- a/src/commands/stats.rs +++ /dev/null @@ -1,116 +0,0 @@ -//! 
This module contains code to report compression statistics for storage files - -use observability_deps::tracing::info; -use snafu::{ResultExt, Snafu}; -use structopt::StructOpt; - -use ingest::parquet::{error::IOxParquetError, stats as parquet_stats}; -use packers::{ - stats::{FileSetStatsBuilder, FileStats}, - Name, -}; - -use crate::commands::input::{FileType, InputPath, InputReader}; - -#[derive(Debug, Snafu)] -pub enum Error { - #[snafu(display("Not implemented: {}", operation_name))] - NotImplemented { operation_name: String }, - - #[snafu(display("Error opening input {}", source))] - OpenInput { source: super::input::Error }, - - #[snafu(display("Unable to dump parquet file metadata: {}", source))] - UnableDumpToParquetMetadata { source: IOxParquetError }, -} - -pub type Result = std::result::Result; - -/// Print out storage statistics information to stdout. -/// -/// If a directory is specified, checks all files recursively -#[derive(Debug, StructOpt)] -pub struct Config { - /// The input filename or directory to read from - input: String, - - /// Include detailed information per column - #[structopt(long)] - per_column: bool, - - /// Include detailed information per file - #[structopt(long)] - per_file: bool, -} - -/// Print statistics about all the files rooted at input_path -pub async fn stats(config: &Config) -> Result<()> { - info!("stats starting for {:?}", config); - let input_path = InputPath::new(&config.input, |p| { - p.extension() == Some(std::ffi::OsStr::new("parquet")) - }) - .context(OpenInput)?; - - println!("Storage statistics:"); - - let mut builder = FileSetStatsBuilder::default(); - - for input_reader in input_path.input_readers() { - let input_reader = input_reader.context(OpenInput)?; - - let file_stats = stats_for_file(config, input_reader).await?; - - if config.per_file { - println!("{}", file_stats); - } - builder = builder.accumulate(&file_stats); - } - - let overall_stats = builder.build(); - if config.per_column { - println!("-------------------------------"); - println!("Overall Per Column Stats"); - println!("-------------------------------"); - for c in &overall_stats.col_stats { - println!("{}", c); - println!(); - } - } - println!("{}", overall_stats); - - Ok(()) -} - -/// Print statistics about the file name in input_filename to stdout -pub async fn stats_for_file(config: &Config, input_reader: InputReader) -> Result { - let input_name = String::from(input_reader.name()); - let file_stats = match input_reader.file_type() { - FileType::LineProtocol => { - return NotImplemented { - operation_name: "Line protocol storage statistics", - } - .fail() - } - FileType::Tsm => { - return NotImplemented { - operation_name: "TSM storage statistics", - } - .fail() - } - FileType::Parquet => { - parquet_stats::file_stats(input_reader).context(UnableDumpToParquetMetadata)? 
- } - }; - - if config.per_column && config.per_file { - println!("-------------------------------"); - println!("Column Stats for {}", input_name); - println!("-------------------------------"); - for c in &file_stats.col_stats { - println!("{}", c); - println!(); - } - } - - Ok(file_stats) -} diff --git a/src/main.rs b/src/main.rs index 12a7c4546c..9b7c08d026 100644 --- a/src/main.rs +++ b/src/main.rs @@ -7,31 +7,24 @@ clippy::clone_on_ref_ptr )] -use std::str::FromStr; - use dotenv::dotenv; use structopt::StructOpt; use tokio::runtime::Runtime; use commands::tracing::{init_logs_and_tracing, init_simple_logs}; -use ingest::parquet::writer::CompressionLevel; -use observability_deps::tracing::{debug, warn}; +use observability_deps::tracing::warn; use crate::commands::tracing::TracingGuard; use observability_deps::tracing::dispatcher::SetGlobalDefaultError; use tikv_jemallocator::Jemalloc; mod commands { - pub mod convert; pub mod database; - mod input; - pub mod meta; pub mod operations; pub mod run; pub mod server; pub mod server_remote; pub mod sql; - pub mod stats; pub mod tracing; } @@ -125,30 +118,9 @@ struct Config { #[derive(Debug, StructOpt)] enum Command { - /// Convert one storage format to another - Convert { - /// The input files to read from - input: String, - /// The filename or directory to write the output - output: String, - /// How much to compress the output data. 'max' compresses the most; - /// 'compatibility' compresses in a manner more likely to be readable by - /// other tools. - #[structopt( - short, long, default_value = "compatibility", - possible_values = & ["max", "compatibility"])] - compression_level: String, - }, - - /// Print out metadata information about a storage file - Meta { - /// The input filename to read from - input: String, - }, Database(commands::database::Config), // Clippy recommended boxing this variant because it's much larger than the others Run(Box), - Stats(commands::stats::Config), Server(commands::server::Config), Operation(commands::operations::Config), Sql(commands::sql::Config), @@ -176,41 +148,6 @@ fn main() -> Result<(), std::io::Error> { } match config.command { - Command::Convert { - input, - output, - compression_level, - } => { - let _tracing_guard = handle_init_logs(init_simple_logs(log_verbose_count)); - let compression_level = CompressionLevel::from_str(&compression_level).unwrap(); - match commands::convert::convert(&input, &output, compression_level) { - Ok(()) => debug!("Conversion completed successfully"), - Err(e) => { - eprintln!("Conversion failed: {}", e); - std::process::exit(ReturnCode::Failure as _) - } - } - } - Command::Meta { input } => { - let _tracing_guard = handle_init_logs(init_simple_logs(log_verbose_count)); - match commands::meta::dump_meta(&input) { - Ok(()) => debug!("Metadata dump completed successfully"), - Err(e) => { - eprintln!("Metadata dump failed: {}", e); - std::process::exit(ReturnCode::Failure as _) - } - } - } - Command::Stats(config) => { - let _tracing_guard = handle_init_logs(init_simple_logs(log_verbose_count)); - match commands::stats::stats(&config).await { - Ok(()) => debug!("Storage statistics dump completed successfully"), - Err(e) => { - eprintln!("Stats dump failed: {}", e); - std::process::exit(ReturnCode::Failure as _) - } - } - } Command::Database(config) => { let _tracing_guard = handle_init_logs(init_simple_logs(log_verbose_count)); if let Err(e) = commands::database::command(host, config).await { diff --git a/tests/commands.rs b/tests/commands.rs deleted file mode 100644 
index acdab6311e..0000000000 --- a/tests/commands.rs +++ /dev/null @@ -1,396 +0,0 @@ -use std::fs; -use std::fs::File; -use std::io::{BufReader, Read}; -use std::path::Path; - -use assert_cmd::assert::Assert; -use assert_cmd::Command; -use predicates::prelude::*; - -/// Validates that p is a valid parquet file -fn validate_parquet_file(p: &Path) { - // Verify file extension is parquet - let file_extension = p - .extension() - .map_or(String::from(""), |ext| ext.to_string_lossy().to_string()); - assert_eq!(file_extension, "parquet"); - - // TODO: verify we can decode the contents of the parquet file and - // it is as expected. For now, just use a check against the magic - // `PAR1` bytes all parquet files start with. - let mut reader = BufReader::new(File::open(p).expect("Error reading file")); - - let mut first_four = [0; 4]; - reader - .read_exact(&mut first_four) - .expect("Error reading first four bytes"); - assert_eq!(&first_four, b"PAR1"); -} - -#[test] -fn convert_bad_input_filename() { - let mut cmd = Command::cargo_bin("influxdb_iox").unwrap(); - let assert = cmd - .arg("-v") - .arg("convert") - .arg("non_existent_input.lp") - .arg("non_existent_output") - .assert(); - - assert - .failure() - .code(1) - .stderr(predicate::str::contains( - "Error opening non_existent_input.lp", - )) - .stderr(predicate::str::contains("No such file or directory")); -} - -#[test] -fn convert_bad_compression_level() { - let mut cmd = Command::cargo_bin("influxdb_iox").unwrap(); - let assert = cmd - .arg("-v") - .arg("convert") - .arg("--compression-level") - .arg("maxxx") - .arg("tests/fixtures/lineproto/temperature.lp") - .arg("/tmp") - .assert(); - - assert.failure().code(1).stderr(predicate::str::contains( - "error: 'maxxx' isn't a valid value for '--compression-level ", - )); -} - -#[test] -fn convert_line_protocol_good_input_filename() { - let mut cmd = Command::cargo_bin("influxdb_iox").unwrap(); - - let parquet_path = test_helpers::tempfile::Builder::new() - .prefix("convert_e2e") - .suffix(".parquet") - .tempfile() - .expect("error creating temp file") - .into_temp_path(); - let parquet_filename_string = parquet_path.to_string_lossy().to_string(); - - let assert = cmd - .arg("-v") - .arg("convert") - .arg("--compression-level") - .arg("compatibility") - .arg("tests/fixtures/lineproto/temperature.lp") - .arg(&parquet_filename_string) - .assert(); - - let expected_success_string = format!( - "Completing writing to {} successfully", - parquet_filename_string - ); - - assert - .success() - .stdout(predicate::str::contains("convert starting")) - .stdout(predicate::str::contains( - "Writing output for measurement h2o_temperature", - )) - .stdout(predicate::str::contains(expected_success_string)); - - validate_parquet_file(&parquet_path); -} - -#[test] -fn convert_tsm_good_input_filename() { - // - // TODO: this needs to work for a temp directory... 
- // - - // let mut cmd = Command::cargo_bin("influxdb_iox").unwrap(); - - // let tmp_dir = test_helpers::tmp_dir(); - // let parquet_path = tmp_dir.unwrap().into_path().to_str().unwrap(); - - // // ::Builder::new() - // // .prefix("dstool_e2e_tsm") - // // .suffix(".parquet") - // // .tempfile() - // // .expect("error creating temp file") - // // .into_temp_path(); - // // let parquet_filename_string = - // parquet_path.to_string_lossy().to_string(); - - // let assert = cmd - // .arg("convert") - // .arg("tests/fixtures/cpu_usage.tsm") - // .arg(&parquet_path) - // .assert(); - - // // TODO this should succeed when TSM -> parquet conversion is - // implemented. // assert - // // .failure() - // // .code(1) - // // .stderr(predicate::str::contains("Conversion failed")) - // // .stderr(predicate::str::contains( - // // "Not implemented: TSM Conversion not supported yet", - // // )); - - // // TODO add better success expectations - - // // let expected_success_string = format!( - // // "Completing writing to {} successfully", - // // parquet_filename_string - // // ); - - // // assert - // // .success() - // // .stderr(predicate::str::contains("dstool convert starting")) - // // .stderr(predicate::str::contains( - // // "Writing output for measurement h2o_temperature", - // // )) - // // .stderr(predicate::str::contains(expected_success_string)); - - // // validate_parquet_file(&parquet_path); -} - -#[test] -fn convert_multiple_measurements() { - let mut cmd = Command::cargo_bin("influxdb_iox").unwrap(); - - // Create a directory - let parquet_output_path = test_helpers::tempfile::Builder::new() - .prefix("convert_multiple_e2e") - .tempdir() - .expect("error creating temp directory"); - - let parquet_output_dir_path = parquet_output_path.path().to_string_lossy().to_string(); - - let assert = cmd - .arg("-v") - .arg("convert") - .arg("tests/fixtures/lineproto/air_and_water.lp") - .arg(&parquet_output_dir_path) - .assert(); - - let expected_success_string = format!( - "Completing writing to {} successfully", - parquet_output_dir_path - ); - - assert - .success() - .stdout(predicate::str::contains("convert starting")) - .stdout(predicate::str::contains("Writing to output directory")) - .stdout(predicate::str::contains( - "Writing output for measurement h2o_temperature", - )) - .stdout(predicate::str::contains( - "Writing output for measurement air_temperature", - )) - .stdout(predicate::str::contains(expected_success_string)); - - // check that the two files have been written successfully - let mut output_files: Vec<_> = fs::read_dir(parquet_output_path.path()) - .expect("reading directory") - .map(|dir_ent| { - let dir_ent = dir_ent.expect("error reading dir entry"); - validate_parquet_file(&dir_ent.path()); - dir_ent.file_name().to_string_lossy().to_string() - }) - .collect(); - - // Ensure the order is consistent before comparing them - output_files.sort(); - assert_eq!( - output_files, - vec!["air_temperature.parquet", "h2o_temperature.parquet"] - ); -} - -#[test] -fn meta_bad_input_filename() { - let mut cmd = Command::cargo_bin("influxdb_iox").unwrap(); - let assert = cmd.arg("meta").arg("non_existent_input").assert(); - - assert - .failure() - .code(1) - .stderr(predicate::str::contains("Metadata dump failed")) - .stderr(predicate::str::contains( - "Metadata dump failed: Error opening input Unknown input type: non_existent_input has an unknown input extension", - )); -} - -#[test] -fn meta_non_existent_input_filename() { - let mut cmd = Command::cargo_bin("influxdb_iox").unwrap(); - let 
assert = cmd.arg("meta").arg("non_existent_input.tsm").assert(); - - assert - .failure() - .code(1) - .stderr(predicate::str::contains("Metadata dump failed")) - .stderr(predicate::str::contains( - "Metadata dump failed: Error opening input Error opening non_existent_input.tsm", - )); -} - -#[test] -fn meta_bad_input_filename_gz() { - let mut cmd = Command::cargo_bin("influxdb_iox").unwrap(); - let assert = cmd.arg("meta").arg("non_existent_input.gz").assert(); - - assert - .failure() - .code(1) - .stderr(predicate::str::contains("Metadata dump failed")) - .stderr(predicate::str::contains( - "Metadata dump failed: Error opening input Unknown input type: non_existent_input.gz has an unknown input extension before .gz", - )); -} - -// gunzip's the contents of the file at input_path into a temporary path -fn uncompress_gz(input_path: &str, output_extension: &str) -> test_helpers::tempfile::TempPath { - let gz_file = File::open(input_path).expect("Error opening input"); - - let output_path = test_helpers::tempfile::Builder::new() - .prefix("decompressed_e2e") - .suffix(output_extension) - .tempfile() - .expect("error creating temp file") - .into_temp_path(); - - let mut output_file = File::create(&output_path).expect("error opening output"); - let mut decoder = flate2::read::GzDecoder::new(gz_file); - std::io::copy(&mut decoder, &mut output_file).expect("error copying stream"); - output_path -} - -/// Validates some of the metadata output content for this tsm file -fn assert_meta_000000000000005_000000002_tsm(assert: Assert) { - assert - .success() - .stdout(predicate::str::contains("TSM Metadata Report")) - .stdout(predicate::str::contains( - "(05c19117091a1000, 05c19117091a1001) 2159 index entries, 2159 total records", - )) - .stdout(predicate::str::contains( - "task_scheduler_total_schedule_fails", - )); -} - -#[test] -fn meta_000000000000005_000000002_tsm() { - let input_tsm = uncompress_gz("tests/fixtures/000000000000005-000000002.tsm.gz", ".tsm"); - let mut cmd = Command::cargo_bin("influxdb_iox").unwrap(); - let assert = cmd - .arg("meta") - .arg(input_tsm.to_string_lossy().to_string()) - .assert(); - assert_meta_000000000000005_000000002_tsm(assert); -} - -#[test] -fn meta_000000000000005_000000002_tsm_gz() { - let mut cmd = Command::cargo_bin("influxdb_iox").unwrap(); - let assert = cmd - .arg("meta") - .arg("tests/fixtures/000000000000005-000000002.tsm.gz") - .assert(); - - assert_meta_000000000000005_000000002_tsm(assert); -} - -/// Validates some of the metadata output content for this tsm file -fn assert_meta_cpu_usage_tsm(assert: Assert) { - assert - .success() - .stdout(predicate::str::contains("TSM Metadata Report")) - .stdout(predicate::str::contains( - "(05b4927b3fe38000, 05b4927b3fe38001) 2735 index entries, 2735 total records", - )) - .stdout(predicate::str::contains( - "task_scheduler_total_schedule_fails", - )); -} - -#[test] -fn meta_cpu_usage_tsm() { - let input_tsm = uncompress_gz("tests/fixtures/cpu_usage.tsm.gz", ".tsm"); - let mut cmd = Command::cargo_bin("influxdb_iox").unwrap(); - let assert = cmd - .arg("meta") - .arg(input_tsm.to_string_lossy().to_string()) - .assert(); - - assert_meta_cpu_usage_tsm(assert); -} - -#[test] -fn meta_cpu_usage_tsm_gz() { - let mut cmd = Command::cargo_bin("influxdb_iox").unwrap(); - let assert = cmd - .arg("meta") - .arg("tests/fixtures/cpu_usage.tsm.gz") - .assert(); - - assert_meta_cpu_usage_tsm(assert); -} - -/// Validates some of the metadata output content for this tsm file -fn assert_meta_temperature_parquet(assert: Assert) { 
- assert - .success() - .stdout(predicate::str::contains("Parquet file metadata:")) - .stdout(predicate::str::contains(r#"created by: "Delorean""#)) - .stdout(predicate::str::contains( - r#"Column Chunk [3]: - file_offset: 595 - column_type: DOUBLE - column_path: bottom_degrees - num_values: 6 - encodings: [PLAIN, RLE_DICTIONARY, RLE] - compression: GZIP - compressed_size: 125 - uncompressed_size: 90 - data_page_offset: 547 - has_index_page: false - has_dictionary_page: true - dictionary_page_offset: 470 - NO STATISTICS"#, - )); -} - -#[test] -fn meta_temperature_parquet() { - let mut cmd = Command::cargo_bin("influxdb_iox").unwrap(); - let assert = cmd - .arg("meta") - .arg("tests/fixtures/parquet/temperature.parquet") - .assert(); - - assert_meta_temperature_parquet(assert); -} - -#[test] -fn stats_temperature_parquet() { - let mut cmd = Command::cargo_bin("influxdb_iox").unwrap(); - let assert = cmd - .arg("stats") - .arg("--per-column") - .arg("--per-file") - .arg("tests/fixtures/parquet/temperature.parquet") - .assert(); - - assert - .success() - .stdout(predicate::str::contains("Storage statistics:")) - .stdout(predicate::str::contains( - r#"Column Stats 'state' [1] - Total rows: 6.00 (6), DataType: Utf8, Compression: {"Enc: Dictionary, Comp: GZIP"} - Compressed/Uncompressed Bytes : 90.00 / 52.00 ( 90 / 52) 120.0000 bits per row -"#)) - .stdout(predicate::str::contains( - "temperature.parquet: total columns ( 5), rows: 6.00 ( 6), size: 1.13 k ( 1128), bits per row: 1504.0000" - )); -}
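
Note on the removed file-type heuristic: the deleted `convert`, `meta` and `stats` paths all relied on `InputReader::new` classifying inputs purely by filename extension (`.lp`, `.tsm`, `.parquet`), with `.gz` inputs classified by the extension immediately in front of `.gz`, so `cpu_usage.tsm.gz` is treated as TSM, as the gzip test fixtures above exercise. The sketch below is a minimal, standalone restatement of that heuristic using only the Rust standard library; it is not code from the IOx tree, and the names `FileType` (as used here), `file_type_from_extension` and `detect_file_type` are invented for this illustration.

// Standalone sketch (not IOx API) of extension-based input classification:
// the outer extension picks the type, and for `.gz` files the extension of
// the inner file stem (e.g. `foo.tsm.gz` -> `tsm`) is used instead.
use std::path::Path;

#[derive(Debug, PartialEq, Eq)]
enum FileType {
    LineProtocol,
    Tsm,
    Parquet,
}

fn file_type_from_extension(ext: &str) -> Option<FileType> {
    match ext {
        "lp" => Some(FileType::LineProtocol),
        "tsm" => Some(FileType::Tsm),
        "parquet" => Some(FileType::Parquet),
        _ => None,
    }
}

/// Guess the file type of `path` from its filename alone.
fn detect_file_type(path: &Path) -> Option<FileType> {
    let ext = path.extension()?.to_str()?;
    if ext == "gz" {
        // For gzipped input, look at the extension that precedes `.gz`.
        let stem = Path::new(path.file_stem()?);
        file_type_from_extension(stem.extension()?.to_str()?)
    } else {
        file_type_from_extension(ext)
    }
}

fn main() {
    assert_eq!(
        detect_file_type(Path::new("cpu_usage.tsm.gz")),
        Some(FileType::Tsm)
    );
    assert_eq!(
        detect_file_type(Path::new("temperature.lp")),
        Some(FileType::LineProtocol)
    );
    assert_eq!(detect_file_type(Path::new("unknown.bin")), None);
}

Classifying by the stem's inner extension is what allowed the removed tests to pass `tests/fixtures/cpu_usage.tsm.gz` straight to the `meta` command without the caller uncompressing it first; the command decompressed the bytes in memory and then treated them as TSM.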