Merge pull request #168 from influxdata/er/tsm-parquet

feat: Add support for converting TSM files into Parquet
Edd Robinson 2020-06-22 19:10:17 +01:00 committed by GitHub
commit 2768b15bf4
25 changed files with 1612 additions and 389 deletions

Cargo.lock (generated)

@ -634,6 +634,7 @@ dependencies = [
"delorean_table",
"delorean_table_schema",
"delorean_test_helpers",
"delorean_tsm",
"delorean_wal",
"dirs 2.0.2",
"dotenv",
@ -683,6 +684,7 @@ dependencies = [
"delorean_table",
"delorean_table_schema",
"delorean_test_helpers",
"delorean_tsm",
"env_logger",
"log",
"parquet",
@ -743,6 +745,7 @@ dependencies = [
name = "delorean_table_schema"
version = "0.1.0"
dependencies = [
"delorean_tsm",
"log",
]
@ -755,6 +758,17 @@ dependencies = [
"tempfile",
]
[[package]]
name = "delorean_tsm"
version = "0.1.0"
dependencies = [
"delorean_test_helpers",
"hex",
"integer-encoding",
"libflate",
"rand 0.7.3",
]
[[package]]
name = "delorean_utilities"
version = "0.1.0"


@ -7,6 +7,7 @@ default-run = "delorean"
[workspace]
members = [
"delorean_generated_types",
"delorean_ingest",
"delorean_line_parser",
"delorean_object_store",
@ -14,7 +15,7 @@ members = [
"delorean_table",
"delorean_table_schema",
"delorean_test_helpers",
"delorean_generated_types",
"delorean_tsm",
"delorean_utilities",
"delorean_wal",
]
@ -31,6 +32,7 @@ delorean_table = { path = "delorean_table" }
delorean_table_schema = { path = "delorean_table_schema" }
delorean_wal = { path = "delorean_wal" }
delorean_object_store = { path = "delorean_object_store" }
delorean_tsm = { path = "delorean_tsm" }
bytes = "0.5.4"
integer-encoding = "1.0.7"


@ -109,7 +109,7 @@ fn float_encode_sequential(c: &mut Criterion) {
c,
"float_encode_sequential",
&LARGER_BATCH_SIZES,
delorean::encoders::float::encode,
delorean_tsm::encoders::float::encode,
);
}
@ -136,7 +136,7 @@ fn integer_encode_sequential(c: &mut Criterion) {
c,
"integer_encode_sequential",
&LARGER_BATCH_SIZES,
delorean::encoders::integer::encode,
delorean_tsm::encoders::integer::encode,
);
}
@ -145,7 +145,7 @@ fn timestamp_encode_sequential(c: &mut Criterion) {
c,
"timestamp_encode_sequential",
&LARGER_BATCH_SIZES,
delorean::encoders::timestamp::encode,
delorean_tsm::encoders::timestamp::encode,
);
}
@ -177,7 +177,7 @@ fn float_encode_random(c: &mut Criterion) {
.take(batch_size)
.collect()
},
delorean::encoders::float::encode,
delorean_tsm::encoders::float::encode,
)
}
@ -207,7 +207,7 @@ fn integer_encode_random(c: &mut Criterion) {
.map(|_| rand::thread_rng().gen_range(0, 100))
.collect()
},
delorean::encoders::integer::encode,
delorean_tsm::encoders::integer::encode,
)
}
@ -232,7 +232,7 @@ fn float_encode_cpu(c: &mut Criterion) {
"float_encode_cpu",
&SMALLER_BATCH_SIZES,
|batch_size| fixtures::CPU_F64_EXAMPLE_VALUES[..batch_size].to_vec(),
delorean::encoders::float::encode,
delorean_tsm::encoders::float::encode,
)
}
@ -244,10 +244,10 @@ fn float_decode_cpu(c: &mut Criterion) {
|batch_size| {
let decoded: Vec<f64> = fixtures::CPU_F64_EXAMPLE_VALUES[..batch_size].to_vec();
let mut encoded = vec![];
delorean::encoders::float::encode(&decoded, &mut encoded).unwrap();
delorean_tsm::encoders::float::encode(&decoded, &mut encoded).unwrap();
(decoded.len(), encoded)
},
delorean::encoders::float::decode,
delorean_tsm::encoders::float::decode,
)
}
@ -259,10 +259,10 @@ fn float_decode_sequential(c: &mut Criterion) {
|batch_size| {
let decoded: Vec<f64> = (1..batch_size).map(convert_from_usize).collect();
let mut encoded = vec![];
delorean::encoders::float::encode(&decoded, &mut encoded).unwrap();
delorean_tsm::encoders::float::encode(&decoded, &mut encoded).unwrap();
(decoded.len(), encoded)
},
delorean::encoders::float::decode,
delorean_tsm::encoders::float::decode,
)
}
@ -274,10 +274,10 @@ fn integer_decode_sequential(c: &mut Criterion) {
|batch_size| {
let decoded: Vec<i64> = (1..batch_size).map(convert_from_usize).collect();
let mut encoded = vec![];
delorean::encoders::integer::encode(&decoded, &mut encoded).unwrap();
delorean_tsm::encoders::integer::encode(&decoded, &mut encoded).unwrap();
(decoded.len(), encoded)
},
delorean::encoders::integer::decode,
delorean_tsm::encoders::integer::decode,
)
}
@ -289,10 +289,10 @@ fn timestamp_decode_sequential(c: &mut Criterion) {
|batch_size| {
let decoded: Vec<i64> = (1..batch_size).map(convert_from_usize).collect();
let mut encoded = vec![];
delorean::encoders::timestamp::encode(&decoded, &mut encoded).unwrap();
delorean_tsm::encoders::timestamp::encode(&decoded, &mut encoded).unwrap();
(decoded.len(), encoded)
},
delorean::encoders::timestamp::decode,
delorean_tsm::encoders::timestamp::decode,
)
}
@ -309,10 +309,10 @@ fn float_decode_random(c: &mut Criterion) {
.collect();
let mut encoded = vec![];
delorean::encoders::float::encode(&decoded, &mut encoded).unwrap();
delorean_tsm::encoders::float::encode(&decoded, &mut encoded).unwrap();
(decoded.len(), encoded)
},
delorean::encoders::float::decode,
delorean_tsm::encoders::float::decode,
)
}
@ -326,10 +326,10 @@ fn integer_decode_random(c: &mut Criterion) {
.map(|_| rand::thread_rng().gen_range(0, 100))
.collect();
let mut encoded = vec![];
delorean::encoders::integer::encode(&decoded, &mut encoded).unwrap();
delorean_tsm::encoders::integer::encode(&decoded, &mut encoded).unwrap();
(decoded.len(), encoded)
},
delorean::encoders::integer::decode,
delorean_tsm::encoders::integer::decode,
)
}


@ -21,6 +21,7 @@ log = "0.4.8"
delorean_line_parser = { path = "../delorean_line_parser" }
delorean_table = { path = "../delorean_table" }
delorean_table_schema = { path = "../delorean_table_schema" }
delorean_tsm = { path = "../delorean_tsm" }
[dev-dependencies]
delorean_test_helpers ={ path = "../delorean_test_helpers" }


@ -1,19 +1,23 @@
#![deny(rust_2018_idioms)]
#![warn(missing_debug_implementations, clippy::explicit_iter_loop)]
//! Library with code for (aspirationally) ingesting various data formats into Delorean
//! Currently supports converting LineProtocol
//! TODO move this to delorean/src/ingest/line_protocol.rs?
#![deny(rust_2018_idioms)]
#![warn(missing_debug_implementations, clippy::explicit_iter_loop)]
use std::collections::{BTreeMap, BTreeSet};
use std::io::{BufRead, Seek};
use log::debug;
use parquet::data_type::ByteArray;
use snafu::{ResultExt, Snafu};
use std::collections::BTreeMap;
use delorean_line_parser::{FieldValue, ParsedLine};
use delorean_table::packers::Packers;
use delorean_table::packers::{Packer, Packers};
use delorean_table::{DeloreanTableWriter, DeloreanTableWriterSource, Error as TableError};
use delorean_table_schema::{DataType, Schema, SchemaBuilder};
use delorean_tsm::mapper::{map_field_columns, ColumnData, TSMMeasurementMapper};
use delorean_tsm::reader::{TSMBlockReader, TSMIndexReader};
use delorean_tsm::{BlockType, TSMError};
#[derive(Debug, Clone)]
pub struct ConversionSettings {
@ -26,7 +30,7 @@ pub struct ConversionSettings {
impl ConversionSettings {}
impl Default for ConversionSettings {
/// Reasonable defult settings
/// Reasonable default settings
fn default() -> Self {
ConversionSettings {
sample_size: 5,
@ -62,6 +66,9 @@ pub enum Error {
#[snafu(display(r#"Error creating TableWriter: {}"#, source))]
WriterCreation { source: TableError },
#[snafu(display(r#"Error processing TSM File: {}"#, source))]
TSMProcessing { source: TSMError },
}
/// Handles buffering `ParsedLine` objects and deducing a schema from that sample
@ -473,6 +480,216 @@ fn pack_lines<'a>(schema: &Schema, lines: &[ParsedLine<'a>]) -> Vec<Packers> {
packers
}
/// Converts a TSM file into the delorean_table internal columnar
/// data format and then passes that converted data to a
/// `DeloreanTableWriter`
pub struct TSMFileConverter {
table_writer_source: Box<dyn DeloreanTableWriterSource>,
}
impl TSMFileConverter {
pub fn new(table_writer_source: Box<dyn DeloreanTableWriterSource>) -> Self {
Self {
table_writer_source,
}
}
/// Processes a TSM file's index and writes each measurement to a Parquet
/// writer.
pub fn convert(
&mut self,
index_stream: impl BufRead + Seek,
index_stream_size: usize,
block_stream: impl BufRead + Seek,
) -> Result<(), Error> {
let index_reader = TSMIndexReader::try_new(index_stream, index_stream_size)
.map_err(|e| Error::TSMProcessing { source: e })?;
let mut block_reader = TSMBlockReader::new(block_stream);
let mapper = TSMMeasurementMapper::new(index_reader.peekable());
for measurement in mapper {
match measurement {
Ok(mut m) => {
let mut builder = SchemaBuilder::new(&m.name);
let mut packed_columns: Vec<Packers> = Vec::new();
let mut tks = Vec::new();
for tag in m.tag_columns() {
builder = builder.tag(tag);
tks.push(tag.clone());
packed_columns.push(Packers::String(Packer::new()));
}
let mut fks = Vec::new();
for (field_key, block_type) in m.field_columns().to_owned() {
builder = builder.field(&field_key, DataType::from(&block_type));
fks.push((field_key.clone(), block_type));
packed_columns.push(Packers::String(Packer::new())); // FIXME - will change
}
// Account for timestamp
packed_columns.push(Packers::Integer(Packer::new()));
let schema = builder.build();
// get mapping between named columns and packer indexes.
let name_packer = schema
.get_col_defs()
.iter()
.map(|c| (c.name.clone(), c.index as usize))
.collect::<BTreeMap<String, usize>>();
// For each tagset combination in the measurement I need
// to build out the table. Then for each column in the
// table I need to convert to a Packer<T> and append it
// to `packed_columns`.
for (tag_set_pair, blocks) in m.tag_set_fields_blocks() {
let (ts, field_cols) = map_field_columns(&mut block_reader, blocks)
.map_err(|e| Error::TSMProcessing { source: e })?;
// Start with the timestamp column.
let col_len = ts.len();
let ts_idx =
name_packer
.get(schema.timestamp())
.ok_or(Error::TSMProcessing {
// TODO clean this error up
source: TSMError {
description: "could not find ts column".to_string(),
},
})?;
packed_columns[*ts_idx] = Packers::from(ts);
// Next let's pad out all of the tag columns we know have
// repeated values.
for (tag_key, tag_value) in tag_set_pair {
let idx = name_packer.get(tag_key).ok_or(Error::TSMProcessing {
// TODO clean this error up
source: TSMError {
description: "could not find column".to_string(),
},
})?;
// this will create a column of repeated values.
packed_columns[*idx] = Packers::from_elem_str(tag_value, col_len);
}
// Next let's write out NULL values for any tag columns
// on the measurement that we don't have values for
// because they're not part of this tagset.
let tag_keys = tag_set_pair
.iter()
.map(|pair| pair.0.clone())
.collect::<BTreeSet<String>>();
for key in &tks {
if tag_keys.contains(key) {
continue;
}
let idx = name_packer.get(key).ok_or(Error::TSMProcessing {
// TODO clean this error up
source: TSMError {
description: "could not find column".to_string(),
},
})?;
// this will create a column of repeated None values.
let col: Vec<Option<String>> = vec![None; col_len];
packed_columns[*idx] = Packers::from(col);
}
// Next let's write out all of the field column data.
let mut got_field_cols = Vec::new();
for (field_key, field_values) in field_cols {
let idx = name_packer.get(&field_key).ok_or(Error::TSMProcessing {
// TODO clean this error up
source: TSMError {
description: "could not find column".to_string(),
},
})?;
match field_values {
ColumnData::Float(v) => packed_columns[*idx] = Packers::from(v),
ColumnData::Integer(v) => packed_columns[*idx] = Packers::from(v),
ColumnData::Str(v) => packed_columns[*idx] = Packers::from(v),
ColumnData::Bool(v) => packed_columns[*idx] = Packers::from(v),
ColumnData::Unsigned(v) => packed_columns[*idx] = Packers::from(v),
}
got_field_cols.push(field_key);
}
// Finally let's write out all of the field columns that
// we don't have values for here.
for (key, field_type) in &fks {
if got_field_cols.contains(key) {
continue;
}
let idx = name_packer.get(key).ok_or(Error::TSMProcessing {
// TODO clean this error up
source: TSMError {
description: "could not find column".to_string(),
},
})?;
// this will create a column of repeated None values.
match field_type {
BlockType::Float => {
let col: Vec<Option<f64>> = vec![None; col_len];
packed_columns[*idx] = Packers::from(col);
}
BlockType::Integer => {
let col: Vec<Option<i64>> = vec![None; col_len];
packed_columns[*idx] = Packers::from(col);
}
BlockType::Bool => {
let col: Vec<Option<bool>> = vec![None; col_len];
packed_columns[*idx] = Packers::from(col);
}
BlockType::Str => {
let col: Vec<Option<String>> = vec![None; col_len];
packed_columns[*idx] = Packers::from(col);
}
BlockType::Unsigned => {
let col: Vec<Option<u64>> = vec![None; col_len];
packed_columns[*idx] = Packers::from(col);
}
}
}
}
let mut table_writer = self
.table_writer_source
.next_writer(&schema)
.context(WriterCreation)?;
table_writer
.write_batch(&packed_columns)
.map_err(|e| Error::WriterCreation { source: e })?;
table_writer
.close()
.map_err(|e| Error::WriterCreation { source: e })?;
}
Err(e) => return Err(Error::TSMProcessing { source: e }),
}
}
Ok(())
}
}
impl std::fmt::Debug for TSMFileConverter {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("TSMFileConverter")
// .field("settings", &self.settings)
// .field("converters", &self.converters)
.field("table_writer_source", &"DYNAMIC")
.finish()
}
}
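As a usage sketch (not part of this diff): the converter needs a `DeloreanTableWriterSource` plus two buffered, seekable streams over the TSM file, together with the file length so the index footer can be located. The path and the boxed writer source below are assumptions for illustration only.

use std::fs::File;
use std::io::BufReader;

fn convert_tsm(source: Box<dyn DeloreanTableWriterSource>) -> Result<(), Error> {
    // Hypothetical input path; `convert` needs the total length to seek to the index.
    let len = std::fs::metadata("data.tsm").expect("stat failed").len() as usize;
    let index = BufReader::new(File::open("data.tsm").expect("open failed"));
    let blocks = BufReader::new(File::open("data.tsm").expect("open failed"));
    TSMFileConverter::new(source).convert(index, len, blocks)
}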
#[cfg(test)]
mod delorean_ingest_tests {
use super::*;
@ -1104,4 +1321,36 @@ mod delorean_ingest_tests {
Ok(())
}
// ----- Tests for TSM Data -----
// #[test]
// fn conversion_tsm_files() -> Result<(), Error> {
// let log = Arc::new(Mutex::new(WriterLog::new()));
// // let mut converter =
// // LineProtocolConverter::new(settings, NoOpWriterSource::new(log.clone()));
// // converter
// // .convert(parsed_lines)
// // .expect("conversion ok")
// // .finalize()
// // .expect("finalize");
// assert_eq!(
// get_events(&log),
// vec![
// "Created writer for measurement h2o_temperature",
// "Created writer for measurement air_temperature",
// "[air_temperature] Wrote batch of 4 cols, 3 rows",
// "[h2o_temperature] Wrote batch of 4 cols, 3 rows",
// "[air_temperature] Wrote batch of 4 cols, 1 rows",
// "[air_temperature] Closed",
// "[h2o_temperature] Wrote batch of 4 cols, 2 rows",
// "[h2o_temperature] Closed",
// ]
// );
// Ok(())
// }
}


@ -40,6 +40,11 @@ macro_rules! typed_packer_accessors {
}
impl Packers {
/// Create a String Packers with repeated values.
pub fn from_elem_str(v: &str, n: usize) -> Self {
Packers::String(Packer::from(vec![ByteArray::from(v); n]))
}
/// Reserves the minimum capacity for exactly `additional` more elements to
/// be inserted into the `Packer<T>` without reallocation.
pub fn reserve_exact(&mut self, additional: usize) {
@ -99,6 +104,42 @@ impl Packers {
}
}
impl std::convert::From<Vec<i64>> for Packers {
fn from(v: Vec<i64>) -> Self {
Packers::Integer(Packer::from(v))
}
}
impl std::convert::From<Vec<f64>> for Packers {
fn from(v: Vec<f64>) -> Self {
Packers::Float(Packer::from(v))
}
}
impl std::convert::From<Vec<ByteArray>> for Packers {
fn from(v: Vec<ByteArray>) -> Self {
Packers::String(Packer::from(v))
}
}
impl std::convert::From<Vec<bool>> for Packers {
fn from(v: Vec<bool>) -> Self {
Packers::Boolean(Packer::from(v))
}
}
impl std::convert::From<Vec<Option<i64>>> for Packers {
fn from(v: Vec<Option<i64>>) -> Self {
Packers::Integer(Packer::from(v))
}
}
impl std::convert::From<Vec<Option<f64>>> for Packers {
fn from(v: Vec<Option<f64>>) -> Self {
Packers::Float(Packer::from(v))
}
}
impl std::convert::From<delorean_table_schema::DataType> for Packers {
fn from(t: delorean_table_schema::DataType) -> Self {
match t {
@ -111,6 +152,40 @@ impl std::convert::From<delorean_table_schema::DataType> for Packers {
}
}
impl std::convert::From<Vec<Option<String>>> for Packers {
fn from(values: Vec<Option<String>>) -> Self {
// TODO(edd): convert this with an iterator?
let mut as_byte_array: Vec<Option<ByteArray>> = Vec::with_capacity(values.len());
for v in values {
match v {
Some(v) => as_byte_array.push(Some(ByteArray::from(v.as_str()))),
None => as_byte_array.push(None),
}
}
Packers::String(Packer::from(as_byte_array))
}
}
impl std::convert::From<Vec<Option<bool>>> for Packers {
fn from(v: Vec<Option<bool>>) -> Self {
Packers::Boolean(Packer::from(v))
}
}
impl std::convert::From<Vec<Option<u64>>> for Packers {
fn from(values: Vec<Option<u64>>) -> Self {
// TODO(edd): convert this with an iterator?
let mut as_i64: Vec<Option<i64>> = Vec::with_capacity(values.len());
for v in values {
match v {
Some(v) => as_i64.push(Some(v as i64)),
None => as_i64.push(None),
}
}
Packers::Integer(Packer::from(as_i64))
}
}
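As an illustrative sketch of the conversions above (variable names are mine, not from this diff):

// A dense i64 column becomes an Integer packer:
let _ints = Packers::from(vec![100_i64, 200, 300]);
// Nullable data keeps its None entries, which end up as Parquet NULLs:
let _floats = Packers::from(vec![Some(1.5_f64), None, Some(2.5)]);
// Unsigned values are cast to i64 on the way in:
let _unsigned = Packers::from(vec![Some(42_u64), None]);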
#[derive(Debug, Default)]
pub struct Packer<T: PackerDefault> {
values: Vec<T>,
@ -196,6 +271,30 @@ impl<T: PackerDefault> Packer<T> {
}
}
// Convert `Vec<T>`, e.g., `Vec<f64>` into the appropriate `Packer<T>` value,
// e.g., `Packer<f64>`.
impl<T: PackerDefault> std::convert::From<Vec<T>> for Packer<T> {
fn from(v: Vec<T>) -> Self {
Self {
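// definition level 1 marks every value as present (non-NULL) in a flat schema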
def_levels: vec![1; v.len()],
rep_levels: vec![1; v.len()],
values: v,
}
}
}
// Convert `Vec<Option<T>>`, e.g., `Vec<Option<f64>>` into the appropriate
// `Packer<T>` value, e.g., `Packer<f64>`.
impl<T: PackerDefault> std::convert::From<Vec<Option<T>>> for Packer<T> {
fn from(values: Vec<Option<T>>) -> Self {
let mut packer = Packer::new();
for v in values {
packer.push_option(v);
}
packer
}
}
/// Provides a `Default` implementation for compatible `Packer` types where the
/// default values are compatible with the Parquet storage format.
///


@ -8,3 +8,4 @@ edition = "2018"
[dependencies]
log = "0.4.8"
delorean_tsm = { path = "../delorean_tsm" }


@ -30,9 +30,11 @@
//! assert_eq!(cols[3], ColumnDefinition::new("field2", 3, DataType::Boolean));
//! assert_eq!(cols[4], ColumnDefinition::new("timestamp", 4, DataType::Timestamp));
//! ```
use std::collections::BTreeMap;
use delorean_tsm::BlockType;
use log::warn;
use std::collections::BTreeMap;
use std::convert::From;
/// Represents a specific Line Protocol Tag name
#[derive(Debug, PartialEq)]
@ -68,6 +70,18 @@ pub enum DataType {
Timestamp,
}
impl From<&BlockType> for DataType {
fn from(value: &BlockType) -> Self {
match value {
BlockType::Float => Self::Float,
BlockType::Integer => Self::Integer,
BlockType::Bool => Self::Boolean,
BlockType::Str => Self::String,
BlockType::Unsigned => Self::Integer,
}
}
}
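Note that TSM's unsigned blocks map onto the signed `Integer` schema type, since `DataType` has no unsigned variant. A small sketch (assuming `DataType` derives `PartialEq`, as neighbouring types in this module do):

use delorean_tsm::BlockType;

assert_eq!(DataType::from(&BlockType::Float), DataType::Float);
// Lossy in type only: u64 values above i64::MAX wrap when later cast with `as`.
assert_eq!(DataType::from(&BlockType::Unsigned), DataType::Integer);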
/// Represents a specific Line Protocol Field name
#[derive(Debug, PartialEq)]
pub struct Field {

delorean_tsm/Cargo.toml (new file)

@ -0,0 +1,16 @@
[package]
name = "delorean_tsm"
version = "0.1.0"
authors = ["Edd Robinson <me@edd.io>"]
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
integer-encoding = "1.0.7"
[dev-dependencies]
hex = "0.4.2"
libflate = "1.0.0"
rand = "0.7.2"
delorean_test_helpers = { path = "../delorean_test_helpers" }


@ -195,8 +195,8 @@ fn decode_rle(src: &[u8], dst: &mut Vec<i64>) -> Result<(), Box<dyn Error>> {
}
fn decode_simple8b(src: &[u8], dst: &mut Vec<i64>) -> Result<(), Box<dyn Error>> {
if src.len() < 9 {
return Err(From::from("not enough data to decode packed timestamp"));
if src.len() < 8 {
return Err(From::from("not enough data to decode packed integer."));
}
// TODO(edd): pre-allocate res by counting bytes in encoded slice?
@ -368,4 +368,24 @@ mod tests {
assert_eq!(dec.len(), values.len());
assert_eq!(dec, values);
}
#[test]
// This tests against a defect found when decoding a TSM block from InfluxDB.
fn simple8b_short_regression() {
let values = vec![346];
let mut enc = vec![];
encode(&values, &mut enc).expect("encoding failed");
// this is a compressed simple8b integer block representing the value 346.
let enc_influx = [16, 0, 0, 0, 0, 0, 0, 2, 180];
// ensure that encoder produces same bytes as InfluxDB encoder.
assert_eq!(enc, enc_influx);
let mut dec = vec![];
decode(&enc, &mut dec).expect("failed to decode");
assert_eq!(dec.len(), values.len());
assert_eq!(dec, values);
}
}
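With the encoders moved into their own crate, a round trip from outside now goes through the `delorean_tsm` path; a sketch mirroring the tests above:

use delorean_tsm::encoders::integer::{encode, decode};

let values: Vec<i64> = vec![1, 2, 3, 346];
let mut enc = vec![];
encode(&values, &mut enc).expect("encoding failed");
let mut dec = vec![];
decode(&enc, &mut dec).expect("decoding failed");
assert_eq!(dec, values);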

delorean_tsm/src/lib.rs (new file)

@ -0,0 +1,249 @@
pub mod encoders;
pub mod mapper;
pub mod reader;
use std::convert::TryFrom;
use std::error;
use std::fmt;
use std::io;
#[derive(Clone, Debug)]
pub struct ParsedTSMKey {
pub measurement: String,
pub tagset: Vec<(String, String)>,
pub field_key: String,
}
/// parse_tsm_key parses the measurement, field key and tag set from the
/// series key.
///
/// It does not provide access to the org and bucket ids on the key; these
/// can be accessed via org_id() and bucket_id() respectively.
///
/// TODO: handle escapes in the series key for , = and \t
///
fn parse_tsm_key(mut key: Vec<u8>) -> Result<ParsedTSMKey, TSMError> {
// skip over org id, bucket id, comma, null byte (measurement) and =
// The next n-1 bytes are the measurement name, where the nᵗʰ byte is a `,`.
key = key.drain(8 + 8 + 1 + 1 + 1..).collect::<Vec<u8>>();
let mut i = 0;
// TODO(edd): can we make this work with take_while?
while i != key.len() {
if key[i] == b',' {
break;
}
i += 1;
}
let mut rem_key = key.drain(i..).collect::<Vec<u8>>();
let measurement = String::from_utf8(key).map_err(|e| TSMError {
description: e.to_string(),
})?;
let mut tagset = Vec::<(String, String)>::with_capacity(10);
let mut reading_key = true;
let mut key = String::with_capacity(100);
let mut value = String::with_capacity(100);
// skip the comma separating measurement tag
for byte in rem_key.drain(1..) {
match byte {
44 => {
// ,
reading_key = true;
tagset.push((key, value));
key = String::with_capacity(250);
value = String::with_capacity(250);
}
61 => {
// =
reading_key = false;
}
_ => {
if reading_key {
key.push(byte as char);
} else {
value.push(byte as char);
}
}
}
}
// fields are stored on the series keys in TSM indexes as follows:
//
// <field_key><4-byte delimiter><field_key>
//
// so we can trim the parsed value.
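// e.g. the value "sum#!~#sum" (4-byte delimiter "#!~#") has length 10, so
// field_trim_length = (10 - 4) / 2 = 3 and the parsed field key is "sum".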
let field_trim_length = (value.len() - 4) / 2;
let (field, _) = value.split_at(field_trim_length);
Ok(ParsedTSMKey {
measurement,
tagset,
field_key: field.to_string(),
})
}
#[derive(Clone, Debug, Copy)]
pub enum BlockType {
Float,
Integer,
Bool,
Str,
Unsigned,
}
impl TryFrom<u8> for BlockType {
type Error = TSMError;
fn try_from(value: u8) -> Result<Self, Self::Error> {
match value {
0 => Ok(Self::Float),
1 => Ok(Self::Integer),
2 => Ok(Self::Bool),
3 => Ok(Self::Str),
4 => Ok(Self::Unsigned),
_ => Err(TSMError {
description: format!("{:?} is invalid block type", value),
}),
}
}
}
/// `Block` holds information about location and time range of a block of data.
#[derive(Debug, Copy, Clone)]
#[allow(dead_code)]
pub struct Block {
pub min_time: i64,
pub max_time: i64,
pub offset: u64,
pub size: u32,
}
// MAX_BLOCK_VALUES is the maximum number of values a TSM block can store.
const MAX_BLOCK_VALUES: usize = 1000;
/// `BlockData` describes the various types of block data that can be held within
/// a TSM file.
#[derive(Debug)]
pub enum BlockData {
Float { ts: Vec<i64>, values: Vec<f64> },
Integer { ts: Vec<i64>, values: Vec<i64> },
Bool { ts: Vec<i64>, values: Vec<bool> },
Str { ts: Vec<i64>, values: Vec<String> },
Unsigned { ts: Vec<i64>, values: Vec<u64> },
}
impl BlockData {
pub fn is_empty(&self) -> bool {
match &self {
BlockData::Float { ts, values: _ } => ts.is_empty(),
BlockData::Integer { ts, values: _ } => ts.is_empty(),
BlockData::Bool { ts, values: _ } => ts.is_empty(),
BlockData::Str { ts, values: _ } => ts.is_empty(),
BlockData::Unsigned { ts, values: _ } => ts.is_empty(),
}
}
}
#[derive(Copy, Clone, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)]
/// `InfluxID` represents an InfluxDB ID used in InfluxDB 2.x to represent
/// organization and bucket identifiers.
pub struct InfluxID(u64);
#[allow(dead_code)]
impl InfluxID {
fn new_str(s: &str) -> Result<InfluxID, TSMError> {
let v = u64::from_str_radix(s, 16).map_err(|e| TSMError {
description: e.to_string(),
})?;
Ok(InfluxID(v))
}
fn from_be_bytes(bytes: [u8; 8]) -> InfluxID {
InfluxID(u64::from_be_bytes(bytes))
}
}
impl std::fmt::Display for InfluxID {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::result::Result<(), std::fmt::Error> {
write!(f, "{:016x}", self.0)
}
}
#[derive(Debug, Clone)]
pub struct TSMError {
pub description: String,
}
impl fmt::Display for TSMError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.description)
}
}
impl error::Error for TSMError {
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
// Generic error, underlying cause isn't tracked.
None
}
}
impl From<io::Error> for TSMError {
fn from(e: io::Error) -> Self {
Self {
description: format!("TODO - io error: {} ({:?})", e, e),
}
}
}
impl From<std::str::Utf8Error> for TSMError {
fn from(e: std::str::Utf8Error) -> Self {
Self {
description: format!("TODO - utf8 error: {} ({:?})", e, e),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn parse_tsm_key() {
//<org_id bucket_id>,\x00=http_api_request_duration_seconds,handler=platform,method=POST,path=/api/v2/setup,status=2XX,user_agent=Firefox,\xff=sum#!~#sum
let buf = vec![
"05C19117091A100005C19117091A10012C003D68747470",
"5F6170695F726571756573745F6475726174696F6E5F73",
"65636F6E64732C68616E646C65723D706C6174666F726D",
"2C6D6574686F643D504F53542C706174683D2F6170692F",
"76322F73657475702C7374617475733D3258582C757365",
"725F6167656E743D46697265666F782CFF3D73756D2321",
"7E2373756D",
]
.join("");
let tsm_key = hex::decode(buf).unwrap();
let parsed_key = super::parse_tsm_key(tsm_key).unwrap();
assert_eq!(
parsed_key.measurement,
String::from("http_api_request_duration_seconds")
);
let exp_tagset = vec![
(String::from("handler"), String::from("platform")),
(String::from("method"), String::from("POST")),
(String::from("path"), String::from("/api/v2/setup")),
(String::from("status"), String::from("2XX")),
(String::from("user_agent"), String::from("Firefox")),
];
assert_eq!(parsed_key.tagset, exp_tagset);
assert_eq!(parsed_key.field_key, String::from("sum"));
}
#[test]
fn influx_id() {
let id = InfluxID::new_str("20aa9b0").unwrap();
assert_eq!(id, InfluxID(34_253_232));
assert_eq!(format!("{}", id), "00000000020aa9b0");
}
}

delorean_tsm/src/mapper.rs (new file)

@ -0,0 +1,705 @@
//! Types for mapping and converting series data from TSM indexes produced by
//! InfluxDB >= 2.x
use super::reader::{TSMBlockReader, TSMIndexReader};
use super::{Block, BlockData, BlockType, TSMError};
use std::collections::{BTreeMap, BTreeSet};
use std::fmt::{Display, Formatter};
use std::i64;
use std::io::{BufRead, Seek};
use std::iter::Peekable;
/// `TSMMeasurementMapper` takes a TSM reader and produces an iterator that
/// collects all series data for a given measurement.
///
/// The main purpose of the `TSMMeasurementMapper` is to provide a
/// transformation step that allows one to convert per-series/per-field data
/// into measurement-oriented table data.
///
#[derive(Debug)]
pub struct TSMMeasurementMapper<R>
where
R: BufRead + Seek,
{
iter: Peekable<TSMIndexReader<R>>,
}
impl<R> TSMMeasurementMapper<R>
where
R: BufRead + Seek,
{
pub fn new(iter: Peekable<TSMIndexReader<R>>) -> TSMMeasurementMapper<R> {
TSMMeasurementMapper { iter }
}
}
/// Either assign a value from a `Result` or return the error wrapped in
/// `Some`; `?` can't be used here because `next` returns an `Option<Result<..>>`.
macro_rules! try_or_some {
($e:expr) => {
match $e {
Ok(val) => val,
Err(err) => return Some(Err(err)),
}
};
}
impl<R: BufRead + Seek> Iterator for TSMMeasurementMapper<R> {
type Item = Result<MeasurementTable, TSMError>;
fn next(&mut self) -> Option<Self::Item> {
// `None` indicates the end of index iteration.
let entry = try_or_some!(self.iter.next()?);
let parsed_key = try_or_some!(entry.parse_key());
let mut measurement: MeasurementTable = MeasurementTable::new(parsed_key.measurement);
try_or_some!(measurement.add_series_data(
parsed_key.tagset,
parsed_key.field_key,
entry.block_type,
entry.block
));
// The first index entry for the item has been processed, next keep
// peeking at subsequent entries in the index until a yielded value is
// for a different measurement. At that point we will return the
// measurement.
while let Some(res) = self.iter.peek() {
match res {
Ok(entry) => {
let parsed_key = try_or_some!(entry.parse_key());
if measurement.name != parsed_key.measurement {
// Next entry is for a different measurement.
return Some(Ok(measurement));
}
try_or_some!(measurement.add_series_data(
parsed_key.tagset,
parsed_key.field_key,
entry.block_type,
entry.block
));
}
Err(e) => return Some(Err(e.clone())),
}
self.iter.next(); // advance iterator - we got what we needed from the peek
}
Some(Ok(measurement)) // final measurement in index.
}
}
// FieldKeyBlocks is a mapping between a set of field keys and all of the blocks
// for those keys.
pub type FieldKeyBlocks = BTreeMap<String, Vec<Block>>;
#[derive(Clone, Debug)]
pub struct MeasurementTable {
pub name: String,
// Tagset for key --> map of fields with that tagset to their blocks.
//
// Here we are mapping each set of field keys (and their blocks) to a unique
// tag set.
//
// One entry in `tag_set_fields_blocks` might be:
//
// key: vec![("region", "west"), ("server", "a")]
// value: {
// {key: "temp": vec![*block1*, *block2*},
// {key: "current": value: vec![*block1*, *block1*, *block3*]}
// {key: "voltage": value: vec![*block1*]}
// }
//
// All of the blocks and fields for `"server"="b"` would be kept under a
// separate key on `tag_set_fields_blocks`.
tag_set_fields_blocks: BTreeMap<Vec<(String, String)>, FieldKeyBlocks>,
tag_columns: BTreeSet<String>,
field_columns: BTreeMap<String, BlockType>,
}
impl MeasurementTable {
pub fn new(name: String) -> Self {
Self {
name,
tag_set_fields_blocks: BTreeMap::new(),
tag_columns: BTreeSet::new(),
field_columns: BTreeMap::new(),
}
}
pub fn tag_set_fields_blocks(
&mut self,
) -> &mut BTreeMap<Vec<(String, String)>, FieldKeyBlocks> {
&mut self.tag_set_fields_blocks
}
pub fn tag_columns(&self) -> Vec<&String> {
self.tag_columns.iter().collect()
}
pub fn field_columns(&self) -> &BTreeMap<String, BlockType> {
&self.field_columns
}
// updates the table with data from a single TSM index entry's block.
fn add_series_data(
&mut self,
tagset: Vec<(String, String)>,
field_key: String,
block_type: BlockType,
block: Block,
) -> Result<(), TSMError> {
// tags will be used as the key to a map, where the value will be a
// collection of all the field keys for that tagset and the associated
// blocks.
self.field_columns.insert(field_key.clone(), block_type);
for (k, _) in &tagset {
self.tag_columns.insert(k.clone());
}
let field_key_blocks = self.tag_set_fields_blocks.entry(tagset).or_default();
let blocks = field_key_blocks.entry(field_key).or_default();
blocks.push(block);
Ok(())
}
}
impl Display for MeasurementTable {
// This trait requires `fmt` with this exact signature.
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
writeln!(f, "Measurement: {}", self.name)?;
writeln!(f, "\nTag Sets:")?;
for (tagset, field_key_blocks) in &self.tag_set_fields_blocks {
write!(f, "\t")?;
for (key, value) in tagset {
write!(f, "{}={} ", key, value)?;
}
writeln!(f, "\n\tField Keys:")?;
for (field_key, blocks) in field_key_blocks {
writeln!(f, "\t{}", field_key)?;
for block in blocks {
writeln!(
f,
"\t\tBlock time-range: ({}, {}), Offset: {}, Size: {}",
block.min_time, block.max_time, block.offset, block.size
)?;
}
}
writeln!(f)?;
}
Ok(())
}
}
/// `ColumnData` describes various types of nullable block data.
#[derive(Debug, PartialEq, Clone)]
pub enum ColumnData {
// TODO(edd): perf - I expect it to be much better to track nulls in a
// separate bitmap.
Float(Vec<Option<f64>>),
Integer(Vec<Option<i64>>),
Bool(Vec<Option<bool>>),
Str(Vec<Option<String>>),
Unsigned(Vec<Option<u64>>),
}
// ValuePair represents a single timestamp-value pair from a TSM block.
#[derive(Debug, PartialEq)]
enum ValuePair {
F64((i64, f64)),
I64((i64, i64)),
Bool((i64, bool)),
Str((i64, String)),
U64((i64, u64)),
}
impl ValuePair {
// returns the timestamp associated with the value pair.
fn timestamp(&self) -> i64 {
match *self {
ValuePair::F64((ts, _)) => ts,
ValuePair::I64((ts, _)) => ts,
ValuePair::Bool((ts, _)) => ts,
ValuePair::Str((ts, _)) => ts,
ValuePair::U64((ts, _)) => ts,
}
}
}
// A BlockDecoder is capable of decoding a block definition into block data
// (timestamps and value vectors).
pub trait BlockDecoder {
fn block_data(&mut self, block: &Block) -> Result<BlockData, TSMError>;
}
impl<R> BlockDecoder for &mut TSMBlockReader<R>
where
R: BufRead + Seek,
{
fn block_data(&mut self, block: &Block) -> Result<BlockData, TSMError> {
self.decode_block(block)
}
}
// Maps multiple columnar field blocks to a single tabular representation.
//
// Given a set of field keys and a set of blocks for each key,
// `map_field_columns` aligns each columnar block by the timestamp component to
// produce a single tabular output with one timestamp column, and each value
// column joined by the timestamp values.
//
// For example, here we have three blocks (one block per field):
//
// ┌───────────┬───────────┐ ┌───────────┬───────────┐ ┌───────────┬───────────┐
// │ TS │ Temp │ │ TS │ Voltage │ │ TS │ Current │
// ├───────────┼───────────┤ ├───────────┼───────────┤ ├───────────┼───────────┤
// │ 1 │ 10.2 │ │ 1 │ 1.23 │ │ 2 │ 0.332 │
// ├───────────┼───────────┤ ├───────────┼───────────┤ ├───────────┼───────────┤
// │ 2 │ 11.4 │ │ 2 │ 1.24 │ │ 3 │ 0.5 │
// ├───────────┼───────────┤ ├───────────┼───────────┤ ├───────────┼───────────┤
// │ 3 │ 10.2 │ │ 3 │ 1.26 │ │ 5 │ 0.6 │
// └───────────┼───────────┘ └───────────┼───────────┘ └───────────┼───────────┘
// │ │ │
// │ │ │
// └─────────────────────────────┼────────────────────────────┘
// │
// │
// │
// ▼
// ┌──────────┐ ┌──────────┬─────────┬─────────┐
// │ Time │ │ Current │ Temp │ Voltage │
// ├──────────┤ ├──────────┼─────────┼─────────┤
// │ 1 │ │ NULL │ 10.2 │ 1.23 │
// ├──────────┤ ├──────────┼─────────┼─────────┤
// │ 2 │ │ 0.332 │ 11.4 │ 1.24 │
// ├──────────┤ ├──────────┼─────────┼─────────┤
// │ 3 │ │ 0.5 │ 10.2 │ 1.26 │
// ├──────────┤ ├──────────┼─────────┼─────────┤
// │ 5 │ │ 0.6 │ NULL │ NULL │
// └──────────┘ └──────────┴─────────┴─────────┘
//
// We produce a single time column and a column for each field block. Notice
// that if there is no value for a timestamp, the column entry becomes NULL.
// Currently we use an Option(None) variant to represent NULL values, but in
// the future this may be changed to a separate bitmap to track NULL values.
//
// An invariant of the TSM block format is that multiple blocks for the same
// input field will never overlap by time. Once we have mapped a single block
// for a field we can decode and pull the next block for the field and continue
// to build the output.
//
pub fn map_field_columns(
mut decoder: impl BlockDecoder,
field_blocks: &mut FieldKeyBlocks,
) -> Result<(Vec<i64>, BTreeMap<String, ColumnData>), TSMError> {
// This function maintains two main buffers. The first holds the next
// decoded block for each field in the input fields. `refill_block_buffer`
// is responsible for determining if each value in the buffer (a decoded
// block) needs refilling. Refilling involves physically decoding a TSM block
// using the reader.
//
// The second buffer holds the "head" of each of the blocks in the first
// buffer; these values are tuples of time-stamp and value. Using these
// values we can essentially do a k-way "join" on the timestamp parts of the
// tuples, and construct an output row where each field (plus time) are
// columns.
// This buffer holds the next decoded block for each input field.
let mut input_block_buffer = BTreeMap::new();
refill_block_buffer(&mut decoder, field_blocks, &mut input_block_buffer)?;
// This buffer holds the head (ts, value) pair in each decoded input block
// of the input block buffer.
let mut block_value_buffer: Vec<Option<ValuePair>> = Vec::new();
block_value_buffer.resize_with(input_block_buffer.len(), || None);
refill_value_pair_buffer(&mut input_block_buffer, &mut block_value_buffer);
// Create output columns for each field.
let mut result = BTreeMap::new();
for (field_key, block) in &input_block_buffer {
match block {
BlockData::Float { .. } => {
result.insert(field_key.clone(), ColumnData::Float(vec![]));
}
BlockData::Integer { .. } => {
result.insert(field_key.clone(), ColumnData::Integer(vec![]));
}
BlockData::Bool { .. } => {
result.insert(field_key.clone(), ColumnData::Bool(vec![]));
}
BlockData::Str { .. } => {
result.insert(field_key.clone(), ColumnData::Str(vec![]));
}
BlockData::Unsigned { .. } => {
result.insert(field_key.clone(), ColumnData::Unsigned(vec![]));
}
}
}
// Each iteration of this loop will result in the creation of one output
// row. Every input block maps to a single column (field) in the output, but
// a block does not have to have a value for every row. Buffers are only
// refilled if values have been used during the loop iteration.
//
// When all inputs have been drained there is no timestamp available to
// create a row with and iteration stops.
let mut timestamps = Vec::new(); // TODO(edd): get hint for pre-allocate
while let Some(min_ts) = map_blocks_to_columns(&mut block_value_buffer, &mut result) {
//
// TODO(edd): Convert nanoseconds into microseconds for Parquet support.
// Address this in https://github.com/influxdata/delorean/issues/167
//
timestamps.push(min_ts / 1000);
refill_block_buffer(&mut decoder, field_blocks, &mut input_block_buffer)?;
refill_value_pair_buffer(&mut input_block_buffer, &mut block_value_buffer);
}
Ok((timestamps, result))
}
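To make the timestamp join concrete, here is a standalone two-column sketch of the same idea (plain tuples stand in for decoded blocks; this is not the delorean API):

// Emit one output row per distinct timestamp; a column with no value at
// that timestamp gets None, mirroring map_blocks_to_columns below.
fn join_two(a: &[(i64, f64)], b: &[(i64, f64)]) -> Vec<(i64, Option<f64>, Option<f64>)> {
    let (mut i, mut j) = (0, 0);
    let mut out = Vec::new();
    loop {
        // The row's timestamp is the minimum head timestamp of any input.
        let ts = match (a.get(i), b.get(j)) {
            (Some(x), Some(y)) => x.0.min(y.0),
            (Some(x), None) => x.0,
            (None, Some(y)) => y.0,
            (None, None) => break,
        };
        let va = if a.get(i).map(|p| p.0) == Some(ts) { i += 1; Some(a[i - 1].1) } else { None };
        let vb = if b.get(j).map(|p| p.0) == Some(ts) { j += 1; Some(b[j - 1].1) } else { None };
        out.push((ts, va, vb));
    }
    out
}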
// Given a set of input blocks, where each block comprises two equally sized
// arrays of timestamps and values, join the head of each input block's value
// array by the head of the corresponding timestamp column.
//
fn map_blocks_to_columns(
blocks: &mut [Option<ValuePair>],
dst: &mut BTreeMap<String, ColumnData>,
) -> Option<i64> {
// First determine the minimum timestamp in any of the input blocks or return
// None if all of the blocks have been drained.
let min_ts = blocks.iter().flatten().map(ValuePair::timestamp).min()?;
for (i, column) in dst.values_mut().enumerate() {
match &mut blocks[i] {
Some(pair) => {
// If this candidate has the `min_ts` time-stamp then emit its
// value to the output column, otherwise emit a None value.
match pair {
ValuePair::F64((ts, value)) => {
if let ColumnData::Float(vs) = column {
if *ts == min_ts {
vs.push(Some(*value));
blocks[i] = None;
} else {
vs.push(None); // block has a value available but timestamp doesn't join
}
};
}
ValuePair::I64((ts, value)) => {
if let ColumnData::Integer(vs) = column {
if *ts == min_ts {
vs.push(Some(*value));
blocks[i] = None;
} else {
vs.push(None); // block has a value available but timestamp doesn't join
}
};
}
ValuePair::Bool((ts, value)) => {
if let ColumnData::Bool(vs) = column {
if *ts == min_ts {
vs.push(Some(*value));
blocks[i] = None;
} else {
vs.push(None); // block has a value available but timestamp doesn't join
}
};
}
ValuePair::Str((ts, value)) => {
if let ColumnData::Str(vs) = column {
// TODO(edd): perf - Remove this cloning....
if *ts == min_ts {
vs.push(Some(value.clone()));
blocks[i] = None;
} else {
vs.push(None); // block has a value available but timestamp doesn't join
}
};
}
ValuePair::U64((ts, value)) => {
if let ColumnData::Unsigned(vs) = column {
if *ts == min_ts {
vs.push(Some(*value));
blocks[i] = None;
} else {
vs.push(None); // block has a value available but timestamp doesn't join
}
};
}
}
}
// This field value pair doesn't have a value for the min time-stamp
None => match column {
ColumnData::Float(vs) => {
vs.push(None);
}
ColumnData::Integer(vs) => {
vs.push(None);
}
ColumnData::Bool(vs) => {
vs.push(None);
}
ColumnData::Str(vs) => {
vs.push(None);
}
ColumnData::Unsigned(vs) => {
vs.push(None);
}
},
}
}
Some(min_ts)
}
// Ensures that the next available block for a field is materialised in the
// destination container.
fn refill_block_buffer(
decoder: &mut impl BlockDecoder,
field_blocks: &mut FieldKeyBlocks,
dst: &mut BTreeMap<String, BlockData>,
) -> Result<(), TSMError> {
// Determine for each input block if the destination container needs
// refilling.
for (field, blocks) in field_blocks.iter_mut() {
if blocks.is_empty() {
continue; // drained input block
}
if let Some(dst_block) = dst.get(field) {
if !dst_block.is_empty() {
continue; // not ready to be refilled.
}
};
// Either no block data for field in dst, or the block data that is
// present has been drained.
//
// Pop the next input block for this field key, decode it and refill dst
// with it.
let decoded_block = decoder.block_data(&blocks.remove(0))?;
dst.insert(field.clone(), decoded_block);
}
Ok(())
}
// Fills any empty (consumed) slots in the destination vector with the next
// value from the input set of blocks.
fn refill_value_pair_buffer(
blocks: &mut BTreeMap<String, BlockData>,
dst: &mut Vec<Option<ValuePair>>,
) {
for (i, block) in blocks.values_mut().enumerate() {
// TODO(edd): seems like this could be DRY'd up a bit??
// TODO(edd): PERF - removing from vector will shift elements. Better off
// tracking an index that's been read up to?
match dst[i] {
Some(_) => {}
None => match block {
BlockData::Float { ts, values } => {
if ts.is_empty() {
continue;
}
dst[i] = Some(ValuePair::F64((ts.remove(0), values.remove(0))))
}
BlockData::Integer { ts, values } => {
if ts.is_empty() {
continue;
}
dst[i] = Some(ValuePair::I64((ts.remove(0), values.remove(0))))
}
BlockData::Bool { ts, values } => {
if ts.is_empty() {
continue;
}
dst[i] = Some(ValuePair::Bool((ts.remove(0), values.remove(0))))
}
BlockData::Str { ts, values } => {
if ts.is_empty() {
continue;
}
dst[i] = Some(ValuePair::Str((ts.remove(0), values.remove(0))))
}
BlockData::Unsigned { ts, values } => {
if ts.is_empty() {
continue;
}
dst[i] = Some(ValuePair::U64((ts.remove(0), values.remove(0))))
}
},
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use libflate::gzip;
use std::fs::File;
use std::io::BufReader;
use std::io::Cursor;
use std::io::Read;
const TSM_FIXTURE_SIZE: usize = 4_222_248;
#[test]
fn map_tsm_index() {
let file = File::open("../tests/fixtures/000000000000005-000000002.tsm.gz");
let mut decoder = gzip::Decoder::new(file.unwrap()).unwrap();
let mut buf = Vec::new();
decoder.read_to_end(&mut buf).unwrap();
let reader =
TSMIndexReader::try_new(BufReader::new(Cursor::new(buf)), TSM_FIXTURE_SIZE).unwrap();
let mapper = TSMMeasurementMapper::new(reader.peekable());
// Although there are over 2,000 series keys in the TSM file, there are
// only 121 unique measurements.
assert_eq!(mapper.count(), 121);
}
#[test]
fn map_field_columns_file() {
let file = File::open("../tests/fixtures/000000000000005-000000002.tsm.gz");
let mut decoder = gzip::Decoder::new(file.unwrap()).unwrap();
let mut buf = Vec::new();
decoder.read_to_end(&mut buf).unwrap();
let index_reader =
TSMIndexReader::try_new(BufReader::new(Cursor::new(&buf)), TSM_FIXTURE_SIZE).unwrap();
let mut mapper = TSMMeasurementMapper::new(index_reader.peekable());
let mut block_reader = TSMBlockReader::new(BufReader::new(Cursor::new(&buf)));
let mut cpu = mapper
.find(|m| m.as_ref().unwrap().name == "cpu")
.unwrap()
.unwrap();
// cpu measurement has these 10 field keys on each tagset combination
let exp_field_keys = vec![
"usage_guest",
"usage_guest_nice",
"usage_idle",
"usage_iowait",
"usage_irq",
"usage_nice",
"usage_softirq",
"usage_steal",
"usage_system",
"usage_user",
];
for field_blocks in cpu.tag_set_fields_blocks.values_mut() {
let (_, field_cols) =
super::map_field_columns(&mut block_reader, field_blocks).unwrap();
let keys: Vec<_> = field_cols.keys().collect();
// Every mapping between field blocks should result in columns
// for every field.
assert_eq!(keys, exp_field_keys);
}
}
#[test]
fn measurement_table_columns() {
let file = File::open("../tests/fixtures/000000000000005-000000002.tsm.gz");
let mut decoder = gzip::Decoder::new(file.unwrap()).unwrap();
let mut buf = Vec::new();
decoder.read_to_end(&mut buf).unwrap();
let reader =
TSMIndexReader::try_new(BufReader::new(Cursor::new(buf)), TSM_FIXTURE_SIZE).unwrap();
let mut mapper = TSMMeasurementMapper::new(reader.peekable());
let cpu = mapper
.find(|table| table.as_ref().unwrap().name == "cpu")
.unwrap()
.unwrap();
assert_eq!(cpu.tag_columns(), vec!["cpu", "host"]);
assert_eq!(
cpu.field_columns().keys().collect::<Vec<_>>(),
vec![
"usage_guest",
"usage_guest_nice",
"usage_idle",
"usage_iowait",
"usage_irq",
"usage_nice",
"usage_softirq",
"usage_steal",
"usage_system",
"usage_user"
]
);
}
#[test]
fn fill_value_buffer() {
// pairs is a helper to generate expected values.
let pairs = |values: &[(i64, i64)]| -> Vec<Option<ValuePair>> {
values
.iter()
.map(|(t, v)| Some(ValuePair::I64((*t, *v))))
.collect::<Vec<_>>()
};
let mut input = BTreeMap::new();
input.insert(
"a".to_string(),
BlockData::Integer {
ts: vec![1, 2],
values: vec![1, 2],
},
);
input.insert(
"b".to_string(),
BlockData::Integer {
ts: vec![1, 2, 3],
values: vec![10, 20, 30],
},
);
input.insert(
"c".to_string(),
BlockData::Integer {
ts: vec![1, 2, 3],
values: vec![100, 200, 300],
},
);
let mut dst: Vec<Option<ValuePair>> = vec![None, None, None];
super::refill_value_pair_buffer(&mut input, &mut dst);
assert_eq!(dst, pairs(&[(1, 1), (1, 10), (1, 100)]));
// If the buffer wasn't drained then no new values will be added.
super::refill_value_pair_buffer(&mut input, &mut dst);
assert_eq!(dst, pairs(&[(1, 1), (1, 10), (1, 100)]));
// use up a value
dst[2] = None;
super::refill_value_pair_buffer(&mut input, &mut dst);
assert_eq!(dst, pairs(&[(1, 1), (1, 10), (2, 200)]));
// consume multiple values
dst = vec![None, None, None];
super::refill_value_pair_buffer(&mut input, &mut dst);
assert_eq!(dst, pairs(&[(2, 2), (2, 20), (3, 300)]));
// consume values to drain the first and last input
dst = vec![None, None, None];
super::refill_value_pair_buffer(&mut input, &mut dst);
let mut exp = pairs(&[(2, 2), (3, 30), (3, 300)]);
exp[0] = None;
exp[2] = None;
assert_eq!(dst, exp);
// drain remaining input
dst = vec![None, None, None];
super::refill_value_pair_buffer(&mut input, &mut dst);
assert_eq!(dst, vec![None, None, None]);
}
}


@ -1,33 +1,31 @@
//! Types for reading and writing TSM files produced by InfluxDB >= 2.x
use crate::encoders::*;
use crate::storage::block::*;
use crate::storage::StorageError;
use super::*;
use integer_encoding::VarInt;
use std::io::{BufRead, Seek, SeekFrom};
use std::u64;
/// `TSMReader` allows you to read all block and index data within a TSM file.
/// `TSMIndexReader` allows you to read index data within a TSM file.
///
/// # Examples
/// # Example
///
/// Iterating over the TSM index.
///
/// ```
/// # use delorean::storage::tsm::*;
/// # use delorean_tsm::reader::*;
/// # use libflate::gzip;
/// # use std::fs::File;
/// # use std::io::BufReader;
/// # use std::io::Cursor;
/// # use std::io::Read;
/// # let file = File::open("tests/fixtures/000000000000005-000000002.tsm.gz");
/// # let file = File::open("../tests/fixtures/000000000000005-000000002.tsm.gz");
/// # let mut decoder = gzip::Decoder::new(file.unwrap()).unwrap();
/// # let mut buf = Vec::new();
/// # decoder.read_to_end(&mut buf).unwrap();
/// # let data_len = buf.len();
/// # let r = Cursor::new(buf);
///
/// let reader = TSMReader::try_new(BufReader::new(r), data_len).unwrap();
/// let reader = TSMIndexReader::try_new(BufReader::new(r), 4_222_248).unwrap();
///
/// // reader allows you to access each index entry, and each block for each
/// // entry in order.
@ -46,37 +44,8 @@ use std::u64;
/// }
/// ```
///
/// Decoding a block.
///
/// ```
/// # use delorean::storage::tsm::*;
/// # use libflate::gzip;
/// # use std::fs::File;
/// # use std::io::BufReader;
/// # use std::io::Cursor;
/// # use std::io::Read;
/// # let file = File::open("tests/fixtures/000000000000005-000000002.tsm.gz");
/// # let mut decoder = gzip::Decoder::new(file.unwrap()).unwrap();
/// # let mut buf = Vec::new();
/// # decoder.read_to_end(&mut buf).unwrap();
/// # let r = Cursor::new(buf);
/// // Initializing a TSMReader requires a buffered reader and the length of the stream.
/// let mut reader = TSMReader::try_new(BufReader::new(r), 4_222_248).unwrap();
/// let entry = reader.next().unwrap().unwrap();
/// println!("this index entry has {:?} blocks", entry.count);
/// // access the decoded time stamps and values for the first block
/// // associated with the index entry.
/// match reader.decode_block(&entry.block).unwrap() {
/// BlockData::Float { ts: _, values: _ } => {}
/// BlockData::Integer { ts: _, values: _ } => {}
/// BlockData::Bool { ts: _, values: _ } => {}
/// BlockData::Str { ts: _, values: _ } => {}
/// BlockData::Unsigned { ts: _, values: _ } => {}
/// }
///```
///
#[derive(Debug)]
pub struct TSMReader<R>
pub struct TSMIndexReader<R>
where
R: BufRead + Seek,
{
@ -89,11 +58,11 @@ where
next: Option<IndexEntry>,
}
impl<R> TSMReader<R>
impl<R> TSMIndexReader<R>
where
R: BufRead + Seek,
{
pub fn try_new(mut r: R, len: usize) -> Result<Self, StorageError> {
pub fn try_new(mut r: R, len: usize) -> Result<Self, TSMError> {
// determine offset to index, which is held in last 8 bytes of file.
r.seek(SeekFrom::End(-8))?;
let mut buf: [u8; 8] = [0; 8];
@ -115,7 +84,7 @@ where
/// index or will return an error. `next_index_entry` updates the offset on
/// the Index, but it's the caller's responsibility to stop reading entries
/// when the index has been exhausted.
fn next_index_entry(&mut self) -> Result<IndexEntry, StorageError> {
fn next_index_entry(&mut self) -> Result<IndexEntry, TSMError> {
// read length of series key
let mut buf: [u8; 2] = [0; 2];
self.r.read_exact(&mut buf)?;
@ -130,7 +99,7 @@ where
// read the block type
self.r.read_exact(&mut buf[..1])?;
self.curr_offset += 1;
let block_type = buf[0];
let b_type = buf[0];
// read how many blocks there are for this entry.
self.r.read_exact(&mut buf)?;
@ -139,7 +108,7 @@ where
Ok(IndexEntry {
key: key_bytes,
block_type,
block_type: BlockType::try_from(b_type)?,
count,
curr_block: 1,
block: self.next_block_entry()?,
@ -149,7 +118,7 @@ where
/// next_block_entry will return the next block entry within an index entry.
/// It is the caller's responsibility to stop reading block entries when
/// they have all been read for an index entry.
fn next_block_entry(&mut self) -> Result<Block, StorageError> {
fn next_block_entry(&mut self) -> Result<Block, TSMError> {
// read min time on block entry
let mut buf: [u8; 8] = [0; 8];
self.r.read_exact(&mut buf[..])?;
@ -178,79 +147,10 @@ where
size,
})
}
/// decode_block decodes the current block pointed to by the provided index
/// entry, returning two vectors containing the timestamps and values.
/// decode_block will seek back to the original position in the index before
/// returning.
///
/// The vectors are guaranteed to have the same length, with a maximum
/// length of 1000.
pub fn decode_block(&mut self, block: &Block) -> Result<BlockData, StorageError> {
self.r.seek(SeekFrom::Start(block.offset))?;
let mut data: Vec<u8> = vec![0; block.size as usize];
self.r.read_exact(&mut data)?;
// TODO(edd): skip 32-bit CRC checksum at beginning of block for now
let mut idx = 4;
// determine the block type
let block_type = data[idx];
idx += 1;
// first decode the timestamp block.
let mut ts: Vec<i64> = Vec::with_capacity(MAX_BLOCK_VALUES); // 1000 is the max block size
let (len, n) = u64::decode_var(&data[idx..]); // size of timestamp block
idx += n;
timestamp::decode(&data[idx..idx + (len as usize)], &mut ts).map_err(|e| StorageError {
description: e.to_string(),
})?;
idx += len as usize;
match block_type {
F64_BLOCKTYPE_MARKER => {
// values will be same length as time-stamps.
let mut values: Vec<f64> = Vec::with_capacity(ts.len());
float::decode_influxdb(&data[idx..], &mut values).map_err(|e| StorageError {
description: e.to_string(),
})?;
// seek to original position in index before returning to caller.
self.r.seek(SeekFrom::Start(self.curr_offset))?;
Ok(BlockData::Float { ts, values })
}
I64_BLOCKTYPE_MARKER => {
// values will be same length as time-stamps.
let mut values: Vec<i64> = Vec::with_capacity(ts.len());
integer::decode(&data[idx..], &mut values).map_err(|e| StorageError {
description: e.to_string(),
})?;
// seek to original position in index before returning to caller.
self.r.seek(SeekFrom::Start(self.curr_offset))?;
Ok(BlockData::Integer { ts, values })
}
BOOL_BLOCKTYPE_MARKER => Err(StorageError {
description: String::from("bool block type unsupported"),
}),
STRING_BLOCKTYPE_MARKER => Err(StorageError {
description: String::from("string block type unsupported"),
}),
U64_BLOCKTYPE_MARKER => Err(StorageError {
description: String::from("unsigned integer block type unsupported"),
}),
_ => Err(StorageError {
description: format!("unsupported block type {:?}", block_type),
}),
}
}
}
impl<R: BufRead + Seek> Iterator for TSMReader<R> {
type Item = Result<IndexEntry, StorageError>;
impl<R: BufRead + Seek> Iterator for TSMIndexReader<R> {
type Item = Result<IndexEntry, TSMError>;
fn next(&mut self) -> Option<Self::Item> {
if self.curr_offset == self.end_offset {
@ -294,7 +194,7 @@ impl<R: BufRead + Seek> Iterator for TSMReader<R> {
pub struct IndexEntry {
key: Vec<u8>,
pub block_type: u8,
pub block_type: BlockType,
pub count: u16,
pub block: Block,
curr_block: u16,
@ -317,133 +217,94 @@ impl IndexEntry {
InfluxID::from_be_bytes(buf)
}
pub fn parse_key(&self) -> Result<ParsedTSMKey, StorageError> {
pub fn parse_key(&self) -> Result<ParsedTSMKey, TSMError> {
parse_tsm_key(self.key.to_vec())
}
}
#[derive(Clone, Debug)]
pub struct ParsedTSMKey {
pub measurement: String,
pub tagset: Vec<(String, String)>,
pub field_key: String,
}
/// parse_tsm_key parses from the series key the measurement, field key and tag
/// set.
/// `TSMBlockReader` allows you to read and decode TSM blocks from within a TSM
/// file.
///
/// It does not provide access to the org and bucket ids on the key, these can
/// be accessed via org_id() and bucket_id() respectively.
///
/// TODO: handle escapes in the series key for , = and \t
///
fn parse_tsm_key(mut key: Vec<u8>) -> Result<ParsedTSMKey, StorageError> {
// skip over org id, bucket id, comma, null byte (measurement) and =
// The next n-1 bytes are the measurement name, where the nᵗʰ byte is a `,`.
key = key.drain(8 + 8 + 1 + 1 + 1..).collect::<Vec<u8>>();
let mut i = 0;
// TODO(edd): can we make this work with take_while?
while i != key.len() {
if key[i] == b',' {
break;
}
i += 1;
}
let mut rem_key = key.drain(i..).collect::<Vec<u8>>();
let measurement = String::from_utf8(key).map_err(|e| StorageError {
description: e.to_string(),
})?;
let mut tagset = Vec::<(String, String)>::with_capacity(10);
let mut reading_key = true;
let mut key = String::with_capacity(100);
let mut value = String::with_capacity(100);
// skip the comma separating measurement tag
for byte in rem_key.drain(1..) {
match byte {
44 => {
// ,
reading_key = true;
tagset.push((key, value));
key = String::with_capacity(250);
value = String::with_capacity(250);
}
61 => {
// =
reading_key = false;
}
_ => {
if reading_key {
key.push(byte as char);
} else {
value.push(byte as char);
}
}
}
}
// fields are stored on the series keys in TSM indexes as follows:
//
// <field_key><4-byte delimiter><field_key>
//
// so we can trim the parsed value.
let field_trim_length = (value.len() - 4) / 2;
let (field, _) = value.split_at(field_trim_length);
Ok(ParsedTSMKey {
measurement,
tagset,
field_key: field.to_string(),
})
}
/// `Block` holds information about location and time range of a block of data.
#[derive(Debug, Copy, Clone)]
#[allow(dead_code)]
pub struct Block {
pub min_time: i64,
pub max_time: i64,
pub offset: u64,
pub size: u32,
}
// MAX_BLOCK_VALUES is the maximum number of values a TSM block can store.
const MAX_BLOCK_VALUES: usize = 1000;
/// `BlockData` describes the various types of block data that can be held within
/// a TSM file.
#[derive(Debug)]
pub enum BlockData {
Float { ts: Vec<i64>, values: Vec<f64> },
Integer { ts: Vec<i64>, values: Vec<i64> },
Bool { ts: Vec<i64>, values: Vec<bool> },
Str { ts: Vec<i64>, values: Vec<String> },
Unsigned { ts: Vec<i64>, values: Vec<u64> },
pub struct TSMBlockReader<R>
where
R: BufRead + Seek,
{
r: R,
}
#[derive(Copy, Clone, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)]
/// `InfluxID` represents an InfluxDB ID used in InfluxDB 2.x to represent
/// organization and bucket identifiers.
pub struct InfluxID(u64);
impl<R> TSMBlockReader<R>
where
R: BufRead + Seek,
{
pub fn new(r: R) -> Self {
Self { r }
}
#[allow(dead_code)]
impl InfluxID {
fn new_str(s: &str) -> Result<InfluxID, StorageError> {
let v = u64::from_str_radix(s, 16).map_err(|e| StorageError {
description: e.to_string(),
/// decode_block decodes a block whose location is described by the provided
/// `Block`.
///
/// The components of the returned `BlockData` are guaranteed to have
/// identical lengths.
pub fn decode_block(&mut self, block: &Block) -> Result<BlockData, TSMError> {
self.r.seek(SeekFrom::Start(block.offset))?;
let mut data: Vec<u8> = vec![0; block.size as usize];
self.r.read_exact(&mut data)?;
// TODO(edd): skip 32-bit CRC checksum at beginning of block for now
let mut idx = 4;
// determine the block type
let block_type = BlockType::try_from(data[idx])?;
idx += 1;
// first decode the timestamp block.
let mut ts: Vec<i64> = Vec::with_capacity(MAX_BLOCK_VALUES); // a block holds at most MAX_BLOCK_VALUES (1000) values
let (len, n) = u64::decode_var(&data[idx..]); // size of timestamp block
idx += n;
encoders::timestamp::decode(&data[idx..idx + (len as usize)], &mut ts).map_err(|e| {
TSMError {
description: e.to_string(),
}
})?;
Ok(InfluxID(v))
}
idx += len as usize;
fn from_be_bytes(bytes: [u8; 8]) -> InfluxID {
InfluxID(u64::from_be_bytes(bytes))
}
}
match block_type {
BlockType::Float => {
// values will be the same length as the timestamps.
let mut values: Vec<f64> = Vec::with_capacity(ts.len());
encoders::float::decode_influxdb(&data[idx..], &mut values).map_err(|e| {
TSMError {
description: e.to_string(),
}
})?;
impl std::fmt::Display for InfluxID {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::result::Result<(), std::fmt::Error> {
write!(f, "{:016x}", self.0)
Ok(BlockData::Float { ts, values })
}
BlockType::Integer => {
// values will be the same length as the timestamps.
let mut values: Vec<i64> = Vec::with_capacity(ts.len());
encoders::integer::decode(&data[idx..], &mut values).map_err(|e| TSMError {
description: e.to_string(),
})?;
Ok(BlockData::Integer { ts, values })
}
BlockType::Bool => Err(TSMError {
description: String::from("bool block type unsupported"),
}),
BlockType::Str => {
let len = ts.len();
Ok(BlockData::Str {
ts,
values: vec!["unsupported string!!".to_string(); len],
})
}
BlockType::Unsigned => Err(TSMError {
description: String::from("unsigned integer block type unsupported"),
}),
}
}
}
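Putting `decode_block` together: it seeks to `block.offset`, reads `block.size` bytes, skips the 4-byte CRC, reads the block-type byte and the varint timestamp-block length, then decodes timestamps followed by values. A hedged usage sketch follows; the module paths are an assumption based on the `delorean_tsm::reader::{IndexEntry, TSMIndexReader}` import elsewhere in this diff, and the block coordinates are the ones in the fixture test below:

```rust
// Minimal sketch; exact export paths for Block/BlockData are assumed.
use std::io::{BufReader, Cursor};

use delorean_tsm::reader::{Block, BlockData, TSMBlockReader};
use delorean_tsm::TSMError;

fn decode_one(tsm_bytes: Vec<u8>) -> Result<(), TSMError> {
    let mut block_reader = TSMBlockReader::new(BufReader::new(Cursor::new(tsm_bytes)));

    // Float block coordinates copied from the fixture test below.
    let block = Block {
        min_time: 1590585530000000000,
        max_time: 1590590600000000000,
        offset: 5339,
        size: 153,
    };

    // decode_block guarantees ts and values have identical lengths.
    if let BlockData::Float { ts, values } = block_reader.decode_block(&block)? {
        assert_eq!(ts.len(), values.len());
    }
    Ok(())
}
```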
@ -459,12 +320,12 @@ mod tests {
#[test]
fn read_tsm_index() {
let file = File::open("tests/fixtures/000000000000005-000000002.tsm.gz");
let file = File::open("../tests/fixtures/000000000000005-000000002.tsm.gz");
let mut decoder = gzip::Decoder::new(file.unwrap()).unwrap();
let mut buf = Vec::new();
decoder.read_to_end(&mut buf).unwrap();
let reader = TSMReader::try_new(BufReader::new(Cursor::new(buf)), 4_222_248).unwrap();
let reader = TSMIndexReader::try_new(BufReader::new(Cursor::new(buf)), 4_222_248).unwrap();
assert_eq!(reader.curr_offset, 3_893_272);
assert_eq!(reader.count(), 2159)
@ -472,12 +333,12 @@ mod tests {
#[test]
fn read_tsm_block() {
let file = File::open("tests/fixtures/000000000000005-000000002.tsm.gz");
let file = File::open("../tests/fixtures/000000000000005-000000002.tsm.gz");
let mut decoder = gzip::Decoder::new(file.unwrap()).unwrap();
let mut buf = Vec::new();
decoder.read_to_end(&mut buf).unwrap();
let reader = TSMReader::try_new(BufReader::new(Cursor::new(buf)), 4_222_248).unwrap();
let reader = TSMIndexReader::try_new(BufReader::new(Cursor::new(buf)), 4_222_248).unwrap();
let mut got_blocks: u64 = 0;
let mut got_min_time = i64::MAX;
@ -526,32 +387,33 @@ mod tests {
#[test]
fn decode_tsm_blocks() {
let file = File::open("tests/fixtures/000000000000005-000000002.tsm.gz");
let file = File::open("../tests/fixtures/000000000000005-000000002.tsm.gz");
let mut decoder = gzip::Decoder::new(file.unwrap()).unwrap();
let mut buf = Vec::new();
decoder.read_to_end(&mut buf).unwrap();
let r = Cursor::new(buf);
let mut reader = TSMReader::try_new(BufReader::new(r), 4_222_248).unwrap();
let mut block_reader = TSMBlockReader::new(BufReader::new(r));
let block_defs = vec![
super::Block {
min_time: 1590585530000000000,
max_time: 1590590600000000000,
offset: 5339,
size: 153,
},
super::Block {
min_time: 1590585520000000000,
max_time: 1590590600000000000,
offset: 190770,
size: 30,
},
];
let mut blocks = vec![];
// Find the float block with offset 5339 in the file.
let f64_entry = reader
.find(|e| {
e.as_ref().unwrap().block.offset == 5339 && e.as_ref().unwrap().block_type == 0_u8
})
.unwrap()
.unwrap();
let f64_block = &reader.decode_block(&f64_entry.block).unwrap();
blocks.push(f64_block);
// Find the first integer block index entry in the file.
let i64_entry = reader
.find(|e| e.as_ref().unwrap().block_type == 1_u8)
.unwrap()
.unwrap();
let i64_block = &reader.decode_block(&i64_entry.block).unwrap();
blocks.push(i64_block);
for def in block_defs {
blocks.push(block_reader.decode_block(&def).unwrap());
}
for block in blocks {
// The first integer block in the file should have 509 values in it.
@ -575,45 +437,6 @@ mod tests {
}
}
#[test]
fn influx_id() {
let id = InfluxID::new_str("20aa9b0").unwrap();
assert_eq!(id, InfluxID(34_253_232));
assert_eq!(format!("{}", id), "00000000020aa9b0");
}
#[test]
fn parse_tsm_key() {
//<org_id bucket_id>,\x00=http_api_request_duration_seconds,handler=platform,method=POST,path=/api/v2/setup,status=2XX,user_agent=Firefox,\xff=sum#!~#sum
let buf = vec![
"05C19117091A100005C19117091A10012C003D68747470",
"5F6170695F726571756573745F6475726174696F6E5F73",
"65636F6E64732C68616E646C65723D706C6174666F726D",
"2C6D6574686F643D504F53542C706174683D2F6170692F",
"76322F73657475702C7374617475733D3258582C757365",
"725F6167656E743D46697265666F782CFF3D73756D2321",
"7E2373756D",
]
.join("");
let tsm_key = hex::decode(buf).unwrap();
let parsed_key = super::parse_tsm_key(tsm_key).unwrap();
assert_eq!(
parsed_key.measurement,
String::from("http_api_request_duration_seconds")
);
let exp_tagset = vec![
(String::from("handler"), String::from("platform")),
(String::from("method"), String::from("POST")),
(String::from("path"), String::from("/api/v2/setup")),
(String::from("status"), String::from("2XX")),
(String::from("user_agent"), String::from("Firefox")),
];
assert_eq!(parsed_key.tagset, exp_tagset);
assert_eq!(parsed_key.field_key, String::from("sum"));
}
// This test scans over the entire TSM contents and
// ensures no errors are returned from the reader.
fn walk_index_and_check_for_errors(tsm_gz_path: &str) {
@ -622,30 +445,33 @@ mod tests {
let mut buf = Vec::new();
decoder.read_to_end(&mut buf).unwrap();
let data_len = buf.len();
let r = Cursor::new(buf);
let mut reader = TSMReader::try_new(BufReader::new(r), data_len).unwrap();
let mut index_reader =
TSMIndexReader::try_new(BufReader::new(Cursor::new(&buf)), data_len).unwrap();
let mut blocks = Vec::new();
for res in &mut reader {
for res in &mut index_reader {
let entry = res.unwrap();
let key = entry.parse_key().unwrap();
assert!(!key.measurement.is_empty());
let block_type = entry.block_type;
if block_type == BOOL_BLOCKTYPE_MARKER {
eprintln!("Note: ignoring bool block, not implemented");
} else if block_type == STRING_BLOCKTYPE_MARKER {
eprintln!("Note: ignoring string block, not implemented");
} else if block_type == U64_BLOCKTYPE_MARKER {
eprintln!("Note: ignoring bool block, not implemented");
} else {
blocks.push(entry.block);
match entry.block_type {
BlockType::Bool => {
eprintln!("Note: ignoring bool block, not implemented");
}
BlockType::Str => {
eprintln!("Note: ignoring Str block, not implemented");
}
BlockType::Unsigned => {
eprintln!("Note: ignoring unsigned block, not implemented");
}
_ => blocks.push(entry.block),
}
}
let mut block_reader = TSMBlockReader::new(Cursor::new(&buf));
for block in blocks {
reader
block_reader
.decode_block(&block)
.expect("error decoding block data");
}
@ -653,11 +479,11 @@ mod tests {
#[test]
fn check_tsm_cpu_usage() {
walk_index_and_check_for_errors("tests/fixtures/cpu_usage.tsm.gz");
walk_index_and_check_for_errors("../tests/fixtures/cpu_usage.tsm.gz");
}
#[test]
fn check_tsm_000000000000005_000000002() {
walk_index_and_check_for_errors("tests/fixtures/000000000000005-000000002.tsm.gz");
walk_index_and_check_for_errors("../tests/fixtures/000000000000005-000000002.tsm.gz");
}
}

View File

@ -1,4 +1,4 @@
use delorean_ingest::{ConversionSettings, LineProtocolConverter};
use delorean_ingest::{ConversionSettings, LineProtocolConverter, TSMFileConverter};
use delorean_line_parser::parse_lines;
use delorean_parquet::writer::DeloreanParquetTableWriter;
use delorean_table::{DeloreanTableWriter, DeloreanTableWriterSource, Error as TableError};
@ -108,7 +108,13 @@ pub fn convert(input_filename: &str, output_name: &str) -> Result<()> {
FileType::LineProtocol => {
convert_line_protocol_to_parquet(input_filename, input_reader, output_name)
}
FileType::TSM => convert_tsm_to_parquet(input_filename, input_reader, output_name),
FileType::TSM => {
// TODO(edd): we can remove this when I figure out the best way to share
// the reader between the TSM index reader and the Block decoder.
let input_block_reader = InputReader::new(input_filename)?;
let len = input_reader.len() as usize;
convert_tsm_to_parquet(input_reader, len, input_block_reader, output_name)
}
FileType::Parquet => Err(Error::NotImplemented {
operation_name: String::from("Parquet format conversion"),
}),
@ -172,11 +178,27 @@ fn convert_line_protocol_to_parquet(
}
fn convert_tsm_to_parquet(
_input_filename: &str,
mut _input_reader: InputReader,
_output_name: &str,
index_stream: InputReader,
index_stream_size: usize,
block_stream: InputReader,
output_name: &str,
) -> Result<()> {
Err(Error::NotImplemented {
operation_name: String::from("TSM Conversion not supported yet"),
})
// set up writing
let writer_source: Box<dyn DeloreanTableWriterSource> = if is_directory(&output_name) {
info!("Writing to output directory {:?}", output_name);
Box::new(ParquetDirectoryWriterSource {
output_dir_path: PathBuf::from(output_name),
})
} else {
info!("Writing to output file {}", output_name);
Box::new(ParquetFileWriterSource {
output_filename: String::from(output_name),
made_file: false,
})
};
let mut converter = TSMFileConverter::new(writer_source);
converter
.convert(index_stream, index_stream_size, block_stream)
.map_err(|e| Error::UnableToCloseTableWriter { source: e })
}
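For orientation, the pieces above compose as follows: the caller opens the TSM file twice (per the TODO in `convert`), builds a writer source from the output path, and hands both streams plus the index size to `TSMFileConverter`. A hedged sketch using plain `std::fs::File` handles in place of `InputReader`; the reader bounds accepted by `convert` are an assumption here:

```rust
// Sketch only: TSMFileConverter is the type added in this diff, but the exact
// reader bounds of convert() are assumed (the CLI passes InputReaders).
use std::fs::File;
use std::io::BufReader;

use delorean_ingest::TSMFileConverter;

fn convert_file(
    mut converter: TSMFileConverter,
    path: &str,
) -> Result<(), Box<dyn std::error::Error>> {
    let index_file = File::open(path)?;
    let index_len = index_file.metadata()?.len() as usize;
    let block_file = File::open(path)?; // independent handle for block decoding
    converter.convert(BufReader::new(index_file), index_len, BufReader::new(block_file))?;
    Ok(())
}
```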

View File

@ -1,9 +1,9 @@
use snafu::Snafu;
use delorean::storage::StorageError;
use delorean_ingest::Error as IngestError;
use delorean_parquet::error::Error as DeloreanParquetError;
use delorean_parquet::writer::Error as ParquetWriterError;
use delorean_tsm::TSMError;
#[derive(Debug, Snafu)]
pub enum Error {
@ -60,7 +60,7 @@ pub enum Error {
UnableDumpToParquetMetadata { source: DeloreanParquetError },
#[snafu(display(r#"Error reading TSM data: {}"#, source))]
TSM { source: StorageError },
TSM { source: TSMError },
#[snafu(display(r#"Error parsing data: {}"#, source))]
Parsing { source: delorean_line_parser::Error },
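The `source` field is what lets the underlying `TSMError` chain through the CLI error. A small stand-alone illustration of the same snafu pattern; the enum here is a pared-down stand-in, not the full CLI error:

```rust
use snafu::Snafu;

// Pared-down stand-in for the CLI error above; Snafu derives Display and
// std::error::Error from the attribute, and `source` wires up error chaining.
#[derive(Debug, Snafu)]
pub enum Error {
    #[snafu(display(r#"Error reading TSM data: {}"#, source))]
    TSM { source: delorean_tsm::TSMError },
}

// The same construction dump_meta uses to map the library error into the
// CLI's variant.
fn wrap(e: delorean_tsm::TSMError) -> Error {
    Error::TSM { source: e }
}
```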

View File

@ -4,8 +4,8 @@
use std::collections::{BTreeMap, BTreeSet};
use std::convert::TryInto;
use delorean::storage::tsm::{IndexEntry, InfluxID, TSMReader};
use delorean::storage::StorageError;
use delorean_tsm::reader::{IndexEntry, TSMIndexReader};
use delorean_tsm::{InfluxID, TSMError};
use delorean_parquet::metadata::print_parquet_metadata;
use log::{debug, info};
@ -29,7 +29,7 @@ pub fn dump_meta(input_filename: &str) -> Result<()> {
.try_into()
.expect("File size more than usize");
let reader =
TSMReader::try_new(input_reader, len).map_err(|e| Error::TSM { source: e })?;
TSMIndexReader::try_new(input_reader, len).map_err(|e| Error::TSM { source: e })?;
let mut stats_builder = TSMMetadataBuilder::new();
@ -128,7 +128,7 @@ impl TSMMetadataBuilder {
Self::default()
}
fn process_entry(&mut self, entry: &mut Result<IndexEntry, StorageError>) -> Result<()> {
fn process_entry(&mut self, entry: &mut Result<IndexEntry, TSMError>) -> Result<()> {
match entry {
Ok(index_entry) => {
self.num_entries += 1;
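`process_entry` consumes items straight off the index iterator; each item is a `Result<IndexEntry, TSMError>`. A minimal sketch of driving `TSMIndexReader` the same way the stats builder and the tests do, using the import paths shown in this diff:

```rust
// try_new takes the reader plus the total index size, and iteration
// yields Result<IndexEntry, TSMError>.
use std::io::{BufReader, Cursor};

use delorean_tsm::reader::TSMIndexReader;
use delorean_tsm::TSMError;

fn count_entries(tsm_bytes: Vec<u8>) -> Result<usize, TSMError> {
    let len = tsm_bytes.len();
    let mut reader = TSMIndexReader::try_new(BufReader::new(Cursor::new(tsm_bytes)), len)?;
    let mut entries = 0;
    for entry in &mut reader {
        // parse_key recovers measurement, tag set and field key from the series key.
        let parsed = entry?.parse_key()?;
        assert!(!parsed.measurement.is_empty());
        entries += 1;
    }
    Ok(entries)
}
```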

View File

@ -3,7 +3,6 @@
use std::{error, fmt};
pub mod encoders;
pub mod id;
pub mod line_parser;
pub mod storage;

View File

@ -1,8 +1,8 @@
#![deny(rust_2018_idioms)]
#![warn(missing_debug_implementations, clippy::explicit_iter_loop)]
use log::{debug, error, warn};
use clap::{crate_authors, crate_version, App, Arg, SubCommand};
use log::{debug, error, warn};
mod commands {
pub mod convert;

View File

@ -10,7 +10,6 @@ pub mod partitioned_store;
pub mod predicate;
pub mod remote_partition;
pub mod s3_partition;
pub mod tsm;
#[derive(Debug, Eq, PartialEq, Clone)]
pub struct ReadPoint<T: Clone> {

View File

@ -160,8 +160,8 @@
//! ╚═════════════════════════════════════╝
//! ```
use crate::encoders::{float, integer, timestamp};
use crate::storage::StorageError;
use delorean_tsm::encoders::{float, integer, timestamp};
use integer_encoding::*;
use num::bigint::{BigInt, BigUint};

View File

@ -82,47 +82,54 @@ fn convert_line_protocol_good_input_filename() {
#[test]
fn convert_tsm_good_input_filename() {
let mut cmd = Command::cargo_bin("delorean").unwrap();
//
// TODO: this needs to work for a temp directory...
//
let parquet_path = delorean_test_helpers::tempfile::Builder::new()
.prefix("convert_e2e_tsm")
.suffix(".parquet")
.tempfile()
.expect("error creating temp file")
.into_temp_path();
let parquet_filename_string = parquet_path.to_string_lossy().to_string();
// let mut cmd = Command::cargo_bin("delorean").unwrap();
let assert = cmd
.arg("convert")
.arg("tests/fixtures/000000000000005-000000002.tsm.gz")
.arg(&parquet_filename_string)
.assert();
// let tmp_dir = delorean_test_helpers::tmp_dir();
// let parquet_path = tmp_dir.unwrap().into_path().to_str().unwrap();
// TODO this should succeed when TSM -> parquet conversion is implemented.
assert
.failure()
.code(1)
.stderr(predicate::str::contains("Conversion failed"))
.stderr(predicate::str::contains(
"Not implemented: TSM Conversion not supported yet",
));
// // ::Builder::new()
// // .prefix("dstool_e2e_tsm")
// // .suffix(".parquet")
// // .tempfile()
// // .expect("error creating temp file")
// // .into_temp_path();
// // let parquet_filename_string = parquet_path.to_string_lossy().to_string();
// TODO add better success expectations
// let assert = cmd
// .arg("convert")
// .arg("tests/fixtures/cpu_usage.tsm")
// .arg(&parquet_path)
// .assert();
// let expected_success_string = format!(
// "Completing writing to {} successfully",
// parquet_filename_string
// );
// // TODO this should succeed when TSM -> parquet conversion is implemented.
// // assert
// // .failure()
// // .code(1)
// // .stderr(predicate::str::contains("Conversion failed"))
// // .stderr(predicate::str::contains(
// // "Not implemented: TSM Conversion not supported yet",
// // ));
// assert
// .success()
// .stderr(predicate::str::contains("convert starting"))
// .stderr(predicate::str::contains(
// "Writing output for measurement h2o_temperature",
// ))
// .stderr(predicate::str::contains(expected_success_string));
// // TODO add better success expectations
// validate_parquet_file(&parquet_path);
// // let expected_success_string = format!(
// // "Completing writing to {} successfully",
// // parquet_filename_string
// // );
// // assert
// // .success()
// // .stderr(predicate::str::contains("dstool convert starting"))
// // .stderr(predicate::str::contains(
// // "Writing output for measurement h2o_temperature",
// // ))
// // .stderr(predicate::str::contains(expected_success_string));
// // validate_parquet_file(&parquet_path);
}
#[test]