parent
bf884ff3d3
commit
d7dbf061cb
|
@ -767,6 +767,7 @@ dependencies = [
|
|||
"integer-encoding",
|
||||
"libflate",
|
||||
"rand 0.7.3",
|
||||
"snap",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
|
|
@ -8,6 +8,7 @@ edition = "2018"
|
|||
|
||||
[dependencies]
|
||||
integer-encoding = "1.0.7"
|
||||
snap = "1.0.0"
|
||||
|
||||
[dev-dependencies]
|
||||
hex = "0.4.2"
|
||||
|
|
|
@ -1,7 +1,11 @@
|
|||
pub mod float;
|
||||
pub mod integer;
|
||||
mod simple8b;
|
||||
pub mod string;
|
||||
pub mod timestamp;
|
||||
|
||||
/// Max number of bytes needed to store a varint-encoded 32-bit integer.
|
||||
const MAX_VAR_INT_32: usize = 5;
|
||||
|
||||
/// Max number of bytes needed to store a varint-encoded 64-bit integer.
|
||||
const MAX_VAR_INT_64: usize = 10;
|
||||
|
|
|
@ -0,0 +1,183 @@
|
|||
use integer_encoding::VarInt;
|
||||
use std::{convert::TryInto, error::Error};
|
||||
|
||||
/// A compressed encoding using Snappy compression. Snappy is the only available string compression
|
||||
/// format at this time.
|
||||
const STRING_COMPRESSED_SNAPPY: u8 = 1;
|
||||
/// Length in bytes of the block header: a single byte whose upper four bits hold the compression type.
|
||||
const HEADER_LEN: usize = 1;
|
||||
|
||||
/// Encodes a slice of string slices into a vector of bytes. Currently uses Snappy compression.
|
||||
pub fn encode<T: AsRef<str>>(src: &[T], dst: &mut Vec<u8>) -> Result<(), Box<dyn Error>> {
|
||||
dst.truncate(0); // reset buffer
|
||||
if src.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// strings shouldn't be longer than 64kb
|
||||
let length_of_lengths = src.len() * super::MAX_VAR_INT_32;
|
||||
let sum_of_lengths: usize = src.iter().map(|s| s.as_ref().len()).sum();
|
||||
let source_size = 2 + length_of_lengths + sum_of_lengths;
|
||||
|
||||
// determine the maximum possible length needed for the buffer, which
|
||||
// includes the compressed size
|
||||
let max_encoded_len = snap::raw::max_compress_len(source_size);
|
||||
if max_encoded_len == 0 {
|
||||
return Err("source length too large".into());
|
||||
}
|
||||
let compressed_size = max_encoded_len + HEADER_LEN;
|
||||
let total_size = source_size + compressed_size;
|
||||
|
||||
if dst.len() < total_size {
|
||||
dst.resize(total_size, 0);
|
||||
}
|
||||
|
||||
// write the data to be compressed *after* the space needed for snappy
|
||||
// compression. The compressed data is at the start of the allocated buffer,
|
||||
// ensuring the entire capacity is returned and available for subsequent use.
|
||||
let (compressed_data, data) = dst.split_at_mut(compressed_size);
|
||||
let mut n = 0;
|
||||
for s in src {
|
||||
let s = s.as_ref();
|
||||
let len = s.len();
|
||||
let len_u64: u64 = len.try_into()?;
|
||||
n += len_u64.encode_var(&mut data[n..]);
|
||||
data[n..n + len].copy_from_slice(s.as_bytes());
|
||||
n += len;
|
||||
}
|
||||
let data = &data[..n];
|
||||
|
||||
let (header, compressed_data) = compressed_data.split_at_mut(HEADER_LEN);
|
||||
|
||||
header[0] = STRING_COMPRESSED_SNAPPY << 4; // write compression type
|
||||
|
||||
// TODO: snap docs say it is beneficial to reuse an `Encoder` when possible
|
||||
let mut encoder = snap::raw::Encoder::new();
|
||||
let actual_compressed_size = encoder.compress(data, compressed_data)?;
|
||||
|
||||
dst.truncate(HEADER_LEN + actual_compressed_size);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Decodes a slice of bytes representing Snappy-compressed data into a vector of `String`s.
|
||||
pub fn decode(src: &[u8], dst: &mut Vec<String>) -> Result<(), Box<dyn Error>> {
|
||||
if src.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let mut decoder = snap::raw::Decoder::new();
|
||||
// First byte stores the encoding type, only have snappy format
|
||||
// currently so ignore for now.
|
||||
let decoded_bytes = decoder.decompress_vec(&src[HEADER_LEN..])?;
|
||||
|
||||
if dst.capacity() == 0 {
|
||||
dst.reserve_exact(64);
|
||||
}
|
||||
|
||||
let num_decoded_bytes = decoded_bytes.len();
|
||||
let mut i = 0;
|
||||
|
||||
while i < num_decoded_bytes {
|
||||
let (length, num_bytes_read) = u64::decode_var(&decoded_bytes[i..]);
|
||||
let length: usize = length.try_into()?;
|
||||
if num_bytes_read == 0 {
|
||||
return Err("invalid encoded string length".into());
|
||||
}
|
||||
|
||||
let lower = i + num_bytes_read;
|
||||
let upper = lower + length;
|
||||
|
||||
if upper < lower {
|
||||
return Err("length overflow".into());
|
||||
}
|
||||
if upper > num_decoded_bytes {
|
||||
return Err("short buffer".into());
|
||||
}
|
||||
|
||||
dst.push(std::str::from_utf8(&decoded_bytes[lower..upper])?.to_string());
|
||||
|
||||
// The length of this string plus the length of the variable byte encoded length
|
||||
i += length + num_bytes_read;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use super::*;

    /// Byte sequence the tests expect for the strings "value 0" through "value 9".
    const MULTI_ENCODED: &[u8] = &[
        16, 80, 28, 7, 118, 97, 108, 117, 101, 32, 48, 13, 8, 0, 49, 13, 8, 0, 50, 13, 8, 0, 51,
        13, 8, 0, 52, 13, 8, 0, 53, 13, 8, 0, 54, 13, 8, 0, 55, 13, 8, 32, 56, 7, 118, 97, 108,
        117, 101, 32, 57,
    ];

    /// Builds the ten strings "value 0" through "value 9".
    fn multi_values() -> Vec<String> {
        (0..10).map(|i| format!("value {}", i)).collect()
    }

    #[test]
    fn encode_no_values() {
        let input: Vec<&str> = Vec::new();
        let mut encoded = Vec::new();

        // encoding must succeed...
        encode(&input, &mut encoded).expect("failed to encode src");

        // ...and produce no bytes at all.
        assert_eq!(encoded.len(), 0);
    }

    #[test]
    fn encode_single() {
        let input = vec!["v1"];
        let mut encoded = Vec::new();

        encode(&input, &mut encoded).expect("failed to encode src");

        let expected: Vec<u8> = vec![16, 3, 8, 2, 118, 49];
        assert_eq!(encoded, expected);
    }

    #[test]
    fn encode_multi_compressed() {
        let input = multi_values();
        let mut encoded = Vec::new();

        encode(&input, &mut encoded).expect("failed to encode src");

        assert_eq!(encoded, MULTI_ENCODED);
    }

    #[test]
    fn decode_no_values() {
        let encoded: Vec<u8> = Vec::new();
        let mut decoded = Vec::new();

        // decoding must succeed...
        decode(&encoded, &mut decoded).expect("failed to decode src");

        // ...and produce no strings at all.
        assert_eq!(decoded.len(), 0);
    }

    #[test]
    fn decode_single() {
        let encoded: Vec<u8> = vec![16, 3, 8, 2, 118, 49];
        let mut decoded = Vec::new();

        decode(&encoded, &mut decoded).expect("failed to decode src");

        assert_eq!(decoded, vec!["v1"]);
    }

    #[test]
    fn decode_multi_compressed() {
        let mut decoded = Vec::new();

        decode(MULTI_ENCODED, &mut decoded).expect("failed to decode src");

        assert_eq!(decoded, multi_values());
    }
}
|
|
@ -295,11 +295,12 @@ where
|
|||
description: String::from("bool block type unsupported"),
|
||||
}),
|
||||
BlockType::Str => {
|
||||
let len = ts.len();
|
||||
Ok(BlockData::Str {
|
||||
ts,
|
||||
values: vec!["unsupported string!!".to_string(); len as usize],
|
||||
})
|
||||
// values will be same length as time-stamps.
|
||||
let mut values = Vec::with_capacity(ts.len());
|
||||
encoders::string::decode(&data[idx..], &mut values).map_err(|e| TSMError {
|
||||
description: e.to_string(),
|
||||
})?;
|
||||
Ok(BlockData::Str { ts, values })
|
||||
}
|
||||
BlockType::Unsigned => Err(TSMError {
|
||||
description: String::from("unsigned integer block type unsupported"),
|
||||
|
@ -459,9 +460,6 @@ mod tests {
|
|||
BlockType::Bool => {
|
||||
eprintln!("Note: ignoring bool block, not implemented");
|
||||
}
|
||||
BlockType::Str => {
|
||||
eprintln!("Note: ignoring Str block, not implemented");
|
||||
}
|
||||
BlockType::Unsigned => {
|
||||
eprintln!("Note: ignoring unsigned block, not implemented");
|
||||
}
|
||||
|
|
|
@ -7,6 +7,7 @@
|
|||
//!
|
||||
//! - f64 (float blocks);
|
||||
//! - i64 (signed integer blocks);
|
||||
//! - String;
|
||||
//!
|
||||
//! Other block types are ready to be supported when the appropriate encoders
|
||||
//! have been implemented.
|
||||
|
@ -161,7 +162,7 @@
|
|||
//! ```
|
||||
|
||||
use crate::storage::StorageError;
|
||||
use delorean_tsm::encoders::{float, integer, timestamp};
|
||||
use delorean_tsm::encoders::{float, integer, string, timestamp};
|
||||
|
||||
use integer_encoding::*;
|
||||
use num::bigint::{BigInt, BigUint};
|
||||
|
@ -238,9 +239,9 @@ impl Encoder for Vec<u64> {
|
|||
}
|
||||
|
||||
impl Encoder for Vec<&str> {
|
||||
fn encode(&self, _: &mut Vec<u8>) -> Result<(), StorageError> {
|
||||
Err(StorageError {
|
||||
description: String::from("not yet implemented"),
|
||||
fn encode(&self, dst: &mut Vec<u8>) -> Result<(), StorageError> {
|
||||
string::encode(&self, dst).map_err(|e| StorageError {
|
||||
description: e.to_string(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue