feat: add size_data to mutable batch (#6425)
This method will be used in the new ingestion pipeline to approximate how much memory a mutable batch will take to convert to arrow and persist. It is meant only as a very rough estimate to trigger persistence for hot partitions.

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
parent d96ba835bc
commit 84698b3532
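For context, a minimal sketch of the kind of check the new ingestion pipeline could perform with this estimate. The PartitionBuffer type, the threshold field, and the persist decision are hypothetical stand-ins for illustration; only MutableBatch and size_data() come from this commit.

use mutable_batch::MutableBatch;

/// Hypothetical per-partition buffer (illustration only, not an actual ingester type).
struct PartitionBuffer {
    batches: Vec<MutableBatch>,
    /// Hypothetical memory budget before the partition should be persisted.
    persist_threshold_bytes: usize,
}

impl PartitionBuffer {
    /// Rough estimate of how much memory the buffered data would take to
    /// convert to arrow and persist, used to flag a hot partition.
    fn should_persist(&self) -> bool {
        let estimate: usize = self.batches.iter().map(|b| b.size_data()).sum();
        estimate >= self.persist_threshold_bytes
    }
}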
@@ -3314,6 +3314,7 @@ dependencies = [
 "hashbrown 0.13.1",
 "iox_time",
 "itertools",
 "mutable_batch_lp",
 "rand",
 "schema",
 "snafu",
@@ -19,4 +19,5 @@ itertools = "0.10"
 workspace-hack = { path = "../workspace-hack"}

 [dev-dependencies]
 mutable_batch_lp = { path = "../mutable_batch_lp" }
 rand = "0.8"
@@ -243,6 +243,21 @@ impl Column {
         mem::size_of::<Self>() + data_size + self.valid.byte_len()
     }

     /// The approximate memory size of the data in the column, not counting stats, `self`, or
     /// any extra space allocated for the vecs
     pub fn size_data(&self) -> usize {
         match &self.data {
             ColumnData::F64(_, _) => mem::size_of::<f64>() * self.len(),
             ColumnData::I64(_, _) => mem::size_of::<i64>() * self.len(),
             ColumnData::U64(_, _) => mem::size_of::<u64>() * self.len(),
             ColumnData::Bool(_, _) => mem::size_of::<bool>() * self.len(),
             ColumnData::Tag(_, dictionary, _) => {
                 mem::size_of::<DID>() * self.len() + dictionary.size()
             }
             ColumnData::String(v, _) => v.size(),
         }
     }

     /// Converts this column to an arrow [`ArrayRef`]
     pub fn to_arrow(&self) -> Result<ArrayRef> {
         let nulls = self.valid.to_arrow();
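To make the per-variant arithmetic concrete, here is a standalone sketch of the tag-column case: one dictionary id per row plus the dictionary's current footprint. It does not use the crate's Column type; the 4-byte id is an assumption about DID, and the dictionary byte count is an arbitrary illustrative value standing in for dictionary.size().

use std::mem;

/// Standalone illustration of the `ColumnData::Tag` estimate above:
/// `size_of::<DID>() * rows + dictionary.size()`. The dictionary id is
/// assumed here to be a 4-byte integer.
fn approx_tag_column_data_size(rows: usize, dictionary_bytes: usize) -> usize {
    mem::size_of::<i32>() * rows + dictionary_bytes
}

fn main() {
    // 1_000 rows pointing into a dictionary that currently occupies 64 bytes.
    assert_eq!(approx_tag_column_data_size(1_000, 64), 4_064);
}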
@@ -212,6 +212,11 @@ impl MutableBatch {
             .sum::<usize>()
             + self.columns.iter().map(|c| c.size()).sum::<usize>()
     }

     /// Return the approximate memory size of the data in the batch, in bytes.
     pub fn size_data(&self) -> usize {
         self.columns.iter().map(|c| c.size_data()).sum::<usize>()
     }
 }

 /// A description of the distribution of timestamps in a
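A minimal usage sketch, assuming the mutable_batch_lp dev-dependency added above and that the method extended at the top of this hunk is the batch's existing size() accessor: build a batch from line protocol and compare the overall allocation estimate with the new data-only estimate.

use mutable_batch_lp::lines_to_batches;

fn main() {
    // Parse a small batch of line protocol; the "cpu" table holds two tag
    // columns, two field columns, and the timestamp column.
    let batches = lines_to_batches("cpu,t1=hello,t2=world f1=1.1,f2=1i 1234", 0).unwrap();
    let batch = batches.get("cpu").unwrap();

    // size() covers struct overhead, statistics, and allocated capacity;
    // size_data() only approximates the footprint of the stored values.
    println!("total size: {} bytes", batch.size());
    println!("data size:  {} bytes", batch.size_data());
}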
@@ -249,3 +254,40 @@ impl TimestampSummary {
         self.record(Time::from_timestamp_nanos(timestamp_nanos))
     }
 }

 #[cfg(test)]
 mod tests {
     use mutable_batch_lp::lines_to_batches;

     #[test]
     fn size_data_without_nulls() {
         let batches = lines_to_batches(
             "cpu,t1=hello,t2=world f1=1.1,f2=1i 1234\ncpu,t1=h,t2=w f1=2.2,f2=2i 1234",
             0,
         )
         .unwrap();
         let batch = batches.get("cpu").unwrap();

         assert_eq!(batch.size_data(), 128);

         let batches = lines_to_batches(
             "cpu,t1=hellomore,t2=world f1=1.1,f2=1i 1234\ncpu,t1=h,t2=w f1=2.2,f2=2i 1234",
             0,
         )
         .unwrap();
         let batch = batches.get("cpu").unwrap();
         assert_eq!(batch.size_data(), 138);
     }

     #[test]
     fn size_data_with_nulls() {
         let batches = lines_to_batches(
             "cpu,t1=hello,t2=world f1=1.1 1234\ncpu,t2=w f1=2.2,f2=2i 1234",
             0,
         )
         .unwrap();
         let batch = batches.get("cpu").unwrap();

         assert_eq!(batch.size_data(), 124);
     }
 }