refactor: hoist tsm mapper to delorean_tsm
parent
f046dbeea0
commit
621f2f91f0
|
@ -109,7 +109,7 @@ fn float_encode_sequential(c: &mut Criterion) {
|
|||
c,
|
||||
"float_encode_sequential",
|
||||
&LARGER_BATCH_SIZES,
|
||||
delorean::encoders::float::encode,
|
||||
delorean_tsm::encoders::float::encode,
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -136,7 +136,7 @@ fn integer_encode_sequential(c: &mut Criterion) {
|
|||
c,
|
||||
"integer_encode_sequential",
|
||||
&LARGER_BATCH_SIZES,
|
||||
delorean::encoders::integer::encode,
|
||||
delorean_tsm::encoders::integer::encode,
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -145,7 +145,7 @@ fn timestamp_encode_sequential(c: &mut Criterion) {
|
|||
c,
|
||||
"timestamp_encode_sequential",
|
||||
&LARGER_BATCH_SIZES,
|
||||
delorean::encoders::timestamp::encode,
|
||||
delorean_tsm::encoders::timestamp::encode,
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -177,7 +177,7 @@ fn float_encode_random(c: &mut Criterion) {
|
|||
.take(batch_size)
|
||||
.collect()
|
||||
},
|
||||
delorean::encoders::float::encode,
|
||||
delorean_tsm::encoders::float::encode,
|
||||
)
|
||||
}
|
||||
|
||||
|
@ -207,7 +207,7 @@ fn integer_encode_random(c: &mut Criterion) {
|
|||
.map(|_| rand::thread_rng().gen_range(0, 100))
|
||||
.collect()
|
||||
},
|
||||
delorean::encoders::integer::encode,
|
||||
delorean_tsm::encoders::integer::encode,
|
||||
)
|
||||
}
|
||||
|
||||
|
@ -232,7 +232,7 @@ fn float_encode_cpu(c: &mut Criterion) {
|
|||
"float_encode_cpu",
|
||||
&SMALLER_BATCH_SIZES,
|
||||
|batch_size| fixtures::CPU_F64_EXAMPLE_VALUES[..batch_size].to_vec(),
|
||||
delorean::encoders::float::encode,
|
||||
delorean_tsm::encoders::float::encode,
|
||||
)
|
||||
}
|
||||
|
||||
|
@ -244,10 +244,10 @@ fn float_decode_cpu(c: &mut Criterion) {
|
|||
|batch_size| {
|
||||
let decoded: Vec<f64> = fixtures::CPU_F64_EXAMPLE_VALUES[..batch_size].to_vec();
|
||||
let mut encoded = vec![];
|
||||
delorean::encoders::float::encode(&decoded, &mut encoded).unwrap();
|
||||
delorean_tsm::encoders::float::encode(&decoded, &mut encoded).unwrap();
|
||||
(decoded.len(), encoded)
|
||||
},
|
||||
delorean::encoders::float::decode,
|
||||
delorean_tsm::encoders::float::decode,
|
||||
)
|
||||
}
|
||||
|
||||
|
@ -259,10 +259,10 @@ fn float_decode_sequential(c: &mut Criterion) {
|
|||
|batch_size| {
|
||||
let decoded: Vec<f64> = (1..batch_size).map(convert_from_usize).collect();
|
||||
let mut encoded = vec![];
|
||||
delorean::encoders::float::encode(&decoded, &mut encoded).unwrap();
|
||||
delorean_tsm::encoders::float::encode(&decoded, &mut encoded).unwrap();
|
||||
(decoded.len(), encoded)
|
||||
},
|
||||
delorean::encoders::float::decode,
|
||||
delorean_tsm::encoders::float::decode,
|
||||
)
|
||||
}
|
||||
|
||||
|
@ -274,10 +274,10 @@ fn integer_decode_sequential(c: &mut Criterion) {
|
|||
|batch_size| {
|
||||
let decoded: Vec<i64> = (1..batch_size).map(convert_from_usize).collect();
|
||||
let mut encoded = vec![];
|
||||
delorean::encoders::integer::encode(&decoded, &mut encoded).unwrap();
|
||||
delorean_tsm::encoders::integer::encode(&decoded, &mut encoded).unwrap();
|
||||
(decoded.len(), encoded)
|
||||
},
|
||||
delorean::encoders::integer::decode,
|
||||
delorean_tsm::encoders::integer::decode,
|
||||
)
|
||||
}
|
||||
|
||||
|
@ -289,10 +289,10 @@ fn timestamp_decode_sequential(c: &mut Criterion) {
|
|||
|batch_size| {
|
||||
let decoded: Vec<i64> = (1..batch_size).map(convert_from_usize).collect();
|
||||
let mut encoded = vec![];
|
||||
delorean::encoders::timestamp::encode(&decoded, &mut encoded).unwrap();
|
||||
delorean_tsm::encoders::timestamp::encode(&decoded, &mut encoded).unwrap();
|
||||
(decoded.len(), encoded)
|
||||
},
|
||||
delorean::encoders::timestamp::decode,
|
||||
delorean_tsm::encoders::timestamp::decode,
|
||||
)
|
||||
}
|
||||
|
||||
|
@ -309,10 +309,10 @@ fn float_decode_random(c: &mut Criterion) {
|
|||
.collect();
|
||||
|
||||
let mut encoded = vec![];
|
||||
delorean::encoders::float::encode(&decoded, &mut encoded).unwrap();
|
||||
delorean_tsm::encoders::float::encode(&decoded, &mut encoded).unwrap();
|
||||
(decoded.len(), encoded)
|
||||
},
|
||||
delorean::encoders::float::decode,
|
||||
delorean_tsm::encoders::float::decode,
|
||||
)
|
||||
}
|
||||
|
||||
|
@ -326,10 +326,10 @@ fn integer_decode_random(c: &mut Criterion) {
|
|||
.map(|_| rand::thread_rng().gen_range(0, 100))
|
||||
.collect();
|
||||
let mut encoded = vec![];
|
||||
delorean::encoders::integer::encode(&decoded, &mut encoded).unwrap();
|
||||
delorean_tsm::encoders::integer::encode(&decoded, &mut encoded).unwrap();
|
||||
(decoded.len(), encoded)
|
||||
},
|
||||
delorean::encoders::integer::decode,
|
||||
delorean_tsm::encoders::integer::decode,
|
||||
)
|
||||
}
|
||||
|
||||
|
|
|
@ -1,456 +1,10 @@
|
|||
//! Types for reading and writing TSM files produced by InfluxDB >= 2.x
|
||||
pub mod encoders;
|
||||
pub mod mapper;
|
||||
pub mod reader;
|
||||
|
||||
use encoders::*;
|
||||
|
||||
use integer_encoding::VarInt;
|
||||
use std::convert::TryFrom;
|
||||
use std::error;
|
||||
use std::fmt;
|
||||
use std::io;
|
||||
use std::io::{BufRead, Seek, SeekFrom};
|
||||
use std::u64;
|
||||
|
||||
/// `TSMIndexReader` allows you to read index data within a TSM file.
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
/// Iterating over the TSM index.
|
||||
///
|
||||
/// ```
|
||||
/// # use delorean_tsm::*;
|
||||
/// # use libflate::gzip;
|
||||
/// # use std::fs::File;
|
||||
/// # use std::io::BufReader;
|
||||
/// # use std::io::Cursor;
|
||||
/// # use std::io::Read;
|
||||
/// # let file = File::open("../tests/fixtures/000000000000005-000000002.tsm.gz");
|
||||
/// # let mut decoder = gzip::Decoder::new(file.unwrap()).unwrap();
|
||||
/// # let mut buf = Vec::new();
|
||||
/// # decoder.read_to_end(&mut buf).unwrap();
|
||||
/// # let data_len = buf.len();
|
||||
/// # let r = Cursor::new(buf);
|
||||
///
|
||||
/// let reader = TSMIndexReader::try_new(BufReader::new(r), data_len).unwrap();
|
||||
///
|
||||
/// // reader allows you to access each index entry, and each block for each
|
||||
/// // entry in order.
|
||||
/// for index_entry in reader {
|
||||
/// match index_entry {
|
||||
/// Ok(entry) => {
|
||||
/// let key = entry.parse_key().unwrap();
|
||||
/// println!(
|
||||
/// "bucket id is {:?}, measurement name is {:?}",
|
||||
/// entry.bucket_id(),
|
||||
/// key.measurement,
|
||||
/// )
|
||||
/// }
|
||||
/// Err(e) => println!("got an error {:?}", e),
|
||||
/// }
|
||||
/// }
|
||||
/// ```
|
||||
///
|
||||
#[derive(Debug)]
|
||||
pub struct TSMIndexReader<R>
|
||||
where
|
||||
R: BufRead + Seek,
|
||||
{
|
||||
r: R,
|
||||
|
||||
curr_offset: u64,
|
||||
end_offset: u64,
|
||||
|
||||
curr: Option<IndexEntry>,
|
||||
next: Option<IndexEntry>,
|
||||
}
|
||||
|
||||
impl<R> TSMIndexReader<R>
|
||||
where
|
||||
R: BufRead + Seek,
|
||||
{
|
||||
pub fn try_new(mut r: R, len: usize) -> Result<Self, StorageError> {
|
||||
// determine offset to index, which is held in last 8 bytes of file.
|
||||
r.seek(SeekFrom::End(-8))?;
|
||||
let mut buf: [u8; 8] = [0; 8];
|
||||
r.read_exact(&mut buf)?;
|
||||
|
||||
let index_offset = u64::from_be_bytes(buf);
|
||||
r.seek(SeekFrom::Start(index_offset))?;
|
||||
|
||||
Ok(Self {
|
||||
r,
|
||||
curr_offset: index_offset,
|
||||
end_offset: len as u64 - 8,
|
||||
curr: None,
|
||||
next: None,
|
||||
})
|
||||
}
|
||||
|
||||
/// next_index_entry will return either the next index entry in a TSM file's
|
||||
/// index or will return an error. `next_index_entry` updates the offset on
|
||||
/// the Index, but it's the caller's responsibility to stop reading entries
|
||||
/// when the index has been exhausted.
|
||||
fn next_index_entry(&mut self) -> Result<IndexEntry, StorageError> {
|
||||
// read length of series key
|
||||
let mut buf: [u8; 2] = [0; 2];
|
||||
self.r.read_exact(&mut buf)?;
|
||||
self.curr_offset += 2;
|
||||
let key_len = u16::from_be_bytes(buf);
|
||||
|
||||
// read the series key itself
|
||||
let mut key_bytes = vec![0; key_len as usize]; // TODO(edd): re-use this
|
||||
self.r.read_exact(key_bytes.as_mut_slice())?;
|
||||
self.curr_offset += key_len as u64;
|
||||
|
||||
// read the block type
|
||||
self.r.read_exact(&mut buf[..1])?;
|
||||
self.curr_offset += 1;
|
||||
let block_type = buf[0];
|
||||
|
||||
// read how many blocks there are for this entry.
|
||||
self.r.read_exact(&mut buf)?;
|
||||
self.curr_offset += 2;
|
||||
let count = u16::from_be_bytes(buf);
|
||||
|
||||
Ok(IndexEntry {
|
||||
key: key_bytes,
|
||||
block_type,
|
||||
count,
|
||||
curr_block: 1,
|
||||
block: self.next_block_entry()?,
|
||||
})
|
||||
}
|
||||
|
||||
/// next_block_entry will return the next block entry within an index entry.
|
||||
/// It is the caller's responsibility to stop reading block entries when
|
||||
/// they have all been read for an index entry.
|
||||
fn next_block_entry(&mut self) -> Result<Block, StorageError> {
|
||||
// read min time on block entry
|
||||
let mut buf: [u8; 8] = [0; 8];
|
||||
self.r.read_exact(&mut buf[..])?;
|
||||
self.curr_offset += 8;
|
||||
let min_time = i64::from_be_bytes(buf);
|
||||
|
||||
// read max time on block entry
|
||||
self.r.read_exact(&mut buf[..])?;
|
||||
self.curr_offset += 8;
|
||||
let max_time = i64::from_be_bytes(buf);
|
||||
|
||||
// read block data offset
|
||||
self.r.read_exact(&mut buf[..])?;
|
||||
self.curr_offset += 8;
|
||||
let offset = u64::from_be_bytes(buf);
|
||||
|
||||
// read block size
|
||||
self.r.read_exact(&mut buf[..4])?;
|
||||
self.curr_offset += 4;
|
||||
let size = u32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]);
|
||||
|
||||
Ok(Block {
|
||||
min_time,
|
||||
max_time,
|
||||
offset,
|
||||
size,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl<R: BufRead + Seek> Iterator for TSMIndexReader<R> {
|
||||
type Item = Result<IndexEntry, StorageError>;
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
if self.curr_offset == self.end_offset {
|
||||
// end of entries
|
||||
return None;
|
||||
}
|
||||
|
||||
match &self.curr {
|
||||
Some(curr) => {
|
||||
if curr.curr_block < curr.count {
|
||||
// there are more block entries for this index entry. Read
|
||||
// the next block entry.
|
||||
let mut next = curr.clone();
|
||||
match self.next_block_entry() {
|
||||
Ok(block) => next.block = block,
|
||||
Err(e) => return Some(Err(e)),
|
||||
}
|
||||
next.curr_block += 1;
|
||||
self.next = Some(next);
|
||||
} else {
|
||||
// no more block entries. Move onto the next entry.
|
||||
match self.next_index_entry() {
|
||||
Ok(entry) => self.next = Some(entry),
|
||||
Err(e) => return Some(Err(e)),
|
||||
}
|
||||
}
|
||||
}
|
||||
None => match self.next_index_entry() {
|
||||
Ok(entry) => self.next = Some(entry),
|
||||
Err(e) => return Some(Err(e)),
|
||||
},
|
||||
}
|
||||
|
||||
self.curr = self.next.clone();
|
||||
Some(Ok(self.curr.clone().unwrap()))
|
||||
}
|
||||
}
|
||||
|
||||
/// `IndexEntry` provides lazy accessors for components of the entry.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct IndexEntry {
|
||||
key: Vec<u8>,
|
||||
|
||||
pub block_type: u8,
|
||||
pub count: u16,
|
||||
pub block: Block,
|
||||
curr_block: u16,
|
||||
}
|
||||
|
||||
impl IndexEntry {
|
||||
/// Get the organization ID that this entry belongs to.
|
||||
pub fn org_id(&self) -> InfluxID {
|
||||
IndexEntry::extract_id_from_slice(&self.key[..8])
|
||||
}
|
||||
|
||||
/// Get the bucket ID that this entry belongs to.
|
||||
pub fn bucket_id(&self) -> InfluxID {
|
||||
IndexEntry::extract_id_from_slice(&self.key[8..16])
|
||||
}
|
||||
|
||||
fn extract_id_from_slice(data: &[u8]) -> InfluxID {
|
||||
let mut buf: [u8; 8] = [0; 8];
|
||||
buf.copy_from_slice(&data[..8]);
|
||||
InfluxID::from_be_bytes(buf)
|
||||
}
|
||||
|
||||
pub fn parse_key(&self) -> Result<ParsedTSMKey, StorageError> {
|
||||
parse_tsm_key(self.key.to_vec())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct ParsedTSMKey {
|
||||
pub measurement: String,
|
||||
pub tagset: Vec<(String, String)>,
|
||||
pub field_key: String,
|
||||
}
|
||||
|
||||
/// parse_tsm_key parses from the series key the measurement, field key and tag
|
||||
/// set.
|
||||
///
|
||||
/// It does not provide access to the org and bucket ids on the key, these can
|
||||
/// be accessed via org_id() and bucket_id() respectively.
|
||||
///
|
||||
/// TODO: handle escapes in the series key for , = and \t
|
||||
///
|
||||
fn parse_tsm_key(mut key: Vec<u8>) -> Result<ParsedTSMKey, StorageError> {
|
||||
// skip over org id, bucket id, comma, null byte (measurement) and =
|
||||
// The next n-1 bytes are the measurement name, where the nᵗʰ byte is a `,`.
|
||||
key = key.drain(8 + 8 + 1 + 1 + 1..).collect::<Vec<u8>>();
|
||||
let mut i = 0;
|
||||
// TODO(edd): can we make this work with take_while?
|
||||
while i != key.len() {
|
||||
if key[i] == b',' {
|
||||
break;
|
||||
}
|
||||
i += 1;
|
||||
}
|
||||
|
||||
let mut rem_key = key.drain(i..).collect::<Vec<u8>>();
|
||||
let measurement = String::from_utf8(key).map_err(|e| StorageError {
|
||||
description: e.to_string(),
|
||||
})?;
|
||||
|
||||
let mut tagset = Vec::<(String, String)>::with_capacity(10);
|
||||
let mut reading_key = true;
|
||||
let mut key = String::with_capacity(100);
|
||||
let mut value = String::with_capacity(100);
|
||||
|
||||
// skip the comma separating measurement tag
|
||||
for byte in rem_key.drain(1..) {
|
||||
match byte {
|
||||
44 => {
|
||||
// ,
|
||||
reading_key = true;
|
||||
tagset.push((key, value));
|
||||
key = String::with_capacity(250);
|
||||
value = String::with_capacity(250);
|
||||
}
|
||||
61 => {
|
||||
// =
|
||||
reading_key = false;
|
||||
}
|
||||
_ => {
|
||||
if reading_key {
|
||||
key.push(byte as char);
|
||||
} else {
|
||||
value.push(byte as char);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// fields are stored on the series keys in TSM indexes as follows:
|
||||
//
|
||||
// <field_key><4-byte delimiter><field_key>
|
||||
//
|
||||
// so we can trim the parsed value.
|
||||
let field_trim_length = (value.len() - 4) / 2;
|
||||
let (field, _) = value.split_at(field_trim_length);
|
||||
Ok(ParsedTSMKey {
|
||||
measurement,
|
||||
tagset,
|
||||
field_key: field.to_string(),
|
||||
})
|
||||
}
|
||||
|
||||
pub const F64_BLOCKTYPE_MARKER: u8 = 0;
|
||||
pub const I64_BLOCKTYPE_MARKER: u8 = 1;
|
||||
pub const BOOL_BLOCKTYPE_MARKER: u8 = 2;
|
||||
pub const STRING_BLOCKTYPE_MARKER: u8 = 3;
|
||||
pub const U64_BLOCKTYPE_MARKER: u8 = 4;
|
||||
|
||||
/// `TSMBlockReader` allows you to read and decode TSM blocks from within a TSM
|
||||
/// file.
|
||||
///
|
||||
#[derive(Debug)]
|
||||
pub struct TSMBlockReader<R>
|
||||
where
|
||||
R: BufRead + Seek,
|
||||
{
|
||||
r: R,
|
||||
}
|
||||
|
||||
impl<R> TSMBlockReader<R>
|
||||
where
|
||||
R: BufRead + Seek,
|
||||
{
|
||||
pub fn new(r: R) -> Self {
|
||||
Self { r }
|
||||
}
|
||||
|
||||
/// decode_block decodes a block whose location is described by the provided
|
||||
/// `Block`.
|
||||
///
|
||||
/// The components of the returned `BlockData` are guaranteed to have
|
||||
/// identical lengths.
|
||||
pub fn decode_block(&mut self, block: &Block) -> Result<BlockData, StorageError> {
|
||||
self.r.seek(SeekFrom::Start(block.offset))?;
|
||||
|
||||
let mut data: Vec<u8> = vec![0; block.size as usize];
|
||||
self.r.read_exact(&mut data)?;
|
||||
|
||||
// TODO(edd): skip 32-bit CRC checksum at beginning of block for now
|
||||
let mut idx = 4;
|
||||
|
||||
// determine the block type
|
||||
let block_type = data[idx];
|
||||
idx += 1;
|
||||
|
||||
// first decode the timestamp block.
|
||||
let mut ts: Vec<i64> = Vec::with_capacity(MAX_BLOCK_VALUES); // 1000 is the max block size
|
||||
let (len, n) = u64::decode_var(&data[idx..]); // size of timestamp block
|
||||
idx += n;
|
||||
timestamp::decode(&data[idx..idx + (len as usize)], &mut ts).map_err(|e| StorageError {
|
||||
description: e.to_string(),
|
||||
})?;
|
||||
idx += len as usize;
|
||||
|
||||
match block_type {
|
||||
F64_BLOCKTYPE_MARKER => {
|
||||
// values will be same length as time-stamps.
|
||||
let mut values: Vec<f64> = Vec::with_capacity(ts.len());
|
||||
float::decode_influxdb(&data[idx..], &mut values).map_err(|e| StorageError {
|
||||
description: e.to_string(),
|
||||
})?;
|
||||
|
||||
Ok(BlockData::Float { ts, values })
|
||||
}
|
||||
I64_BLOCKTYPE_MARKER => {
|
||||
// values will be same length as time-stamps.
|
||||
let mut values: Vec<i64> = Vec::with_capacity(ts.len());
|
||||
integer::decode(&data[idx..], &mut values).map_err(|e| StorageError {
|
||||
description: e.to_string(),
|
||||
})?;
|
||||
|
||||
Ok(BlockData::Integer { ts, values })
|
||||
}
|
||||
BOOL_BLOCKTYPE_MARKER => Err(StorageError {
|
||||
description: String::from("bool block type unsupported"),
|
||||
}),
|
||||
STRING_BLOCKTYPE_MARKER => Err(StorageError {
|
||||
description: String::from("string block type unsupported"),
|
||||
}),
|
||||
U64_BLOCKTYPE_MARKER => Err(StorageError {
|
||||
description: String::from("unsigned integer block type unsupported"),
|
||||
}),
|
||||
_ => Err(StorageError {
|
||||
description: format!("unsupported block type {:?}", block_type),
|
||||
}),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// `Block` holds information about location and time range of a block of data.
|
||||
#[derive(Debug, Copy, Clone)]
|
||||
#[allow(dead_code)]
|
||||
pub struct Block {
|
||||
pub min_time: i64,
|
||||
pub max_time: i64,
|
||||
pub offset: u64,
|
||||
pub size: u32,
|
||||
}
|
||||
|
||||
// MAX_BLOCK_VALUES is the maximum number of values a TSM block can store.
|
||||
const MAX_BLOCK_VALUES: usize = 1000;
|
||||
|
||||
/// `BlockData` describes the various types of block data that can be held within
|
||||
/// a TSM file.
|
||||
#[derive(Debug)]
|
||||
pub enum BlockData {
|
||||
Float { ts: Vec<i64>, values: Vec<f64> },
|
||||
Integer { ts: Vec<i64>, values: Vec<i64> },
|
||||
Bool { ts: Vec<i64>, values: Vec<bool> },
|
||||
Str { ts: Vec<i64>, values: Vec<String> },
|
||||
Unsigned { ts: Vec<i64>, values: Vec<u64> },
|
||||
}
|
||||
|
||||
impl BlockData {
|
||||
pub fn is_empty(&self) -> bool {
|
||||
match &self {
|
||||
BlockData::Float { ts, values: _ } => ts.is_empty(),
|
||||
BlockData::Integer { ts, values: _ } => ts.is_empty(),
|
||||
BlockData::Bool { ts, values: _ } => ts.is_empty(),
|
||||
BlockData::Str { ts, values: _ } => ts.is_empty(),
|
||||
BlockData::Unsigned { ts, values: _ } => ts.is_empty(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)]
|
||||
/// `InfluxID` represents an InfluxDB ID used in InfluxDB 2.x to represent
|
||||
/// organization and bucket identifiers.
|
||||
pub struct InfluxID(u64);
|
||||
|
||||
#[allow(dead_code)]
|
||||
impl InfluxID {
|
||||
fn new_str(s: &str) -> Result<InfluxID, StorageError> {
|
||||
let v = u64::from_str_radix(s, 16).map_err(|e| StorageError {
|
||||
description: e.to_string(),
|
||||
})?;
|
||||
Ok(InfluxID(v))
|
||||
}
|
||||
|
||||
fn from_be_bytes(bytes: [u8; 8]) -> InfluxID {
|
||||
InfluxID(u64::from_be_bytes(bytes))
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Display for InfluxID {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::result::Result<(), std::fmt::Error> {
|
||||
write!(f, "{:016x}", self.0)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct ParsedTSMKey {
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
///! Types for mapping and converting series data from TSM indexes produced by
|
||||
///! InfluxDB >= 2.x
|
||||
use super::reader::{TSMBlockReader, TSMIndexReader};
|
||||
use super::*;
|
||||
use super::{Block, BlockData, TSMError};
|
||||
|
||||
use std::collections::{BTreeMap, BTreeSet};
|
||||
use std::fmt::{Display, Formatter};
|
||||
|
@ -22,14 +22,14 @@ pub struct TSMMeasurementMapper<R>
|
|||
where
|
||||
R: BufRead + Seek,
|
||||
{
|
||||
iter: Peekable<TSMReader<R>>,
|
||||
iter: Peekable<TSMIndexReader<R>>,
|
||||
}
|
||||
|
||||
impl<R> TSMMeasurementMapper<R>
|
||||
where
|
||||
R: BufRead + Seek,
|
||||
{
|
||||
pub fn new(iter: Peekable<TSMReader<R>>) -> TSMMeasurementMapper<R> {
|
||||
pub fn new(iter: Peekable<TSMIndexReader<R>>) -> TSMMeasurementMapper<R> {
|
||||
TSMMeasurementMapper { iter }
|
||||
}
|
||||
}
|
||||
|
@ -350,7 +350,7 @@ trait BlockDecoder {
|
|||
fn block_data(&mut self, block: &Block) -> Result<BlockData, TSMError>;
|
||||
}
|
||||
|
||||
impl<R> BlockDecoder for &mut TSMReader<R>
|
||||
impl<R> BlockDecoder for &mut TSMBlockReader<R>
|
||||
where
|
||||
R: BufRead + Seek,
|
||||
{
|
||||
|
@ -658,12 +658,12 @@ mod tests {
|
|||
|
||||
#[test]
|
||||
fn map_tsm_index() {
|
||||
let file = File::open("tests/fixtures/000000000000005-000000002.tsm.gz");
|
||||
let file = File::open("../tests/fixtures/000000000000005-000000002.tsm.gz");
|
||||
let mut decoder = gzip::Decoder::new(file.unwrap()).unwrap();
|
||||
let mut buf = Vec::new();
|
||||
decoder.read_to_end(&mut buf).unwrap();
|
||||
|
||||
let reader = TSMReader::try_new(BufReader::new(Cursor::new(buf)), 4_222_248).unwrap();
|
||||
let reader = TSMIndexReader::try_new(BufReader::new(Cursor::new(buf)), 4_222_248).unwrap();
|
||||
let mapper = TSMMeasurementMapper::new(reader.peekable());
|
||||
|
||||
// Although there are over 2,000 series keys in the TSM file, there are
|
||||
|
@ -683,19 +683,16 @@ mod tests {
|
|||
|
||||
#[test]
|
||||
fn map_field_columns_file() {
|
||||
let file = File::open("tests/fixtures/000000000000005-000000002.tsm.gz");
|
||||
let file = File::open("../tests/fixtures/000000000000005-000000002.tsm.gz");
|
||||
let mut decoder = gzip::Decoder::new(file.unwrap()).unwrap();
|
||||
let mut buf = Vec::new();
|
||||
decoder.read_to_end(&mut buf).unwrap();
|
||||
|
||||
let reader = TSMReader::try_new(BufReader::new(Cursor::new(buf)), 4_222_248).unwrap();
|
||||
let mut mapper = TSMMeasurementMapper::new(reader.peekable());
|
||||
let index_reader =
|
||||
TSMIndexReader::try_new(BufReader::new(Cursor::new(&buf)), 4_222_248).unwrap();
|
||||
let mut mapper = TSMMeasurementMapper::new(index_reader.peekable());
|
||||
|
||||
let file2 = File::open("tests/fixtures/000000000000005-000000002.tsm.gz");
|
||||
let mut decoder2 = gzip::Decoder::new(file2.unwrap()).unwrap();
|
||||
let mut buf2 = Vec::new();
|
||||
decoder2.read_to_end(&mut buf2).unwrap();
|
||||
let mut reader2 = TSMReader::try_new(BufReader::new(Cursor::new(buf2)), 4_222_248).unwrap();
|
||||
let mut block_reader = TSMBlockReader::new(BufReader::new(Cursor::new(&buf)));
|
||||
|
||||
let mut cpu = mapper
|
||||
.find(|m| m.to_owned().unwrap().name == "cpu")
|
||||
|
@ -717,7 +714,8 @@ mod tests {
|
|||
];
|
||||
|
||||
for field_blocks in cpu.tag_set_fields_blocks.values_mut() {
|
||||
let (_, field_cols) = super::map_field_columns(&mut reader2, field_blocks).unwrap();
|
||||
let (_, field_cols) =
|
||||
super::map_field_columns(&mut block_reader, field_blocks).unwrap();
|
||||
let keys: Vec<&String> = field_cols.keys().collect();
|
||||
|
||||
// Every mapping between field blocks should result in columns
|
||||
|
@ -725,15 +723,81 @@ mod tests {
|
|||
assert_eq!(keys, exp_field_keys);
|
||||
}
|
||||
}
|
||||
// #[test]
|
||||
// fn gen_table_file() {
|
||||
// let file = File::open("../tests/fixtures/000000000000005-000000002.tsm.gz");
|
||||
// let mut decoder = gzip::Decoder::new(file.unwrap()).unwrap();
|
||||
// let mut buf = Vec::new();
|
||||
// decoder.read_to_end(&mut buf).unwrap();
|
||||
|
||||
// let index_reader =
|
||||
// TSMIndexReader::try_new(BufReader::new(Cursor::new(&buf)), 4_222_248).unwrap();
|
||||
// let block_reader = TSMBlockReader::new(BufReader::new(Cursor::new(&buf)));
|
||||
|
||||
// let mut index_mapper = TSMMeasurementMapper::new(index_reader.peekable());
|
||||
// let mut cpu = index_mapper
|
||||
// .find(|m| m.to_owned().unwrap().name == "cpu")
|
||||
// .unwrap()
|
||||
// .unwrap();
|
||||
|
||||
// // Get the tag set and the field blocks for the first of the tagsets associated with this measurement.
|
||||
// let (tag_set, field_blocks) = cpu.tag_set_fields_blocks.iter_mut().next().unwrap();
|
||||
|
||||
// let tag_keys: Vec<String> = cpu.tag_columns.iter().cloned().collect();
|
||||
// let field_keys: Vec<String> = cpu.field_columns.iter().cloned().collect();
|
||||
// let (time_column, field_data_columns) =
|
||||
// map_field_columns(block_reader, field_blocks).unwrap();
|
||||
|
||||
// let table = super::TableData::new(
|
||||
// tag_keys,
|
||||
// field_keys,
|
||||
// time_column,
|
||||
// tag_set.clone(),
|
||||
// field_data_columns,
|
||||
// );
|
||||
|
||||
// // println("{:?}", table.field_columns())
|
||||
// }
|
||||
|
||||
// fn next(&mut self) -> Option<Self::Item> {
|
||||
// let next = self.tag_set_fields_blocks.iter_mut().next();
|
||||
// match next {
|
||||
// Some((key, field_blocks)) => {
|
||||
// // FIXME - remove this cloning.
|
||||
// let tag_keys: Vec<String> = self.tag_columns.iter().cloned().collect();
|
||||
// let field_keys: Vec<String> = self.field_columns.iter().cloned().collect();
|
||||
|
||||
// // FIXME: get matching right.
|
||||
// let res = map_field_columns(self.block_decoder, field_blocks);
|
||||
// match res {
|
||||
// Ok((time_column, field_data_columns)) => {
|
||||
// let table = TableData::new(
|
||||
// tag_keys,
|
||||
// field_keys,
|
||||
// time_column,
|
||||
// key.clone(),
|
||||
// field_data_columns,
|
||||
// );
|
||||
// // TODO(edd): this is awful. Need something like the
|
||||
// // experimental pop_first function.
|
||||
// self.tag_set_fields_blocks.remove(key);
|
||||
|
||||
// Some(Ok(table))
|
||||
// }
|
||||
// Err(e) => return Some(Err(e)),
|
||||
// }
|
||||
// }
|
||||
// None => None,
|
||||
// }
|
||||
|
||||
#[test]
|
||||
fn measurement_table_columns() {
|
||||
let file = File::open("tests/fixtures/000000000000005-000000002.tsm.gz");
|
||||
let file = File::open("../tests/fixtures/000000000000005-000000002.tsm.gz");
|
||||
let mut decoder = gzip::Decoder::new(file.unwrap()).unwrap();
|
||||
let mut buf = Vec::new();
|
||||
decoder.read_to_end(&mut buf).unwrap();
|
||||
|
||||
let reader = TSMReader::try_new(BufReader::new(Cursor::new(buf)), 4_222_248).unwrap();
|
||||
let reader = TSMIndexReader::try_new(BufReader::new(Cursor::new(buf)), 4_222_248).unwrap();
|
||||
let mut mapper = TSMMeasurementMapper::new(reader.peekable());
|
||||
|
||||
let cpu = mapper
|
|
@ -1,6 +1,5 @@
|
|||
use snafu::Snafu;
|
||||
|
||||
use delorean::storage::StorageError;
|
||||
use delorean_ingest::Error as IngestError;
|
||||
use delorean_parquet::error::Error as DeloreanParquetError;
|
||||
use delorean_parquet::writer::Error as ParquetWriterError;
|
||||
|
|
|
@ -10,7 +10,6 @@ pub mod partitioned_store;
|
|||
pub mod predicate;
|
||||
pub mod remote_partition;
|
||||
pub mod s3_partition;
|
||||
pub mod tsm_mapper;
|
||||
|
||||
#[derive(Debug, Eq, PartialEq, Clone)]
|
||||
pub struct ReadPoint<T: Clone> {
|
||||
|
|
Loading…
Reference in New Issue