diff --git a/Cargo.lock b/Cargo.lock index 926f985bf2..15d2e86ef7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -809,6 +809,7 @@ dependencies = [ "once_cell", "regex", "snafu", + "snap", "tokio", ] diff --git a/delorean_wal/Cargo.toml b/delorean_wal/Cargo.toml index a7ce6a7f70..7c791aa765 100644 --- a/delorean_wal/Cargo.toml +++ b/delorean_wal/Cargo.toml @@ -10,6 +10,7 @@ byteorder = "1.3.4" bytes = "0.5.4" crc32fast = "1.2.0" snafu = "0.6.6" +snap = "1.0.0" regex = "1.3.7" itertools = "0.9.0" once_cell = "1.4.0" diff --git a/delorean_wal/src/lib.rs b/delorean_wal/src/lib.rs index cf61d1bcb7..33b9f297b6 100644 --- a/delorean_wal/src/lib.rs +++ b/delorean_wal/src/lib.rs @@ -90,6 +90,14 @@ enum InternalError { source: io::Error, }, + UnableToCompressData { + source: snap::Error, + }, + + UnableToDecompressData { + source: snap::Error, + }, + UnableToSync { source: io::Error, }, @@ -520,22 +528,23 @@ impl Loader { let expected_len_us = usize::try_from(header.len).expect("Only designed to run on 32-bit systems or higher"); - let mut data = Vec::with_capacity(expected_len_us); - let actual_len = file + let mut compressed_data = Vec::with_capacity(expected_len_us); + + let actual_compressed_len = file .take(u64::from(header.len)) - .read_to_end(&mut data) + .read_to_end(&mut compressed_data) .context(UnableToReadData)?; ensure!( - expected_len_us == actual_len, + expected_len_us == actual_compressed_len, LengthMismatch { expected: expected_len_us, - actual: actual_len + actual: actual_compressed_len } ); let mut hasher = Hasher::new(); - hasher.update(&data); + hasher.update(&compressed_data); let actual_checksum = hasher.finalize(); ensure!( @@ -546,6 +555,11 @@ impl Loader { } ); + let mut decoder = snap::raw::Decoder::new(); + let data = decoder + .decompress_vec(&compressed_data) + .context(UnableToDecompressData)?; + let entry = Entry { sequence_number: header.sequence_number, data, @@ -627,20 +641,32 @@ pub struct WritePayload { } impl WritePayload { - /// Initializes a write payload and computes its CRC from the passed in vec. - pub fn new(data: Vec) -> Result { + /// Initializes a write payload, compresses the data, and computes its CRC. + pub fn new(uncompressed_data: Vec) -> Result { // Only designed to support chunks up to `u32::max` bytes long. - let len = data.len(); - let len = u32::try_from(len).context(ChunkSizeTooLarge { actual: len })?; + let uncompressed_len = uncompressed_data.len(); + let _ = u32::try_from(uncompressed_len).context(ChunkSizeTooLarge { + actual: uncompressed_len, + })?; + + let mut encoder = snap::raw::Encoder::new(); + let compressed_data = encoder + .compress_vec(&uncompressed_data) + .context(UnableToCompressData)?; + let actual_compressed_len = compressed_data.len(); + let actual_compressed_len = + u32::try_from(actual_compressed_len).context(ChunkSizeTooLarge { + actual: actual_compressed_len, + })?; let mut hasher = Hasher::new(); - hasher.update(&data); + hasher.update(&compressed_data); let checksum = hasher.finalize(); Ok(Self { checksum, - data, - len, + data: compressed_data, + len: actual_compressed_len, }) } }