feat: add support for decoding blocks

pull/24376/head
Edd Robinson 2020-06-01 11:54:42 +01:00
parent 413738a264
commit b94d4ddd94
3 changed files with 250 additions and 140 deletions

View File

@ -2,97 +2,3 @@ pub mod float;
pub mod integer;
mod simple8b;
pub mod timestamp;
use crate::storage::StorageError;
/// Types implementing `Encoder` are able to encode themselves into compressed
/// blocks of data.
pub trait Encoder {
fn encode(&self, dst: &mut Vec<u8>) -> Result<(), StorageError>;
}
impl Encoder for Vec<f64> {
fn encode(&self, dst: &mut Vec<u8>) -> Result<(), StorageError> {
float::encode(&self, dst).map_err(|e| StorageError {
description: e.to_string(),
})
}
}
impl Encoder for Vec<i64> {
fn encode(&self, dst: &mut Vec<u8>) -> Result<(), StorageError> {
integer::encode(&self, dst).map_err(|e| StorageError {
description: e.to_string(),
})
}
}
impl Encoder for Vec<u64> {
fn encode(&self, _: &mut Vec<u8>) -> Result<(), StorageError> {
Err(StorageError {
description: String::from("not yet implemented"),
})
}
}
impl Encoder for Vec<&str> {
fn encode(&self, _: &mut Vec<u8>) -> Result<(), StorageError> {
Err(StorageError {
description: String::from("not yet implemented"),
})
}
}
impl Encoder for Vec<bool> {
fn encode(&self, _: &mut Vec<u8>) -> Result<(), StorageError> {
Err(StorageError {
description: String::from("not yet implemented"),
})
}
}
/// Types implementing `Decoder` are able to decode themselves from compressed
/// blocks of vectors of values.
pub trait Decoder<T> {
fn decode(dst: &mut Vec<T>, src: Vec<u8>) -> Result<(), StorageError>;
}
impl Decoder<f64> for Vec<f64> {
fn decode(dst: &mut Vec<f64>, src: Vec<u8>) -> Result<(), StorageError> {
float::decode(&src, dst).map_err(|e| StorageError {
description: e.to_string(),
})
}
}
impl Decoder<i64> for Vec<i64> {
fn decode(dst: &mut Vec<i64>, src: Vec<u8>) -> Result<(), StorageError> {
integer::decode(&src, dst).map_err(|e| StorageError {
description: e.to_string(),
})
}
}
impl Decoder<u64> for Vec<u64> {
fn decode(_: &mut Vec<u64>, _: Vec<u8>) -> Result<(), StorageError> {
Err(StorageError {
description: String::from("not yet implemented"),
})
}
}
impl Decoder<&str> for Vec<&str> {
fn decode(_: &mut Vec<&str>, _: Vec<u8>) -> Result<(), StorageError> {
Err(StorageError {
description: String::from("not yet implemented"),
})
}
}
impl Decoder<bool> for Vec<bool> {
fn decode(_: &mut Vec<bool>, _: Vec<u8>) -> Result<(), StorageError> {
Err(StorageError {
description: String::from("not yet implemented"),
})
}
}

View File

@ -160,8 +160,7 @@
//! ╚═════════════════════════════════════╝
//! ```
use crate::encoders::timestamp;
use crate::encoders::Encoder;
use crate::encoders::{float, integer, timestamp};
use crate::storage::StorageError;
use integer_encoding::*;
@ -202,6 +201,52 @@ impl BlockType for u64 {
type BlockSummary = UnsignedBlockSummary;
}
/// Types implementing `Encoder` are able to encode themselves into compressed
/// blocks of data.
pub trait Encoder {
fn encode(&self, dst: &mut Vec<u8>) -> Result<(), StorageError>;
}
impl Encoder for Vec<f64> {
fn encode(&self, dst: &mut Vec<u8>) -> Result<(), StorageError> {
float::encode(&self, dst).map_err(|e| StorageError {
description: e.to_string(),
})
}
}
impl Encoder for Vec<i64> {
fn encode(&self, dst: &mut Vec<u8>) -> Result<(), StorageError> {
integer::encode(&self, dst).map_err(|e| StorageError {
description: e.to_string(),
})
}
}
impl Encoder for Vec<u64> {
fn encode(&self, _: &mut Vec<u8>) -> Result<(), StorageError> {
Err(StorageError {
description: String::from("not yet implemented"),
})
}
}
impl Encoder for Vec<&str> {
fn encode(&self, _: &mut Vec<u8>) -> Result<(), StorageError> {
Err(StorageError {
description: String::from("not yet implemented"),
})
}
}
impl Encoder for Vec<bool> {
fn encode(&self, _: &mut Vec<u8>) -> Result<(), StorageError> {
Err(StorageError {
description: String::from("not yet implemented"),
})
}
}
/// `Hasher` provides a sub-set of the `std::hash::Hasher` API.
///
/// Specifically, only raw byte streams can be written, ensuring that the caller

View File

@ -1,7 +1,8 @@
use crate::encoders::*;
use crate::storage::StorageError;
use std::io::BufRead;
use std::io::{Read, Seek, SeekFrom};
use integer_encoding::VarInt;
use std::io::{BufRead, Seek, SeekFrom};
use std::u64;
pub struct TSMFile<R>
@ -39,8 +40,11 @@ where
}
}
pub struct Index<T: Read> {
r: T,
pub struct Index<R>
where
R: BufRead + Seek,
{
r: R,
curr_offset: u64,
end_offset: u64,
@ -48,7 +52,7 @@ pub struct Index<T: Read> {
next: Option<IndexEntry>,
}
impl<T: Read> Index<T> {
impl<R: BufRead + Seek> Index<R> {
/// read_index_entry will yield either the next index entry in a TSM file's index
/// or will return an error. read_index_entry updates the offset on the Index
/// but it's the caller's responsibility to stop reading entries when the index
@ -75,18 +79,11 @@ impl<T: Read> Index<T> {
self.curr_offset += 2;
let count = u16::from_be_bytes(buf);
let mut buf2: [u8; 8] = [0; 8];
buf2.copy_from_slice(&key_bytes[..8]);
let org_id = InfluxID::from_be_bytes(buf2);
buf2.copy_from_slice(&key_bytes[8..16]);
let bucket_id = InfluxID::from_be_bytes(buf2);
Ok(IndexEntry {
key: key_bytes,
org_id,
bucket_id,
_org_id: None,
_bucket_id: None,
// _measurement: None,
block_type,
count,
curr_block: 1,
@ -126,9 +123,74 @@ impl<T: Read> Index<T> {
size,
})
}
// decode_block decodes the current block pointed to by the provided index
// entry, returning two vectors containing the timestamps and values.
// decode_block will seek back to the original position in the index before
// returning.
//
// The vectors are guaranteed to have the same length, with a maximum length
// of 1000.
fn decode_block(&mut self, block: &Block) -> Result<BlockData, StorageError> {
self.r.seek(SeekFrom::Start(block.offset))?;
let mut data: Vec<u8> = vec![0; block.size as usize];
self.r.read_exact(&mut data)?;
// TODO(edd): skip 32-bit CRC checksum at beginning of block for now
let mut idx = 4;
// determine the block type
let block_type = data[idx];
idx += 1;
// first decode the timestamp block.
let mut ts: Vec<i64> = Vec::with_capacity(1000); // 1000 is the max block size
let (len, n) = u64::decode_var(&data[idx..]); // size of timestamp block
idx += n;
timestamp::decode(&data[idx..idx + (len as usize)], &mut ts).map_err(|e| StorageError {
description: e.to_string(),
})?;
idx += len as usize;
match block_type {
0 => {
// values will be same length as time-stamps.
let mut values: Vec<f64> = Vec::with_capacity(ts.len());
float::decode_influxdb(&data[idx..], &mut values).map_err(|e| StorageError {
description: e.to_string(),
})?;
// seek to original position in index before returning to caller.
self.r.seek(SeekFrom::Start(self.curr_offset))?;
Ok(BlockData::Float { ts, values })
}
1 => {
// values will be same length as time-stamps.
let mut values: Vec<i64> = Vec::with_capacity(ts.len());
integer::decode(&data[idx..], &mut values).map_err(|e| StorageError {
description: e.to_string(),
})?;
Ok(BlockData::Integer { ts, values })
}
2 => Err(StorageError {
description: String::from("bool block type unsupported"),
}),
3 => Err(StorageError {
description: String::from("string block type unsupported"),
}),
4 => Err(StorageError {
description: String::from("unsigned integer block type unsupported"),
}),
_ => Err(StorageError {
description: String::from(format!("unsupported block type {:?}", block_type)),
}),
}
}
}
impl<T: Read> Iterator for Index<T> {
impl<R: BufRead + Seek> Iterator for Index<R> {
type Item = Result<IndexEntry, StorageError>;
fn next(&mut self) -> Option<Result<IndexEntry, StorageError>> {
@ -168,6 +230,69 @@ impl<T: Read> Iterator for Index<T> {
}
}
#[derive(Clone)]
pub struct IndexEntry {
key: Vec<u8>,
_org_id: Option<InfluxID>,
_bucket_id: Option<InfluxID>,
// _measurement: Option<&'a str>,
block_type: u8,
count: u16,
block: Block,
curr_block: u16,
}
impl IndexEntry {
// org_id returns the organisation ID that this entry belongs to.
fn org_id(&mut self) -> InfluxID {
match &self._org_id {
Some(id) => id.clone(),
None => {
let mut buf2: [u8; 8] = [0; 8];
buf2.copy_from_slice(&self.key[..8]);
let id = InfluxID::from_be_bytes(buf2);
self._org_id = Some(id.clone());
id
}
}
}
// bucket_id returns the organisation ID that this entry belongs to.
fn bucket_id(&mut self) -> InfluxID {
match &self._bucket_id {
Some(id) => id.clone(),
None => {
let mut buf2: [u8; 8] = [0; 8];
buf2.copy_from_slice(&self.key[8..16]);
let id = InfluxID::from_be_bytes(buf2);
self._bucket_id = Some(id.clone());
id
}
}
}
}
/// BlockData describes the various types of block data that can be held within
/// a TSM file.
enum BlockData<'a> {
Float { ts: Vec<i64>, values: Vec<f64> },
Integer { ts: Vec<i64>, values: Vec<i64> },
Bool { ts: Vec<i64>, values: Vec<bool> },
Str { ts: Vec<i64>, values: Vec<&'a str> },
Unsigned { ts: Vec<i64>, values: Vec<u64> },
}
#[derive(Copy, Clone)]
pub struct Block {
min_time: i64,
max_time: i64,
offset: u64,
size: u32,
}
#[derive(Clone, Debug)]
/// InfluxID represents an InfluxDB ID used in InfluxDB 2.x to represent
/// organization and bucket identifiers.
@ -199,39 +324,20 @@ impl std::cmp::PartialEq for InfluxID {
}
}
#[derive(Clone)]
pub struct IndexEntry {
key: Vec<u8>,
org_id: InfluxID,
bucket_id: InfluxID,
block_type: u8,
count: u16,
block: Block,
curr_block: u16,
}
#[derive(Copy, Clone)]
pub struct Block {
min_time: i64,
max_time: i64,
offset: u64,
size: u32,
}
#[cfg(test)]
mod tests {
use super::*;
use libflate::gzip::Decoder;
use libflate::gzip;
use std::fs::File;
use std::i64;
use std::io::BufReader;
use std::io::Cursor;
use std::io::Read;
#[test]
fn read_tsm_index() {
let file = File::open("tests/fixtures/000000000000005-000000002.tsm.gz");
let mut decoder = Decoder::new(file.unwrap()).unwrap();
let mut decoder = gzip::Decoder::new(file.unwrap()).unwrap();
let mut buf = Vec::new();
decoder.read_to_end(&mut buf).unwrap();
@ -245,7 +351,7 @@ mod tests {
#[test]
fn read_tsm_block() {
let file = File::open("tests/fixtures/000000000000005-000000002.tsm.gz");
let mut decoder = Decoder::new(file.unwrap()).unwrap();
let mut decoder = gzip::Decoder::new(file.unwrap()).unwrap();
let mut buf = Vec::new();
decoder.read_to_end(&mut buf).unwrap();
@ -264,18 +370,21 @@ mod tests {
for index_entry in index {
match index_entry {
Ok(entry) => {
got_blocks += entry.count as u64;
// TODO(edd): this is surely not the right way. I should be
// returning mutable references from the iterator.
let mut e = entry.clone();
got_blocks += e.count as u64;
if entry.block.min_time < got_min_time {
got_min_time = entry.block.min_time;
got_min_time = e.block.min_time;
}
if entry.block.max_time > got_max_time {
got_max_time = entry.block.max_time;
got_max_time = e.block.max_time;
}
assert_eq!(entry.org_id, org_id);
assert_eq!(entry.bucket_id, bucket_id);
assert_eq!(e.org_id(), org_id);
assert_eq!(e.bucket_id(), bucket_id);
}
Err(e) => panic!("{:?} {:?}", e, got_blocks),
}
@ -286,6 +395,56 @@ mod tests {
assert_eq!(got_max_time, 1590597378379824000); // latest time is 2020-05-27T16:36:18.379824Z
}
#[test]
fn decode_tsm_blocks() {
let file = File::open("tests/fixtures/000000000000005-000000002.tsm.gz");
let mut decoder = gzip::Decoder::new(file.unwrap()).unwrap();
let mut buf = Vec::new();
decoder.read_to_end(&mut buf).unwrap();
let mut reader = TSMFile::new(BufReader::new(Cursor::new(buf)), 4_222_248);
let mut index = reader.index().unwrap();
let mut blocks = vec![];
// Find the float block with offset 5339 in the file.
let f64_entry = index
.find(|e| {
e.as_ref().unwrap().block.offset == 5339 && e.as_ref().unwrap().block_type == 0_u8
})
.unwrap()
.unwrap();
let f64_block = &index.decode_block(&f64_entry.block).unwrap();
blocks.push(f64_block);
// // Find the first integer block index entry in the file.
// let i64_entry = index
// .find(|e| e.as_ref().unwrap().block_type == 1_u8)
// .unwrap()
// .unwrap();
// blocks.push(&index.decode_block(&i64_entry.block).unwrap());
for block in blocks {
// The first integer block in the value should have 509 values in it.
match block {
BlockData::Float { ts, values } => {
assert_eq!(ts.len(), 507);
assert_eq!(values.len(), 507);
}
BlockData::Integer { ts, values } => {
assert_eq!(ts.len(), 509);
assert_eq!(values.len(), 509);
}
BlockData::Bool { ts: _, values: _ } => {
panic!("should not have decoded bool block")
}
BlockData::Str { ts: _, values: _ } => panic!("should not have decoded str block"),
BlockData::Unsigned { ts: _, values: _ } => {
panic!("should not have decoded unsigned block")
}
}
}
}
#[test]
fn influx_id() {
let id = InfluxID::new_str("20aa9b0").unwrap();