fix: Use write_all to ensure entire buffer is processed
write doesn't guarantee the whole buffer gets written and returns the number of bytes it was able to write; whereas what's desired is everything gets written and the number of bytes is the total of what we're asking to write.pull/24376/head
parent
2879b6cad7
commit
8635076471
|
@ -366,43 +366,52 @@ where
|
||||||
let mut offset = 0;
|
let mut offset = 0;
|
||||||
|
|
||||||
// 4 byte place-holder for checksum.
|
// 4 byte place-holder for checksum.
|
||||||
offset += w.write(&[0; 4])?;
|
offset += 4;
|
||||||
|
w.write_all(&[0; 4])?;
|
||||||
|
|
||||||
// ID.
|
// ID.
|
||||||
let id_bytes = self.id.to_be_bytes();
|
let id_bytes = self.id.to_be_bytes();
|
||||||
offset += w.write(&id_bytes)?;
|
offset += id_bytes.len();
|
||||||
|
w.write_all(&id_bytes)?;
|
||||||
hasher.update(&id_bytes);
|
hasher.update(&id_bytes);
|
||||||
|
|
||||||
// minimum timestamp in block
|
// minimum timestamp in block
|
||||||
let time_range = summary.time_range();
|
let time_range = summary.time_range();
|
||||||
let min_time_bytes = time_range.0.to_be_bytes();
|
let min_time_bytes = time_range.0.to_be_bytes();
|
||||||
offset += w.write(&min_time_bytes)?;
|
offset += min_time_bytes.len();
|
||||||
|
w.write_all(&min_time_bytes)?;
|
||||||
hasher.update(&min_time_bytes);
|
hasher.update(&min_time_bytes);
|
||||||
|
|
||||||
// maximum timestamp in block
|
// maximum timestamp in block
|
||||||
let max_time_bytes = time_range.1.to_be_bytes();
|
let max_time_bytes = time_range.1.to_be_bytes();
|
||||||
offset += w.write(&max_time_bytes)?;
|
offset += max_time_bytes.len();
|
||||||
|
w.write_all(&max_time_bytes)?;
|
||||||
hasher.update(&max_time_bytes);
|
hasher.update(&max_time_bytes);
|
||||||
|
|
||||||
// 4 byte remaining block size place-holder
|
// 4 byte remaining block size place-holder
|
||||||
let remaining_size_offset = offset;
|
let remaining_size_offset = offset;
|
||||||
offset += w.write(&[0; 4])?;
|
offset += 4;
|
||||||
|
w.write_all(&[0; 4])?;
|
||||||
|
|
||||||
// write the block type - do not hash for checksum until later.
|
// write the block type - do not hash for checksum until later.
|
||||||
let marker_bytes = [T::BYTE_MARKER];
|
let marker_bytes = [T::BYTE_MARKER];
|
||||||
offset += w.write(&marker_bytes)?;
|
offset += marker_bytes.len();
|
||||||
|
w.write_all(&marker_bytes)?;
|
||||||
|
|
||||||
// 1 byte place-holder for summary size
|
// 1 byte place-holder for summary size
|
||||||
let summary_size_offset = offset;
|
let summary_size_offset = offset;
|
||||||
offset += w.write(&[0; 1])?;
|
offset += 1;
|
||||||
|
w.write_all(&[0; 1])?;
|
||||||
|
|
||||||
// 2 byte place-holder for block data offset
|
// 2 byte place-holder for block data offset
|
||||||
let data_offset_offset = offset;
|
let data_offset_offset = offset;
|
||||||
offset += w.write(&[0; 2])?;
|
offset += 2;
|
||||||
|
w.write_all(&[0; 2])?;
|
||||||
|
|
||||||
// 4 byte place-holder for block data size
|
// 4 byte place-holder for block data size
|
||||||
let data_size_offset = offset;
|
let data_size_offset = offset;
|
||||||
offset += w.write(&[0; 4])?;
|
offset += 4;
|
||||||
|
w.write_all(&[0; 4])?;
|
||||||
|
|
||||||
// write the summary - n bytes
|
// write the summary - n bytes
|
||||||
let mut summary_hasher = crc32fast::Hasher::new(); // combined later
|
let mut summary_hasher = crc32fast::Hasher::new(); // combined later
|
||||||
|
@ -426,7 +435,7 @@ where
|
||||||
.try_into()
|
.try_into()
|
||||||
.expect("remaining_size_offset did not fit in u64"),
|
.expect("remaining_size_offset did not fit in u64"),
|
||||||
))?;
|
))?;
|
||||||
w.write(&remaining_size.to_be_bytes())?;
|
w.write_all(&remaining_size.to_be_bytes())?;
|
||||||
hasher.update(&remaining_size.to_be_bytes());
|
hasher.update(&remaining_size.to_be_bytes());
|
||||||
|
|
||||||
// hash block type for checksum
|
// hash block type for checksum
|
||||||
|
@ -441,7 +450,7 @@ where
|
||||||
let summary_size: u8 = summary_size
|
let summary_size: u8 = summary_size
|
||||||
.try_into()
|
.try_into()
|
||||||
.expect("summary_size did not fit in u8");
|
.expect("summary_size did not fit in u8");
|
||||||
w.write(&[summary_size])?;
|
w.write_all(&[summary_size])?;
|
||||||
hasher.update(&[summary_size]);
|
hasher.update(&[summary_size]);
|
||||||
|
|
||||||
// seek and write the data block offset in the reserved offset
|
// seek and write the data block offset in the reserved offset
|
||||||
|
@ -454,7 +463,7 @@ where
|
||||||
let data_offset: u16 = data_offset
|
let data_offset: u16 = data_offset
|
||||||
.try_into()
|
.try_into()
|
||||||
.expect("data_offset did not fit in u16");
|
.expect("data_offset did not fit in u16");
|
||||||
w.write(&(data_offset).to_be_bytes())?;
|
w.write_all(&(data_offset).to_be_bytes())?;
|
||||||
hasher.update(&(data_offset).to_be_bytes());
|
hasher.update(&(data_offset).to_be_bytes());
|
||||||
|
|
||||||
// seek and write the data block size in the reserved offset
|
// seek and write the data block size in the reserved offset
|
||||||
|
@ -465,7 +474,7 @@ where
|
||||||
))?;
|
))?;
|
||||||
let data_size: u32 = data_size.try_into().expect("data_size did not fit in u32");
|
let data_size: u32 = data_size.try_into().expect("data_size did not fit in u32");
|
||||||
|
|
||||||
w.write(&(data_size).to_be_bytes())?;
|
w.write_all(&(data_size).to_be_bytes())?;
|
||||||
hasher.update(&(data_size).to_be_bytes());
|
hasher.update(&(data_size).to_be_bytes());
|
||||||
|
|
||||||
// combine hasher with summary hasher and data block hasher.
|
// combine hasher with summary hasher and data block hasher.
|
||||||
|
@ -475,7 +484,7 @@ where
|
||||||
// seek back and write the checksum in.
|
// seek back and write the checksum in.
|
||||||
w.seek(SeekFrom::Start(0))?;
|
w.seek(SeekFrom::Start(0))?;
|
||||||
let checksum = hasher.finalize();
|
let checksum = hasher.finalize();
|
||||||
w.write(&checksum.to_be_bytes())?;
|
w.write_all(&checksum.to_be_bytes())?;
|
||||||
|
|
||||||
Ok(offset)
|
Ok(offset)
|
||||||
}
|
}
|
||||||
|
@ -560,15 +569,18 @@ where
|
||||||
// length 10 is max needed for encoding varint.
|
// length 10 is max needed for encoding varint.
|
||||||
let mut size_buf = vec![0; 10];
|
let mut size_buf = vec![0; 10];
|
||||||
let n = ts.len().encode_var(&mut size_buf);
|
let n = ts.len().encode_var(&mut size_buf);
|
||||||
total += w.write(&size_buf[..n])?; // timestamp block size
|
total += n;
|
||||||
|
w.write_all(&size_buf[..n])?; // timestamp block size
|
||||||
h.write(&size_buf[..n]);
|
h.write(&size_buf[..n]);
|
||||||
|
|
||||||
total += w.write(&data_buf)?; // timestamp block
|
total += data_buf.len();
|
||||||
|
w.write_all(&data_buf)?; // timestamp block
|
||||||
h.write(&data_buf);
|
h.write(&data_buf);
|
||||||
|
|
||||||
data_buf.clear();
|
data_buf.clear();
|
||||||
values.encode(&mut data_buf)?;
|
values.encode(&mut data_buf)?;
|
||||||
total += w.write(&data_buf)?; // values block
|
total += data_buf.len();
|
||||||
|
w.write_all(&data_buf)?; // values block
|
||||||
h.write(&data_buf);
|
h.write(&data_buf);
|
||||||
|
|
||||||
Ok(total)
|
Ok(total)
|
||||||
|
@ -652,13 +664,14 @@ impl BlockSummary<f64> for FloatBlockSummary {
|
||||||
let mut total = 0;
|
let mut total = 0;
|
||||||
let mut buf = [0; 10]; // Maximum varint size for 64-bit number.
|
let mut buf = [0; 10]; // Maximum varint size for 64-bit number.
|
||||||
let n = self.count.encode_var(&mut buf);
|
let n = self.count.encode_var(&mut buf);
|
||||||
w.write(&buf[..n])?;
|
|
||||||
total += n;
|
total += n;
|
||||||
|
w.write_all(&buf[..n])?;
|
||||||
h.write(&buf[..n]);
|
h.write(&buf[..n]);
|
||||||
|
|
||||||
for v in &[self.sum, self.first.1, self.last.1, self.min, self.max] {
|
for v in &[self.sum, self.first.1, self.last.1, self.min, self.max] {
|
||||||
let n = v.to_bits().encode_var(&mut buf);
|
let n = v.to_bits().encode_var(&mut buf);
|
||||||
total += w.write(&buf[..n])?;
|
total += n;
|
||||||
|
w.write_all(&buf[..n])?;
|
||||||
h.write(&buf[..n]);
|
h.write(&buf[..n]);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -745,7 +758,7 @@ impl BlockSummary<i64> for IntegerBlockSummary {
|
||||||
|
|
||||||
let mut buf = [0; 10]; // Maximum varint size for 64-bit number.
|
let mut buf = [0; 10]; // Maximum varint size for 64-bit number.
|
||||||
let n = self.count.encode_var(&mut buf);
|
let n = self.count.encode_var(&mut buf);
|
||||||
w.write(&buf[..n])?;
|
w.write_all(&buf[..n])?;
|
||||||
total += n;
|
total += n;
|
||||||
h.write(&buf[..n]);
|
h.write(&buf[..n]);
|
||||||
|
|
||||||
|
@ -753,7 +766,8 @@ impl BlockSummary<i64> for IntegerBlockSummary {
|
||||||
// first write out the sign of the integer.
|
// first write out the sign of the integer.
|
||||||
let (sign, sum_bytes) = self.sum.to_bytes_be();
|
let (sign, sum_bytes) = self.sum.to_bytes_be();
|
||||||
let sign_bytes = [sign as u8];
|
let sign_bytes = [sign as u8];
|
||||||
total += w.write(&sign_bytes)?;
|
total += sign_bytes.len();
|
||||||
|
w.write_all(&sign_bytes)?;
|
||||||
h.write(&sign_bytes);
|
h.write(&sign_bytes);
|
||||||
|
|
||||||
// next, write out the number of bytes needed to store the big int data.
|
// next, write out the number of bytes needed to store the big int data.
|
||||||
|
@ -766,18 +780,21 @@ impl BlockSummary<i64> for IntegerBlockSummary {
|
||||||
.try_into()
|
.try_into()
|
||||||
.expect("sum_bytes.len() did not fit in u16");
|
.expect("sum_bytes.len() did not fit in u16");
|
||||||
let len_bytes = len.to_be_bytes();
|
let len_bytes = len.to_be_bytes();
|
||||||
total += w.write(&len_bytes)?;
|
total += len_bytes.len();
|
||||||
|
w.write_all(&len_bytes)?;
|
||||||
h.write(&len_bytes);
|
h.write(&len_bytes);
|
||||||
|
|
||||||
// finally, write out the variable number of bytes to represent the big
|
// finally, write out the variable number of bytes to represent the big
|
||||||
// int.
|
// int.
|
||||||
total += w.write(&sum_bytes)?;
|
total += sum_bytes.len();
|
||||||
|
w.write_all(&sum_bytes)?;
|
||||||
h.write(&sum_bytes);
|
h.write(&sum_bytes);
|
||||||
|
|
||||||
// The rest of the summary values are varint encoded i64s.
|
// The rest of the summary values are varint encoded i64s.
|
||||||
for v in &[self.first.1, self.last.1, self.min, self.max] {
|
for v in &[self.first.1, self.last.1, self.min, self.max] {
|
||||||
let n = v.encode_var(&mut buf);
|
let n = v.encode_var(&mut buf);
|
||||||
total += w.write(&buf[..n])?;
|
total += n;
|
||||||
|
w.write_all(&buf[..n])?;
|
||||||
h.write(&buf[..n]);
|
h.write(&buf[..n]);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -839,7 +856,8 @@ impl BlockSummary<bool> for BoolBlockSummary {
|
||||||
fn write_to<W: Write, H: Hasher>(&self, w: &mut W, h: &mut H) -> Result<usize, StorageError> {
|
fn write_to<W: Write, H: Hasher>(&self, w: &mut W, h: &mut H) -> Result<usize, StorageError> {
|
||||||
let mut buf = [0; 10]; // Maximum varint size for 64-bit number.
|
let mut buf = [0; 10]; // Maximum varint size for 64-bit number.
|
||||||
let n = self.count.encode_var(&mut buf);
|
let n = self.count.encode_var(&mut buf);
|
||||||
let total = w.write(&buf[..n])?;
|
let total = n;
|
||||||
|
w.write_all(&buf[..n])?;
|
||||||
h.write(&buf[..n]);
|
h.write(&buf[..n]);
|
||||||
|
|
||||||
Ok(total)
|
Ok(total)
|
||||||
|
@ -900,7 +918,8 @@ impl<'a> BlockSummary<&'a str> for StringBlockSummary<'a> {
|
||||||
fn write_to<W: Write, H: Hasher>(&self, w: &mut W, h: &mut H) -> Result<usize, StorageError> {
|
fn write_to<W: Write, H: Hasher>(&self, w: &mut W, h: &mut H) -> Result<usize, StorageError> {
|
||||||
let mut buf = [0; 10]; // Maximum varint size for 64-bit number.
|
let mut buf = [0; 10]; // Maximum varint size for 64-bit number.
|
||||||
let n = self.count.encode_var(&mut buf);
|
let n = self.count.encode_var(&mut buf);
|
||||||
let total = w.write(&buf[..n])?;
|
let total = n;
|
||||||
|
w.write_all(&buf[..n])?;
|
||||||
h.write(&buf[..n]);
|
h.write(&buf[..n]);
|
||||||
|
|
||||||
Ok(total)
|
Ok(total)
|
||||||
|
@ -986,7 +1005,7 @@ impl BlockSummary<u64> for UnsignedBlockSummary {
|
||||||
|
|
||||||
let mut buf = [0; 10]; // Maximum varint size for 64-bit number.
|
let mut buf = [0; 10]; // Maximum varint size for 64-bit number.
|
||||||
let n = self.count.encode_var(&mut buf);
|
let n = self.count.encode_var(&mut buf);
|
||||||
w.write(&buf[..n])?;
|
w.write_all(&buf[..n])?;
|
||||||
total += n;
|
total += n;
|
||||||
h.write(&buf[..n]);
|
h.write(&buf[..n]);
|
||||||
|
|
||||||
|
@ -1001,18 +1020,21 @@ impl BlockSummary<u64> for UnsignedBlockSummary {
|
||||||
.try_into()
|
.try_into()
|
||||||
.expect("sum_bytes.len() did not fit in u16");
|
.expect("sum_bytes.len() did not fit in u16");
|
||||||
let sum_bytes_len_bytes = sum_bytes_len.to_be_bytes();
|
let sum_bytes_len_bytes = sum_bytes_len.to_be_bytes();
|
||||||
total += w.write(&sum_bytes_len_bytes)?;
|
total += sum_bytes_len_bytes.len();
|
||||||
|
w.write_all(&sum_bytes_len_bytes)?;
|
||||||
h.write(&sum_bytes_len_bytes);
|
h.write(&sum_bytes_len_bytes);
|
||||||
|
|
||||||
// finally, write out the variable number of bytes to represent the big
|
// finally, write out the variable number of bytes to represent the big
|
||||||
// int.
|
// int.
|
||||||
total += w.write(&sum_bytes)?;
|
total += sum_bytes.len();
|
||||||
|
w.write_all(&sum_bytes)?;
|
||||||
h.write(&sum_bytes);
|
h.write(&sum_bytes);
|
||||||
|
|
||||||
// The rest of the summary values are varint encoded i64s.
|
// The rest of the summary values are varint encoded i64s.
|
||||||
for v in &[self.first.1, self.last.1, self.min, self.max] {
|
for v in &[self.first.1, self.last.1, self.min, self.max] {
|
||||||
let n = v.encode_var(&mut buf);
|
let n = v.encode_var(&mut buf);
|
||||||
total += w.write(&buf[..n])?;
|
total += n;
|
||||||
|
w.write_all(&buf[..n])?;
|
||||||
h.write(&buf[..n]);
|
h.write(&buf[..n]);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue