refactor: hoist tsm reader into own crate

pull/24376/head
Edd Robinson 2020-06-17 09:49:21 +01:00
parent fd9f2ea5b8
commit 85e0b4ec16
12 changed files with 87 additions and 14 deletions

12
Cargo.lock generated
View File

@ -634,6 +634,7 @@ dependencies = [
"delorean_table",
"delorean_table_schema",
"delorean_test_helpers",
"delorean_tsm",
"delorean_wal",
"dirs 2.0.2",
"dotenv",
@ -755,6 +756,17 @@ dependencies = [
"tempfile",
]
[[package]]
name = "delorean_tsm"
version = "0.1.0"
dependencies = [
"delorean_test_helpers",
"hex",
"integer-encoding",
"libflate",
"rand 0.7.3",
]
[[package]]
name = "delorean_utilities"
version = "0.1.0"

View File

@ -7,6 +7,7 @@ default-run = "delorean"
[workspace]
members = [
"delorean_generated_types",
"delorean_ingest",
"delorean_line_parser",
"delorean_object_store",
@ -14,7 +15,7 @@ members = [
"delorean_table",
"delorean_table_schema",
"delorean_test_helpers",
"delorean_generated_types",
"delorean_tsm",
"delorean_utilities",
"delorean_wal",
]
@ -31,6 +32,7 @@ delorean_table = { path = "delorean_table" }
delorean_table_schema = { path = "delorean_table_schema" }
delorean_wal = { path = "delorean_wal" }
delorean_object_store = { path = "delorean_object_store" }
delorean_tsm = { path = "delorean_tsm" }
bytes = "0.5.4"
integer-encoding = "1.0.7"

16
delorean_tsm/Cargo.toml Normal file
View File

@ -0,0 +1,16 @@
[package]
name = "delorean_tsm"
version = "0.1.0"
authors = ["Edd Robinson <me@edd.io>"]
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
integer-encoding = "1.0.7"
[dev-dependencies]
hex = "0.4.2"
libflate = "1.0.0"
rand = "0.7.2"
delorean_test_helpers = { path = "../delorean_test_helpers" }

View File

@ -1,9 +1,14 @@
//! Types for reading and writing TSM files produced by InfluxDB >= 2.x
use crate::encoders::*;
use crate::storage::block::*;
use crate::storage::StorageError;
pub mod encoders;
// use crate::storage::block::*;
use encoders::*;
use integer_encoding::VarInt;
use std::convert::TryFrom;
use std::error;
use std::fmt;
use std::io;
use std::io::{BufRead, Seek, SeekFrom};
use std::u64;
@ -14,13 +19,13 @@ use std::u64;
/// Iterating over the TSM index.
///
/// ```
/// # use delorean::storage::tsm::*;
/// # use delorean_tsm::*;
/// # use libflate::gzip;
/// # use std::fs::File;
/// # use std::io::BufReader;
/// # use std::io::Cursor;
/// # use std::io::Read;
/// # let file = File::open("tests/fixtures/000000000000005-000000002.tsm.gz");
/// # let file = File::open("../tests/fixtures/000000000000005-000000002.tsm.gz");
/// # let mut decoder = gzip::Decoder::new(file.unwrap()).unwrap();
/// # let mut buf = Vec::new();
/// # decoder.read_to_end(&mut buf).unwrap();
@ -300,6 +305,12 @@ fn parse_tsm_key(mut key: Vec<u8>) -> Result<ParsedTSMKey, StorageError> {
})
}
pub const F64_BLOCKTYPE_MARKER: u8 = 0;
pub const I64_BLOCKTYPE_MARKER: u8 = 1;
pub const BOOL_BLOCKTYPE_MARKER: u8 = 2;
pub const STRING_BLOCKTYPE_MARKER: u8 = 3;
pub const U64_BLOCKTYPE_MARKER: u8 = 4;
/// `TSMBlockReader` allows you to read and decode TSM blocks from within a TSM
/// file.
///
@ -442,6 +453,40 @@ impl std::fmt::Display for InfluxID {
}
}
#[derive(Debug, Clone)]
pub struct StorageError {
pub description: String,
}
impl fmt::Display for StorageError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.description)
}
}
impl error::Error for StorageError {
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
// Generic error, underlying cause isn't tracked.
None
}
}
impl From<io::Error> for StorageError {
fn from(e: io::Error) -> Self {
Self {
description: format!("TODO - io error: {} ({:?})", e, e),
}
}
}
impl From<std::str::Utf8Error> for StorageError {
fn from(e: std::str::Utf8Error) -> Self {
Self {
description: format!("TODO - utf8 error: {} ({:?})", e, e),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
@ -454,7 +499,7 @@ mod tests {
#[test]
fn read_tsm_index() {
let file = File::open("tests/fixtures/000000000000005-000000002.tsm.gz");
let file = File::open("../tests/fixtures/000000000000005-000000002.tsm.gz");
let mut decoder = gzip::Decoder::new(file.unwrap()).unwrap();
let mut buf = Vec::new();
decoder.read_to_end(&mut buf).unwrap();
@ -467,7 +512,7 @@ mod tests {
#[test]
fn read_tsm_block() {
let file = File::open("tests/fixtures/000000000000005-000000002.tsm.gz");
let file = File::open("../tests/fixtures/000000000000005-000000002.tsm.gz");
let mut decoder = gzip::Decoder::new(file.unwrap()).unwrap();
let mut buf = Vec::new();
decoder.read_to_end(&mut buf).unwrap();
@ -521,7 +566,7 @@ mod tests {
#[test]
fn decode_tsm_blocks() {
let file = File::open("tests/fixtures/000000000000005-000000002.tsm.gz");
let file = File::open("../tests/fixtures/000000000000005-000000002.tsm.gz");
let mut decoder = gzip::Decoder::new(file.unwrap()).unwrap();
let mut buf = Vec::new();
decoder.read_to_end(&mut buf).unwrap();
@ -650,11 +695,11 @@ mod tests {
#[test]
fn check_tsm_cpu_usage() {
walk_index_and_check_for_errors("tests/fixtures/cpu_usage.tsm.gz");
walk_index_and_check_for_errors("../tests/fixtures/cpu_usage.tsm.gz");
}
#[test]
fn check_tsm_000000000000005_000000002() {
walk_index_and_check_for_errors("tests/fixtures/000000000000005-000000002.tsm.gz");
walk_index_and_check_for_errors("../tests/fixtures/000000000000005-000000002.tsm.gz");
}
}

View File

@ -3,7 +3,6 @@
use std::{error, fmt};
pub mod encoders;
pub mod id;
pub mod line_parser;
pub mod storage;

View File

@ -10,7 +10,6 @@ pub mod partitioned_store;
pub mod predicate;
pub mod remote_partition;
pub mod s3_partition;
pub mod tsm;
pub mod tsm_mapper;
#[derive(Debug, Eq, PartialEq, Clone)]

View File

@ -160,8 +160,8 @@
//! ╚═════════════════════════════════════╝
//! ```
use crate::encoders::{float, integer, timestamp};
use crate::storage::StorageError;
use delorean_tsm::encoders::{float, integer, timestamp};
use integer_encoding::*;
use num::bigint::{BigInt, BigUint};