feat: Compress WAL entries with Snappy

Fixes #276.
pull/24376/head
Carol (Nichols || Goulding) 2020-09-11 16:35:23 -04:00
parent 90eba702ca
commit 596c987956
3 changed files with 41 additions and 13 deletions

1
Cargo.lock generated
View File

@ -809,6 +809,7 @@ dependencies = [
"once_cell", "once_cell",
"regex", "regex",
"snafu", "snafu",
"snap",
"tokio", "tokio",
] ]

View File

@ -10,6 +10,7 @@ byteorder = "1.3.4"
bytes = "0.5.4" bytes = "0.5.4"
crc32fast = "1.2.0" crc32fast = "1.2.0"
snafu = "0.6.6" snafu = "0.6.6"
snap = "1.0.0"
regex = "1.3.7" regex = "1.3.7"
itertools = "0.9.0" itertools = "0.9.0"
once_cell = "1.4.0" once_cell = "1.4.0"

View File

@ -90,6 +90,14 @@ enum InternalError {
source: io::Error, source: io::Error,
}, },
UnableToCompressData {
source: snap::Error,
},
UnableToDecompressData {
source: snap::Error,
},
UnableToSync { UnableToSync {
source: io::Error, source: io::Error,
}, },
@ -520,22 +528,23 @@ impl Loader {
let expected_len_us = let expected_len_us =
usize::try_from(header.len).expect("Only designed to run on 32-bit systems or higher"); usize::try_from(header.len).expect("Only designed to run on 32-bit systems or higher");
let mut data = Vec::with_capacity(expected_len_us); let mut compressed_data = Vec::with_capacity(expected_len_us);
let actual_len = file
let actual_compressed_len = file
.take(u64::from(header.len)) .take(u64::from(header.len))
.read_to_end(&mut data) .read_to_end(&mut compressed_data)
.context(UnableToReadData)?; .context(UnableToReadData)?;
ensure!( ensure!(
expected_len_us == actual_len, expected_len_us == actual_compressed_len,
LengthMismatch { LengthMismatch {
expected: expected_len_us, expected: expected_len_us,
actual: actual_len actual: actual_compressed_len
} }
); );
let mut hasher = Hasher::new(); let mut hasher = Hasher::new();
hasher.update(&data); hasher.update(&compressed_data);
let actual_checksum = hasher.finalize(); let actual_checksum = hasher.finalize();
ensure!( ensure!(
@ -546,6 +555,11 @@ impl Loader {
} }
); );
let mut decoder = snap::raw::Decoder::new();
let data = decoder
.decompress_vec(&compressed_data)
.context(UnableToDecompressData)?;
let entry = Entry { let entry = Entry {
sequence_number: header.sequence_number, sequence_number: header.sequence_number,
data, data,
@ -627,20 +641,32 @@ pub struct WritePayload {
} }
impl WritePayload { impl WritePayload {
/// Initializes a write payload and computes its CRC from the passed in vec. /// Initializes a write payload, compresses the data, and computes its CRC.
pub fn new(data: Vec<u8>) -> Result<Self> { pub fn new(uncompressed_data: Vec<u8>) -> Result<Self> {
// Only designed to support chunks up to `u32::max` bytes long. // Only designed to support chunks up to `u32::max` bytes long.
let len = data.len(); let uncompressed_len = uncompressed_data.len();
let len = u32::try_from(len).context(ChunkSizeTooLarge { actual: len })?; let _ = u32::try_from(uncompressed_len).context(ChunkSizeTooLarge {
actual: uncompressed_len,
})?;
let mut encoder = snap::raw::Encoder::new();
let compressed_data = encoder
.compress_vec(&uncompressed_data)
.context(UnableToCompressData)?;
let actual_compressed_len = compressed_data.len();
let actual_compressed_len =
u32::try_from(actual_compressed_len).context(ChunkSizeTooLarge {
actual: actual_compressed_len,
})?;
let mut hasher = Hasher::new(); let mut hasher = Hasher::new();
hasher.update(&data); hasher.update(&compressed_data);
let checksum = hasher.finalize(); let checksum = hasher.finalize();
Ok(Self { Ok(Self {
checksum, checksum,
data, data: compressed_data,
len, len: actual_compressed_len,
}) })
} }
} }