diff --git a/src/storage/block.rs b/src/storage/block.rs index 4f2398e6ec..3cead1ddc1 100644 --- a/src/storage/block.rs +++ b/src/storage/block.rs @@ -163,16 +163,32 @@ //! ╚═════════════════════════════════════╝ //! ``` -use crate::storage::StorageError; use delorean_tsm::encoders::{boolean, float, integer, string, timestamp, unsigned}; use integer_encoding::*; use num::bigint::{BigInt, BigUint}; +use snafu::{OptionExt, ResultExt, Snafu}; use std::convert::TryInto; use std::io::{Seek, SeekFrom, Write}; use std::{u16, u32}; +#[derive(Debug, Snafu)] +pub enum Error { + #[snafu(display("Block encoding error: {}", source))] + EncodingError { source: Box }, + #[snafu(display("Block cannot find summary / empty block"))] + CannotFindSummary {}, + + #[snafu(display("Block encoder I/O error while writing: {}", source))] + WritingError { source: std::io::Error }, + + #[snafu(display("Block encoder I/O error while seeking: {}", source))] + SeekError { source: std::io::Error }, +} + +pub type Result = std::result::Result; + pub const F64_BLOCKTYPE_MARKER: u8 = 0; pub const I64_BLOCKTYPE_MARKER: u8 = 1; pub const BOOL_BLOCKTYPE_MARKER: u8 = 2; @@ -213,49 +229,39 @@ impl BlockType for u64 { /// Types implementing `Encoder` are able to encode themselves into compressed /// blocks of data. pub trait Encoder { - fn encode(&self, dst: &mut Vec) -> Result<(), StorageError>; + fn encode(&self, dst: &mut Vec) -> Result<()>; } impl Encoder for Vec { - fn encode(&self, dst: &mut Vec) -> Result<(), StorageError> { - float::encode(&self, dst).map_err(|e| StorageError { - description: e.to_string(), - }) + fn encode(&self, dst: &mut Vec) -> Result<()> { + float::encode(&self, dst).context(EncodingError) } } impl Encoder for Vec { - fn encode(&self, dst: &mut Vec) -> Result<(), StorageError> { - integer::encode(&self, dst).map_err(|e| StorageError { - description: e.to_string(), - }) + fn encode(&self, dst: &mut Vec) -> Result<()> { + integer::encode(&self, dst).context(EncodingError) } } impl Encoder for Vec { - fn encode(&self, dst: &mut Vec) -> Result<(), StorageError> { - unsigned::encode(&self, dst).map_err(|e| StorageError { - description: e.to_string(), - }) + fn encode(&self, dst: &mut Vec) -> Result<()> { + unsigned::encode(&self, dst).context(EncodingError) } } // The type annotation for `bytes` isn't related to `Self` but clippy thinks it is #[allow(clippy::use_self)] impl Encoder for Vec<&str> { - fn encode(&self, dst: &mut Vec) -> Result<(), StorageError> { + fn encode(&self, dst: &mut Vec) -> Result<()> { let bytes: Vec<_> = self.iter().map(|s| s.as_bytes()).collect(); - string::encode(&bytes, dst).map_err(|e| StorageError { - description: e.to_string(), - }) + string::encode(&bytes, dst).context(EncodingError) } } impl Encoder for Vec { - fn encode(&self, dst: &mut Vec) -> Result<(), StorageError> { - boolean::encode(&self, dst).map_err(|e| StorageError { - description: e.to_string(), - }) + fn encode(&self, dst: &mut Vec) -> Result<()> { + boolean::encode(&self, dst).context(EncodingError) } } @@ -292,7 +298,7 @@ where /// on the provided `Hasher`. /// /// `write_to` returns the number of bytes written to `w` or any error encountered. - fn write_to(&self, w: &mut W, h: &mut H) -> Result; + fn write_to(&self, w: &mut W, h: &mut H) -> Result; } /// `Block` is a container for a compressed block of timestamps and associated values. @@ -356,7 +362,7 @@ where } /// `write_to` serialises the block into the provided writer `w`. - pub fn write_to(&mut self, w: &mut W) -> Result + pub fn write_to(&mut self, w: &mut W) -> Result where W: Write + Seek, { @@ -365,9 +371,7 @@ where // There are some experimental APIs to do that here: https://doc.rust-lang.org/std/io/trait.Seek.html#method.stream_position // But I'm not sure how to proceed in the meantime... - let summary = self.summary().ok_or_else(|| StorageError { - description: "empty block".to_string(), - })?; + let summary = self.summary().context(CannotFindSummary)?; // hasher is used to compute a checksum, which will be written to the // front of the Block when it's serialised. @@ -377,42 +381,42 @@ where // 4 byte place-holder for checksum. offset += 4; - w.write_all(&[0; 4])?; + w.write_all(&[0; 4]).context(WritingError)?; // ID. let id_bytes = self.id.to_be_bytes(); offset += id_bytes.len(); - w.write_all(&id_bytes)?; + w.write_all(&id_bytes).context(WritingError)?; hasher.update(&id_bytes); // minimum timestamp in block let time_range = summary.time_range(); let min_time_bytes = time_range.0.to_be_bytes(); offset += min_time_bytes.len(); - w.write_all(&min_time_bytes)?; + w.write_all(&min_time_bytes).context(WritingError)?; hasher.update(&min_time_bytes); // maximum timestamp in block let max_time_bytes = time_range.1.to_be_bytes(); offset += max_time_bytes.len(); - w.write_all(&max_time_bytes)?; + w.write_all(&max_time_bytes).context(WritingError)?; hasher.update(&max_time_bytes); // write the block type let marker_bytes = [T::BYTE_MARKER]; offset += marker_bytes.len(); - w.write_all(&marker_bytes)?; + w.write_all(&marker_bytes).context(WritingError)?; hasher.update(&marker_bytes); // 1 byte place-holder for summary size let summary_size_offset = offset; offset += 1; - w.write_all(&[0; 1])?; + w.write_all(&[0; 1]).context(WritingError)?; // 4 byte place-holder for summary size let data_size_offset = offset; offset += 4; - w.write_all(&[0; 4])?; + w.write_all(&[0; 4]).context(WritingError)?; // write the summary - n bytes let mut summary_hasher = crc32fast::Hasher::new(); // combined later @@ -429,11 +433,12 @@ where summary_size_offset .try_into() .expect("summary_size_offset did not fit in u64"), - ))?; + )) + .context(WritingError)?; let summary_size: u8 = summary_size .try_into() .expect("summary_size did not fit in u8"); - w.write_all(&[summary_size])?; + w.write_all(&[summary_size]).context(WritingError)?; hasher.update(&[summary_size]); // seek and write the data block size in the reserved offset @@ -441,10 +446,12 @@ where data_size_offset .try_into() .expect("data_size_offset did not fit in u64"), - ))?; + )) + .context(SeekError)?; let data_size: u32 = data_size.try_into().expect("data_size did not fit in u32"); - w.write_all(&(data_size).to_be_bytes())?; + w.write_all(&(data_size).to_be_bytes()) + .context(WritingError)?; hasher.update(&(data_size).to_be_bytes()); // combine hasher with summary hasher and data block hasher. @@ -452,12 +459,12 @@ where hasher.combine(&data_block_hasher); // seek back and write the checksum in. - w.seek(SeekFrom::Start(0))?; + w.seek(SeekFrom::Start(0)).context(WritingError)?; let checksum = hasher.finalize(); - w.write_all(&checksum.to_be_bytes())?; + w.write_all(&checksum.to_be_bytes()).context(WritingError)?; // seek to last written offset for next caller. - w.seek(SeekFrom::Start(offset as u64))?; + w.seek(SeekFrom::Start(offset as u64)).context(SeekError)?; Ok(offset) } } @@ -513,7 +520,7 @@ where /// `write_to` serialises the block to the provided `Writer`, compressing the /// timestamps and values using the most appropriate encoder for the data. - fn write_to(&mut self, w: &mut W, h: &mut H) -> Result + fn write_to(&mut self, w: &mut W, h: &mut H) -> Result where W: Write, H: Hasher, @@ -531,20 +538,18 @@ where // TODO(edd): pool this buffer let mut data_buf: Vec = vec![]; - timestamp::encode(&ts, &mut data_buf).map_err(|e| StorageError { - description: e.to_string(), - })?; + timestamp::encode(&ts, &mut data_buf).context(EncodingError)?; total += write_64_bit_varint(ts.len(), w, h)?; total += data_buf.len(); - w.write_all(&data_buf)?; // timestamp block + w.write_all(&data_buf).context(WritingError)?; // timestamp block h.write(&data_buf); data_buf.clear(); values.encode(&mut data_buf)?; total += data_buf.len(); - w.write_all(&data_buf)?; // values block + w.write_all(&data_buf).context(WritingError)?; // values block h.write(&data_buf); Ok(total) @@ -612,7 +617,7 @@ impl BlockSummary for FloatBlockSummary { /// `write_to` serialises the summary to the provided writer and calculates a /// checksum of the data written. The number of bytes written is returned. - fn write_to(&self, w: &mut W, h: &mut H) -> Result + fn write_to(&self, w: &mut W, h: &mut H) -> Result where W: Write, H: Hasher, @@ -692,7 +697,7 @@ impl BlockSummary for IntegerBlockSummary { /// `write_to` serialises the summary to the provided writer and calculates a /// checksum. The number of bytes written is returned. - fn write_to(&self, w: &mut W, h: &mut H) -> Result + fn write_to(&self, w: &mut W, h: &mut H) -> Result where W: Write, H: Hasher, @@ -706,7 +711,7 @@ impl BlockSummary for IntegerBlockSummary { let (sign, sum_bytes) = self.sum.to_bytes_be(); let sign_bytes = [sign as u8]; total += sign_bytes.len(); - w.write_all(&sign_bytes)?; + w.write_all(&sign_bytes).context(WritingError)?; h.write(&sign_bytes); // next, write out the number of bytes needed to store the big int data. @@ -720,13 +725,13 @@ impl BlockSummary for IntegerBlockSummary { .expect("sum_bytes.len() did not fit in u16"); let len_bytes = len.to_be_bytes(); total += len_bytes.len(); - w.write_all(&len_bytes)?; + w.write_all(&len_bytes).context(WritingError)?; h.write(&len_bytes); // finally, write out the variable number of bytes to represent the big // int. total += sum_bytes.len(); - w.write_all(&sum_bytes)?; + w.write_all(&sum_bytes).context(WritingError)?; h.write(&sum_bytes); // The rest of the summary values are varint encoded i64s. @@ -785,7 +790,7 @@ impl BlockSummary for BoolBlockSummary { /// `write_to` serialises the summary to the provided writer and calculates a /// checksum. The number of bytes written is returned. - fn write_to(&self, w: &mut W, h: &mut H) -> Result { + fn write_to(&self, w: &mut W, h: &mut H) -> Result { write_64_bit_varint(self.count, w, h) } } @@ -837,7 +842,7 @@ impl<'a> BlockSummary<&'a str> for StringBlockSummary<'a> { /// `write_to` serialises the summary to the provided writer and calculates a /// checksum. The number of bytes written is returned. - fn write_to(&self, w: &mut W, h: &mut H) -> Result { + fn write_to(&self, w: &mut W, h: &mut H) -> Result { write_64_bit_varint(self.count, w, h) } } @@ -905,7 +910,7 @@ impl BlockSummary for UnsignedBlockSummary { /// `write_to` serialises the summary to the provided writer and calculates a /// checksum. The number of bytes written is returned. - fn write_to(&self, w: &mut W, h: &mut H) -> Result + fn write_to(&self, w: &mut W, h: &mut H) -> Result where W: Write, H: Hasher, @@ -926,13 +931,13 @@ impl BlockSummary for UnsignedBlockSummary { .expect("sum_bytes.len() did not fit in u16"); let sum_bytes_len_bytes = sum_bytes_len.to_be_bytes(); total += sum_bytes_len_bytes.len(); - w.write_all(&sum_bytes_len_bytes)?; + w.write_all(&sum_bytes_len_bytes).context(WritingError)?; h.write(&sum_bytes_len_bytes); // finally, write out the variable number of bytes to represent the big // int. total += sum_bytes.len(); - w.write_all(&sum_bytes)?; + w.write_all(&sum_bytes).context(WritingError)?; h.write(&sum_bytes); // The rest of the summary values are varint encoded i64s. @@ -944,7 +949,7 @@ impl BlockSummary for UnsignedBlockSummary { } } -fn write_64_bit_varint(val: impl VarInt, w: &mut W, h: &mut H) -> Result +fn write_64_bit_varint(val: impl VarInt, w: &mut W, h: &mut H) -> Result where W: Write, H: Hasher, @@ -952,7 +957,7 @@ where // 10 bytes is enough to hold the maximum varint for a 64-bit number. let mut size_buf = [0; 10]; let n = val.encode_var(&mut size_buf); - w.write_all(&size_buf[..n])?; + w.write_all(&size_buf[..n]).context(WritingError)?; h.write(&size_buf[..n]); Ok(n)