Merge pull request #236 from influxdata/alamb/fix-block-errors

fix: update error handling in block.rs
pull/24376/head
Andrew Lamb 2020-07-08 17:30:29 -04:00 committed by GitHub
commit 13309e10f2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 64 additions and 59 deletions

View File

@ -163,16 +163,32 @@
//! ╚═════════════════════════════════════╝
//! ```
use crate::storage::StorageError;
use delorean_tsm::encoders::{boolean, float, integer, string, timestamp, unsigned};
use integer_encoding::*;
use num::bigint::{BigInt, BigUint};
use snafu::{OptionExt, ResultExt, Snafu};
use std::convert::TryInto;
use std::io::{Seek, SeekFrom, Write};
use std::{u16, u32};
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Block encoding error: {}", source))]
EncodingError { source: Box<dyn std::error::Error> },
#[snafu(display("Block cannot find summary / empty block"))]
CannotFindSummary {},
#[snafu(display("Block encoder I/O error while writing: {}", source))]
WritingError { source: std::io::Error },
#[snafu(display("Block encoder I/O error while seeking: {}", source))]
SeekError { source: std::io::Error },
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
pub const F64_BLOCKTYPE_MARKER: u8 = 0;
pub const I64_BLOCKTYPE_MARKER: u8 = 1;
pub const BOOL_BLOCKTYPE_MARKER: u8 = 2;
@ -213,49 +229,39 @@ impl BlockType for u64 {
/// Types implementing `Encoder` are able to encode themselves into compressed
/// blocks of data.
pub trait Encoder {
fn encode(&self, dst: &mut Vec<u8>) -> Result<(), StorageError>;
fn encode(&self, dst: &mut Vec<u8>) -> Result<()>;
}
impl Encoder for Vec<f64> {
fn encode(&self, dst: &mut Vec<u8>) -> Result<(), StorageError> {
float::encode(&self, dst).map_err(|e| StorageError {
description: e.to_string(),
})
fn encode(&self, dst: &mut Vec<u8>) -> Result<()> {
float::encode(&self, dst).context(EncodingError)
}
}
impl Encoder for Vec<i64> {
fn encode(&self, dst: &mut Vec<u8>) -> Result<(), StorageError> {
integer::encode(&self, dst).map_err(|e| StorageError {
description: e.to_string(),
})
fn encode(&self, dst: &mut Vec<u8>) -> Result<()> {
integer::encode(&self, dst).context(EncodingError)
}
}
impl Encoder for Vec<u64> {
fn encode(&self, dst: &mut Vec<u8>) -> Result<(), StorageError> {
unsigned::encode(&self, dst).map_err(|e| StorageError {
description: e.to_string(),
})
fn encode(&self, dst: &mut Vec<u8>) -> Result<()> {
unsigned::encode(&self, dst).context(EncodingError)
}
}
// The type annotation for `bytes` isn't related to `Self` but clippy thinks it is
#[allow(clippy::use_self)]
impl Encoder for Vec<&str> {
fn encode(&self, dst: &mut Vec<u8>) -> Result<(), StorageError> {
fn encode(&self, dst: &mut Vec<u8>) -> Result<()> {
let bytes: Vec<_> = self.iter().map(|s| s.as_bytes()).collect();
string::encode(&bytes, dst).map_err(|e| StorageError {
description: e.to_string(),
})
string::encode(&bytes, dst).context(EncodingError)
}
}
impl Encoder for Vec<bool> {
fn encode(&self, dst: &mut Vec<u8>) -> Result<(), StorageError> {
boolean::encode(&self, dst).map_err(|e| StorageError {
description: e.to_string(),
})
fn encode(&self, dst: &mut Vec<u8>) -> Result<()> {
boolean::encode(&self, dst).context(EncodingError)
}
}
@ -292,7 +298,7 @@ where
/// on the provided `Hasher`.
///
/// `write_to` returns the number of bytes written to `w` or any error encountered.
fn write_to<W: Write, H: Hasher>(&self, w: &mut W, h: &mut H) -> Result<usize, StorageError>;
fn write_to<W: Write, H: Hasher>(&self, w: &mut W, h: &mut H) -> Result<usize>;
}
/// `Block` is a container for a compressed block of timestamps and associated values.
@ -356,7 +362,7 @@ where
}
/// `write_to` serialises the block into the provided writer `w`.
pub fn write_to<W>(&mut self, w: &mut W) -> Result<usize, StorageError>
pub fn write_to<W>(&mut self, w: &mut W) -> Result<usize>
where
W: Write + Seek,
{
@ -365,9 +371,7 @@ where
// There are some experimental APIs to do that here: https://doc.rust-lang.org/std/io/trait.Seek.html#method.stream_position
// But I'm not sure how to proceed in the meantime...
let summary = self.summary().ok_or_else(|| StorageError {
description: "empty block".to_string(),
})?;
let summary = self.summary().context(CannotFindSummary)?;
// hasher is used to compute a checksum, which will be written to the
// front of the Block when it's serialised.
@ -377,42 +381,42 @@ where
// 4 byte place-holder for checksum.
offset += 4;
w.write_all(&[0; 4])?;
w.write_all(&[0; 4]).context(WritingError)?;
// ID.
let id_bytes = self.id.to_be_bytes();
offset += id_bytes.len();
w.write_all(&id_bytes)?;
w.write_all(&id_bytes).context(WritingError)?;
hasher.update(&id_bytes);
// minimum timestamp in block
let time_range = summary.time_range();
let min_time_bytes = time_range.0.to_be_bytes();
offset += min_time_bytes.len();
w.write_all(&min_time_bytes)?;
w.write_all(&min_time_bytes).context(WritingError)?;
hasher.update(&min_time_bytes);
// maximum timestamp in block
let max_time_bytes = time_range.1.to_be_bytes();
offset += max_time_bytes.len();
w.write_all(&max_time_bytes)?;
w.write_all(&max_time_bytes).context(WritingError)?;
hasher.update(&max_time_bytes);
// write the block type
let marker_bytes = [T::BYTE_MARKER];
offset += marker_bytes.len();
w.write_all(&marker_bytes)?;
w.write_all(&marker_bytes).context(WritingError)?;
hasher.update(&marker_bytes);
// 1 byte place-holder for summary size
let summary_size_offset = offset;
offset += 1;
w.write_all(&[0; 1])?;
w.write_all(&[0; 1]).context(WritingError)?;
// 4 byte place-holder for summary size
let data_size_offset = offset;
offset += 4;
w.write_all(&[0; 4])?;
w.write_all(&[0; 4]).context(WritingError)?;
// write the summary - n bytes
let mut summary_hasher = crc32fast::Hasher::new(); // combined later
@ -429,11 +433,12 @@ where
summary_size_offset
.try_into()
.expect("summary_size_offset did not fit in u64"),
))?;
))
.context(WritingError)?;
let summary_size: u8 = summary_size
.try_into()
.expect("summary_size did not fit in u8");
w.write_all(&[summary_size])?;
w.write_all(&[summary_size]).context(WritingError)?;
hasher.update(&[summary_size]);
// seek and write the data block size in the reserved offset
@ -441,10 +446,12 @@ where
data_size_offset
.try_into()
.expect("data_size_offset did not fit in u64"),
))?;
))
.context(SeekError)?;
let data_size: u32 = data_size.try_into().expect("data_size did not fit in u32");
w.write_all(&(data_size).to_be_bytes())?;
w.write_all(&(data_size).to_be_bytes())
.context(WritingError)?;
hasher.update(&(data_size).to_be_bytes());
// combine hasher with summary hasher and data block hasher.
@ -452,12 +459,12 @@ where
hasher.combine(&data_block_hasher);
// seek back and write the checksum in.
w.seek(SeekFrom::Start(0))?;
w.seek(SeekFrom::Start(0)).context(WritingError)?;
let checksum = hasher.finalize();
w.write_all(&checksum.to_be_bytes())?;
w.write_all(&checksum.to_be_bytes()).context(WritingError)?;
// seek to last written offset for next caller.
w.seek(SeekFrom::Start(offset as u64))?;
w.seek(SeekFrom::Start(offset as u64)).context(SeekError)?;
Ok(offset)
}
}
@ -513,7 +520,7 @@ where
/// `write_to` serialises the block to the provided `Writer`, compressing the
/// timestamps and values using the most appropriate encoder for the data.
fn write_to<W, H>(&mut self, w: &mut W, h: &mut H) -> Result<usize, StorageError>
fn write_to<W, H>(&mut self, w: &mut W, h: &mut H) -> Result<usize>
where
W: Write,
H: Hasher,
@ -531,20 +538,18 @@ where
// TODO(edd): pool this buffer
let mut data_buf: Vec<u8> = vec![];
timestamp::encode(&ts, &mut data_buf).map_err(|e| StorageError {
description: e.to_string(),
})?;
timestamp::encode(&ts, &mut data_buf).context(EncodingError)?;
total += write_64_bit_varint(ts.len(), w, h)?;
total += data_buf.len();
w.write_all(&data_buf)?; // timestamp block
w.write_all(&data_buf).context(WritingError)?; // timestamp block
h.write(&data_buf);
data_buf.clear();
values.encode(&mut data_buf)?;
total += data_buf.len();
w.write_all(&data_buf)?; // values block
w.write_all(&data_buf).context(WritingError)?; // values block
h.write(&data_buf);
Ok(total)
@ -612,7 +617,7 @@ impl BlockSummary<f64> for FloatBlockSummary {
/// `write_to` serialises the summary to the provided writer and calculates a
/// checksum of the data written. The number of bytes written is returned.
fn write_to<W, H>(&self, w: &mut W, h: &mut H) -> Result<usize, StorageError>
fn write_to<W, H>(&self, w: &mut W, h: &mut H) -> Result<usize>
where
W: Write,
H: Hasher,
@ -692,7 +697,7 @@ impl BlockSummary<i64> for IntegerBlockSummary {
/// `write_to` serialises the summary to the provided writer and calculates a
/// checksum. The number of bytes written is returned.
fn write_to<W, H>(&self, w: &mut W, h: &mut H) -> Result<usize, StorageError>
fn write_to<W, H>(&self, w: &mut W, h: &mut H) -> Result<usize>
where
W: Write,
H: Hasher,
@ -706,7 +711,7 @@ impl BlockSummary<i64> for IntegerBlockSummary {
let (sign, sum_bytes) = self.sum.to_bytes_be();
let sign_bytes = [sign as u8];
total += sign_bytes.len();
w.write_all(&sign_bytes)?;
w.write_all(&sign_bytes).context(WritingError)?;
h.write(&sign_bytes);
// next, write out the number of bytes needed to store the big int data.
@ -720,13 +725,13 @@ impl BlockSummary<i64> for IntegerBlockSummary {
.expect("sum_bytes.len() did not fit in u16");
let len_bytes = len.to_be_bytes();
total += len_bytes.len();
w.write_all(&len_bytes)?;
w.write_all(&len_bytes).context(WritingError)?;
h.write(&len_bytes);
// finally, write out the variable number of bytes to represent the big
// int.
total += sum_bytes.len();
w.write_all(&sum_bytes)?;
w.write_all(&sum_bytes).context(WritingError)?;
h.write(&sum_bytes);
// The rest of the summary values are varint encoded i64s.
@ -785,7 +790,7 @@ impl BlockSummary<bool> for BoolBlockSummary {
/// `write_to` serialises the summary to the provided writer and calculates a
/// checksum. The number of bytes written is returned.
fn write_to<W: Write, H: Hasher>(&self, w: &mut W, h: &mut H) -> Result<usize, StorageError> {
fn write_to<W: Write, H: Hasher>(&self, w: &mut W, h: &mut H) -> Result<usize> {
write_64_bit_varint(self.count, w, h)
}
}
@ -837,7 +842,7 @@ impl<'a> BlockSummary<&'a str> for StringBlockSummary<'a> {
/// `write_to` serialises the summary to the provided writer and calculates a
/// checksum. The number of bytes written is returned.
fn write_to<W: Write, H: Hasher>(&self, w: &mut W, h: &mut H) -> Result<usize, StorageError> {
fn write_to<W: Write, H: Hasher>(&self, w: &mut W, h: &mut H) -> Result<usize> {
write_64_bit_varint(self.count, w, h)
}
}
@ -905,7 +910,7 @@ impl BlockSummary<u64> for UnsignedBlockSummary {
/// `write_to` serialises the summary to the provided writer and calculates a
/// checksum. The number of bytes written is returned.
fn write_to<W, H>(&self, w: &mut W, h: &mut H) -> Result<usize, StorageError>
fn write_to<W, H>(&self, w: &mut W, h: &mut H) -> Result<usize>
where
W: Write,
H: Hasher,
@ -926,13 +931,13 @@ impl BlockSummary<u64> for UnsignedBlockSummary {
.expect("sum_bytes.len() did not fit in u16");
let sum_bytes_len_bytes = sum_bytes_len.to_be_bytes();
total += sum_bytes_len_bytes.len();
w.write_all(&sum_bytes_len_bytes)?;
w.write_all(&sum_bytes_len_bytes).context(WritingError)?;
h.write(&sum_bytes_len_bytes);
// finally, write out the variable number of bytes to represent the big
// int.
total += sum_bytes.len();
w.write_all(&sum_bytes)?;
w.write_all(&sum_bytes).context(WritingError)?;
h.write(&sum_bytes);
// The rest of the summary values are varint encoded i64s.
@ -944,7 +949,7 @@ impl BlockSummary<u64> for UnsignedBlockSummary {
}
}
fn write_64_bit_varint<W, H>(val: impl VarInt, w: &mut W, h: &mut H) -> Result<usize, StorageError>
fn write_64_bit_varint<W, H>(val: impl VarInt, w: &mut W, h: &mut H) -> Result<usize>
where
W: Write,
H: Hasher,
@ -952,7 +957,7 @@ where
// 10 bytes is enough to hold the maximum varint for a 64-bit number.
let mut size_buf = [0; 10];
let n = val.encode_var(&mut size_buf);
w.write_all(&size_buf[..n])?;
w.write_all(&size_buf[..n]).context(WritingError)?;
h.write(&size_buf[..n]);
Ok(n)