perf(wal): avoid buffer allocation in writer
Eliminate buffer allocation (& growing) in the WAL file writer by reusing a single buffer each time. This implementation shrinks the buffer size down to 128KiB if it grows above that amount to prevent one large write from consuming memory forever more (128KiB should be plenty more than the common write size).pull/24376/head
parent
c180d3d8ac
commit
b9f7f12c0c
|
@ -13,6 +13,17 @@ use std::{
|
|||
},
|
||||
};
|
||||
|
||||
/// Defines the desired maximum size of the re-used write
|
||||
/// [`OpenSegmentFileWriter`] buffer.
|
||||
///
|
||||
/// The buffer is free to exceed this soft limit as necessary, but will always
|
||||
/// be shrunk back down to at most this size eventually.
|
||||
///
|
||||
/// Setting this too low causes needless reallocations for each write that
|
||||
/// exceeds it. Setting it too high wastes memory. Configure it to a tolerable
|
||||
/// amount of memory overhead for the lifetime of the writer.
|
||||
const SOFT_MAX_BUFFER_LEN: usize = 1024 * 128; // 128kiB
|
||||
|
||||
/// Struct for writing data to a segment file in a wal
|
||||
#[derive(Debug)]
|
||||
pub struct OpenSegmentFileWriter {
|
||||
|
@ -20,6 +31,8 @@ pub struct OpenSegmentFileWriter {
|
|||
path: PathBuf,
|
||||
f: File,
|
||||
bytes_written: usize,
|
||||
|
||||
buffer: Vec<u8>,
|
||||
}
|
||||
|
||||
impl OpenSegmentFileWriter {
|
||||
|
@ -53,6 +66,7 @@ impl OpenSegmentFileWriter {
|
|||
path,
|
||||
f,
|
||||
bytes_written,
|
||||
buffer: Vec::with_capacity(8 * 1204), // 8kiB initial size
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -61,28 +75,28 @@ impl OpenSegmentFileWriter {
|
|||
}
|
||||
|
||||
pub fn write(&mut self, data: &[u8]) -> Result<WriteSummary> {
|
||||
// Ensure the write buffer is always empty before using it.
|
||||
self.buffer.clear();
|
||||
// And shrink the buffer below the maximum permitted size should the odd
|
||||
// large batch grow it. This is a NOP if the size is less than
|
||||
// SOFT_MAX_BUFFER_LEN already.
|
||||
self.buffer.shrink_to(SOFT_MAX_BUFFER_LEN);
|
||||
|
||||
// Only designed to support chunks up to `u32::max` bytes long.
|
||||
let uncompressed_len = data.len();
|
||||
u32::try_from(uncompressed_len).context(ChunkSizeTooLargeSnafu {
|
||||
actual: uncompressed_len,
|
||||
})?;
|
||||
|
||||
// Allocate a buffer to hold the compressed data + chunk header fields.
|
||||
//
|
||||
// This allocates another buffer of "data.len()", and because we expect
|
||||
// the compressed representation to be smaller, this wastes some memory
|
||||
// for a very short period of time, but prevents the need to
|
||||
// grow/reallocate the buffer in a latency sensitive part of the code.
|
||||
let mut buf = Vec::with_capacity(uncompressed_len);
|
||||
|
||||
// The chunk header is two u32 values, so write a dummy u64 value and
|
||||
// come back to fill them in later.
|
||||
buf.write_u64::<BigEndian>(0)
|
||||
self.buffer
|
||||
.write_u64::<BigEndian>(0)
|
||||
.expect("cannot fail to write to buffer");
|
||||
|
||||
// Compress the payload into the buffer, recording the crc hash as it is
|
||||
// wrote.
|
||||
let mut encoder = snap::write::FrameEncoder::new(HasherWrapper::new(buf));
|
||||
// Compress the payload into the reused buffer, recording the crc hash
|
||||
// as it is wrote.
|
||||
let mut encoder = snap::write::FrameEncoder::new(HasherWrapper::new(&mut self.buffer));
|
||||
encoder.write_all(data).context(UnableToCompressDataSnafu)?;
|
||||
let (checksum, buf) = encoder
|
||||
.into_inner()
|
||||
|
@ -108,7 +122,7 @@ impl OpenSegmentFileWriter {
|
|||
// Write the entire buffer to the file
|
||||
let buf = buf.into_inner();
|
||||
let bytes_written = buf.len();
|
||||
self.f.write_all(&buf).context(SegmentWriteDataSnafu)?;
|
||||
self.f.write_all(buf).context(SegmentWriteDataSnafu)?;
|
||||
|
||||
// fsync the fd
|
||||
self.f.sync_all().expect("fsync failure");
|
||||
|
|
Loading…
Reference in New Issue