diff --git a/wal/src/blocking/writer.rs b/wal/src/blocking/writer.rs
index 2f885f40ab..1767fb796d 100644
--- a/wal/src/blocking/writer.rs
+++ b/wal/src/blocking/writer.rs
@@ -13,6 +13,17 @@ use std::{
     },
 };
 
+/// Defines the desired maximum size of the re-used
+/// [`OpenSegmentFileWriter`] write buffer.
+///
+/// The buffer is free to exceed this soft limit as necessary, but will always
+/// be shrunk back down to at most this size eventually.
+///
+/// Setting this too low causes needless reallocations for each write that
+/// exceeds it. Setting it too high wastes memory. Configure it to a tolerable
+/// amount of memory overhead for the lifetime of the writer.
+const SOFT_MAX_BUFFER_LEN: usize = 1024 * 128; // 128 KiB
+
 /// Struct for writing data to a segment file in a wal
 #[derive(Debug)]
 pub struct OpenSegmentFileWriter {
@@ -20,6 +31,8 @@ pub struct OpenSegmentFileWriter {
     path: PathBuf,
     f: File,
     bytes_written: usize,
+
+    buffer: Vec<u8>,
 }
 
 impl OpenSegmentFileWriter {
@@ -53,6 +66,7 @@ impl OpenSegmentFileWriter {
             path,
             f,
             bytes_written,
+            buffer: Vec::with_capacity(8 * 1024), // 8 KiB initial size
         })
     }
 
@@ -61,28 +75,28 @@ impl OpenSegmentFileWriter {
     }
 
     pub fn write(&mut self, data: &[u8]) -> Result {
+        // Ensure the write buffer is always empty before using it.
+        self.buffer.clear();
+        // And shrink the buffer back down to the soft maximum should the odd
+        // large batch have grown it. This is a NOP if the capacity is already
+        // at most SOFT_MAX_BUFFER_LEN.
+        self.buffer.shrink_to(SOFT_MAX_BUFFER_LEN);
+
         // Only designed to support chunks up to `u32::max` bytes long.
         let uncompressed_len = data.len();
         u32::try_from(uncompressed_len).context(ChunkSizeTooLargeSnafu {
             actual: uncompressed_len,
         })?;
 
-        // Allocate a buffer to hold the compressed data + chunk header fields.
-        //
-        // This allocates another buffer of "data.len()", and because we expect
-        // the compressed representation to be smaller, this wastes some memory
-        // for a very short period of time, but prevents the need to
-        // grow/reallocate the buffer in a latency sensitive part of the code.
-        let mut buf = Vec::with_capacity(uncompressed_len);
-
         // The chunk header is two u32 values, so write a dummy u64 value and
         // come back to fill them in later.
-        buf.write_u64::(0)
+        self.buffer
+            .write_u64::(0)
             .expect("cannot fail to write to buffer");
 
-        // Compress the payload into the buffer, recording the crc hash as it is
-        // wrote.
-        let mut encoder = snap::write::FrameEncoder::new(HasherWrapper::new(buf));
+        // Compress the payload into the reused buffer, recording the crc hash
+        // as it is written.
+        let mut encoder = snap::write::FrameEncoder::new(HasherWrapper::new(&mut self.buffer));
         encoder.write_all(data).context(UnableToCompressDataSnafu)?;
         let (checksum, buf) = encoder
             .into_inner()
@@ -108,7 +122,7 @@ impl OpenSegmentFileWriter {
         // Write the entire buffer to the file
         let buf = buf.into_inner();
         let bytes_written = buf.len();
-        self.f.write_all(&buf).context(SegmentWriteDataSnafu)?;
+        self.f.write_all(buf).context(SegmentWriteDataSnafu)?;
 
         // fsync the fd
         self.f.sync_all().expect("fsync failure");