fix: Encode and decode string data as bytes
String data isn't guaranteed to be UTF-8pull/24376/head
parent
672d3fe668
commit
1e341a7321
|
@ -597,7 +597,7 @@ impl TSMFileConverter {
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
// this will create a column of repeated None values.
|
// this will create a column of repeated None values.
|
||||||
let col: Vec<Option<String>> = vec![None; col_len];
|
let col: Vec<Option<Vec<u8>>> = vec![None; col_len];
|
||||||
packed_columns[*idx] = Packers::from(col);
|
packed_columns[*idx] = Packers::from(col);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -650,7 +650,7 @@ impl TSMFileConverter {
|
||||||
packed_columns[*idx] = Packers::from(col);
|
packed_columns[*idx] = Packers::from(col);
|
||||||
}
|
}
|
||||||
BlockType::Str => {
|
BlockType::Str => {
|
||||||
let col: Vec<Option<String>> = vec![None; col_len];
|
let col: Vec<Option<Vec<u8>>> = vec![None; col_len];
|
||||||
packed_columns[*idx] = Packers::from(col);
|
packed_columns[*idx] = Packers::from(col);
|
||||||
}
|
}
|
||||||
BlockType::Unsigned => {
|
BlockType::Unsigned => {
|
||||||
|
|
|
@ -152,13 +152,13 @@ impl std::convert::From<delorean_table_schema::DataType> for Packers {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl std::convert::From<Vec<Option<String>>> for Packers {
|
impl std::convert::From<Vec<Option<Vec<u8>>>> for Packers {
|
||||||
fn from(values: Vec<Option<String>>) -> Self {
|
fn from(values: Vec<Option<Vec<u8>>>) -> Self {
|
||||||
// TODO(edd): convert this with an iterator?
|
// TODO(edd): convert this with an iterator?
|
||||||
let mut as_byte_array: Vec<Option<ByteArray>> = Vec::with_capacity(values.len());
|
let mut as_byte_array: Vec<Option<ByteArray>> = Vec::with_capacity(values.len());
|
||||||
for v in values {
|
for v in values {
|
||||||
match v {
|
match v {
|
||||||
Some(v) => as_byte_array.push(Some(ByteArray::from(v.as_str()))),
|
Some(v) => as_byte_array.push(Some(ByteArray::from(v))),
|
||||||
None => as_byte_array.push(None),
|
None => as_byte_array.push(None),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -9,8 +9,9 @@ const HEADER_LEN: usize = 1;
|
||||||
/// Store `i32::MAX` as a `usize` for comparing with lengths in assertions
|
/// Store `i32::MAX` as a `usize` for comparing with lengths in assertions
|
||||||
const MAX_I32: usize = i32::MAX as usize;
|
const MAX_I32: usize = i32::MAX as usize;
|
||||||
|
|
||||||
/// Encodes a slice of string slices into a vector of bytes. Currently uses Snappy compression.
|
/// Encodes a slice of byte slices representing string data into a vector of bytes. Currently uses
|
||||||
pub fn encode<T: AsRef<str>>(src: &[T], dst: &mut Vec<u8>) -> Result<(), Box<dyn Error>> {
|
/// Snappy compression.
|
||||||
|
pub fn encode(src: &[&[u8]], dst: &mut Vec<u8>) -> Result<(), Box<dyn Error>> {
|
||||||
dst.truncate(0); // reset buffer
|
dst.truncate(0); // reset buffer
|
||||||
if src.is_empty() {
|
if src.is_empty() {
|
||||||
return Ok(());
|
return Ok(());
|
||||||
|
@ -21,7 +22,7 @@ pub fn encode<T: AsRef<str>>(src: &[T], dst: &mut Vec<u8>) -> Result<(), Box<dyn
|
||||||
let sum_of_lengths: usize = src
|
let sum_of_lengths: usize = src
|
||||||
.iter()
|
.iter()
|
||||||
.map(|s| {
|
.map(|s| {
|
||||||
let len = s.as_ref().len();
|
let len = s.len();
|
||||||
assert!(len < MAX_I32);
|
assert!(len < MAX_I32);
|
||||||
len
|
len
|
||||||
})
|
})
|
||||||
|
@ -47,11 +48,10 @@ pub fn encode<T: AsRef<str>>(src: &[T], dst: &mut Vec<u8>) -> Result<(), Box<dyn
|
||||||
let (compressed_data, data) = dst.split_at_mut(compressed_size);
|
let (compressed_data, data) = dst.split_at_mut(compressed_size);
|
||||||
let mut n = 0;
|
let mut n = 0;
|
||||||
for s in src {
|
for s in src {
|
||||||
let s = s.as_ref();
|
|
||||||
let len = s.len();
|
let len = s.len();
|
||||||
let len_u64: u64 = len.try_into()?;
|
let len_u64: u64 = len.try_into()?;
|
||||||
n += len_u64.encode_var(&mut data[n..]);
|
n += len_u64.encode_var(&mut data[n..]);
|
||||||
data[n..n + len].copy_from_slice(s.as_bytes());
|
data[n..n + len].copy_from_slice(s);
|
||||||
n += len;
|
n += len;
|
||||||
}
|
}
|
||||||
let data = &data[..n];
|
let data = &data[..n];
|
||||||
|
@ -69,8 +69,9 @@ pub fn encode<T: AsRef<str>>(src: &[T], dst: &mut Vec<u8>) -> Result<(), Box<dyn
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Decodes a slice of bytes representing Snappy-compressed data into a vector of `String`s.
|
/// Decodes a slice of bytes representing Snappy-compressed data into a vector of vectors of bytes
|
||||||
pub fn decode(src: &[u8], dst: &mut Vec<String>) -> Result<(), Box<dyn Error>> {
|
/// representing string data, which may or may not be valid UTF-8.
|
||||||
|
pub fn decode(src: &[u8], dst: &mut Vec<Vec<u8>>) -> Result<(), Box<dyn Error>> {
|
||||||
if src.is_empty() {
|
if src.is_empty() {
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
@ -104,7 +105,7 @@ pub fn decode(src: &[u8], dst: &mut Vec<String>) -> Result<(), Box<dyn Error>> {
|
||||||
return Err("short buffer".into());
|
return Err("short buffer".into());
|
||||||
}
|
}
|
||||||
|
|
||||||
dst.push(std::str::from_utf8(&decoded_bytes[lower..upper])?.to_string());
|
dst.push(decoded_bytes[lower..upper].to_vec());
|
||||||
|
|
||||||
// The length of this string plus the length of the variable byte encoded length
|
// The length of this string plus the length of the variable byte encoded length
|
||||||
i += length + num_bytes_read;
|
i += length + num_bytes_read;
|
||||||
|
@ -119,7 +120,7 @@ mod tests {
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn encode_no_values() {
|
fn encode_no_values() {
|
||||||
let src: Vec<&str> = vec![];
|
let src: Vec<&[u8]> = vec![];
|
||||||
let mut dst = vec![];
|
let mut dst = vec![];
|
||||||
|
|
||||||
// check for error
|
// check for error
|
||||||
|
@ -131,7 +132,8 @@ mod tests {
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn encode_single() {
|
fn encode_single() {
|
||||||
let src = vec!["v1"];
|
let v1_bytes = b"v1";
|
||||||
|
let src = vec![&v1_bytes[..]];
|
||||||
let mut dst = vec![];
|
let mut dst = vec![];
|
||||||
|
|
||||||
encode(&src, &mut dst).expect("failed to encode src");
|
encode(&src, &mut dst).expect("failed to encode src");
|
||||||
|
@ -140,7 +142,8 @@ mod tests {
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn encode_multi_compressed() {
|
fn encode_multi_compressed() {
|
||||||
let src: Vec<_> = (0..10).map(|i| format!("value {}", i)).collect();
|
let src_strings: Vec<_> = (0..10).map(|i| format!("value {}", i)).collect();
|
||||||
|
let src: Vec<_> = src_strings.iter().map(|s| s.as_bytes()).collect();
|
||||||
let mut dst = vec![];
|
let mut dst = vec![];
|
||||||
|
|
||||||
encode(&src, &mut dst).expect("failed to encode src");
|
encode(&src, &mut dst).expect("failed to encode src");
|
||||||
|
@ -172,7 +175,12 @@ mod tests {
|
||||||
let mut dst = vec![];
|
let mut dst = vec![];
|
||||||
|
|
||||||
decode(&src, &mut dst).expect("failed to decode src");
|
decode(&src, &mut dst).expect("failed to decode src");
|
||||||
assert_eq!(dst, vec!["v1"]);
|
|
||||||
|
let dst_as_strings: Vec<_> = dst
|
||||||
|
.iter()
|
||||||
|
.map(|s| std::str::from_utf8(s).unwrap())
|
||||||
|
.collect();
|
||||||
|
assert_eq!(dst_as_strings, vec!["v1"]);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
@ -186,7 +194,11 @@ mod tests {
|
||||||
|
|
||||||
decode(&src, &mut dst).expect("failed to decode src");
|
decode(&src, &mut dst).expect("failed to decode src");
|
||||||
|
|
||||||
|
let dst_as_strings: Vec<_> = dst
|
||||||
|
.iter()
|
||||||
|
.map(|s| std::str::from_utf8(s).unwrap())
|
||||||
|
.collect();
|
||||||
let expected: Vec<_> = (0..10).map(|i| format!("value {}", i)).collect();
|
let expected: Vec<_> = (0..10).map(|i| format!("value {}", i)).collect();
|
||||||
assert_eq!(dst, expected);
|
assert_eq!(dst_as_strings, expected);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -129,7 +129,7 @@ pub enum BlockData {
|
||||||
Float { ts: Vec<i64>, values: Vec<f64> },
|
Float { ts: Vec<i64>, values: Vec<f64> },
|
||||||
Integer { ts: Vec<i64>, values: Vec<i64> },
|
Integer { ts: Vec<i64>, values: Vec<i64> },
|
||||||
Bool { ts: Vec<i64>, values: Vec<bool> },
|
Bool { ts: Vec<i64>, values: Vec<bool> },
|
||||||
Str { ts: Vec<i64>, values: Vec<String> },
|
Str { ts: Vec<i64>, values: Vec<Vec<u8>> },
|
||||||
Unsigned { ts: Vec<i64>, values: Vec<u64> },
|
Unsigned { ts: Vec<i64>, values: Vec<u64> },
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -15,7 +15,7 @@ use std::iter::Peekable;
|
||||||
/// The main purpose of the `TSMMeasurementMapper` is to provide a
|
/// The main purpose of the `TSMMeasurementMapper` is to provide a
|
||||||
/// transformation step that allows one to convert per-series/per-field data
|
/// transformation step that allows one to convert per-series/per-field data
|
||||||
/// into measurement-oriented table data.
|
/// into measurement-oriented table data.
|
||||||
///
|
///
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct TSMMeasurementMapper<R>
|
pub struct TSMMeasurementMapper<R>
|
||||||
where
|
where
|
||||||
|
@ -199,7 +199,7 @@ pub enum ColumnData {
|
||||||
Float(Vec<Option<f64>>),
|
Float(Vec<Option<f64>>),
|
||||||
Integer(Vec<Option<i64>>),
|
Integer(Vec<Option<i64>>),
|
||||||
Bool(Vec<Option<bool>>),
|
Bool(Vec<Option<bool>>),
|
||||||
Str(Vec<Option<String>>),
|
Str(Vec<Option<Vec<u8>>>),
|
||||||
Unsigned(Vec<Option<u64>>),
|
Unsigned(Vec<Option<u64>>),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -209,7 +209,7 @@ enum ValuePair {
|
||||||
F64((i64, f64)),
|
F64((i64, f64)),
|
||||||
I64((i64, i64)),
|
I64((i64, i64)),
|
||||||
Bool((i64, bool)),
|
Bool((i64, bool)),
|
||||||
Str((i64, String)),
|
Str((i64, Vec<u8>)),
|
||||||
U64((i64, u64)),
|
U64((i64, u64)),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -240,7 +240,8 @@ impl Encoder for Vec<u64> {
|
||||||
|
|
||||||
impl Encoder for Vec<&str> {
|
impl Encoder for Vec<&str> {
|
||||||
fn encode(&self, dst: &mut Vec<u8>) -> Result<(), StorageError> {
|
fn encode(&self, dst: &mut Vec<u8>) -> Result<(), StorageError> {
|
||||||
string::encode(&self, dst).map_err(|e| StorageError {
|
let bytes: Vec<_> = self.iter().map(|s| s.as_bytes()).collect();
|
||||||
|
string::encode(&bytes, dst).map_err(|e| StorageError {
|
||||||
description: e.to_string(),
|
description: e.to_string(),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue