Merge pull request #2456 from influxdata/crepererum/issue1565

feat: correctly account MUB sizes
pull/24376/head
kodiakhq[bot] 2021-09-03 07:24:14 +00:00 committed by GitHub
commit 704a892003
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 171 additions and 97 deletions

View File

@@ -93,7 +93,7 @@ impl<K: AsPrimitive<usize> + FromPrimitive + Zero> PackedStringArray<K> {
/// Return the amount of memory in bytes taken up by this array
pub fn size(&self) -> usize {
self.storage.len() + self.offsets.len() * std::mem::size_of::<K>()
self.storage.capacity() + self.offsets.capacity() * std::mem::size_of::<K>()
}
pub fn into_inner(self) -> (Vec<K>, String) {

View File

@@ -126,23 +126,23 @@ impl MBChunk {
Ok(())
}
/// Returns a queryable snapshot of this chunk
/// Returns a queryable snapshot of this chunk and an indicator if the snapshot was just cached.
#[cfg(not(feature = "nocache"))]
pub fn snapshot(&self) -> Arc<ChunkSnapshot> {
pub fn snapshot(&self) -> (Arc<ChunkSnapshot>, bool) {
let mut guard = self.snapshot.lock();
if let Some(snapshot) = &*guard {
return Arc::clone(snapshot);
return (Arc::clone(snapshot), false);
}
let snapshot = Arc::new(ChunkSnapshot::new(self));
*guard = Some(Arc::clone(&snapshot));
snapshot
(snapshot, true)
}
/// Returns a queryable snapshot of this chunk
/// Returns a queryable snapshot of this chunk and an indicator if the snapshot was just cached.
#[cfg(feature = "nocache")]
pub fn snapshot(&self) -> Arc<ChunkSnapshot> {
Arc::new(ChunkSnapshot::new(self))
pub fn snapshot(&self) -> (Arc<ChunkSnapshot>, bool) {
(Arc::new(ChunkSnapshot::new(self)), false)
}
/// Return the name of the table in this chunk
@@ -227,14 +227,26 @@ impl MBChunk {
/// Return the approximate memory size of the chunk, in bytes including the
/// dictionary, tables, and their rows.
///
/// This includes the size of `self`.
///
/// Note: This does not include the size of any cached ChunkSnapshot
pub fn size(&self) -> usize {
// TODO: Better accounting of non-column data (#1565)
self.columns
let size_self = std::mem::size_of::<Self>();
let size_columns = self
.columns
.iter()
.map(|(k, v)| k.len() + v.size())
.sum::<usize>()
+ self.table_name.len()
.map(|(k, v)| k.capacity() + v.size())
.sum::<usize>();
let size_table_name = self.table_name.len();
let snapshot_size = {
let guard = self.snapshot.lock();
guard.as_ref().map(|snapshot| snapshot.size()).unwrap_or(0)
};
size_self + size_columns + size_table_name + snapshot_size
}
/// Returns an iterator over (column_name, estimated_size) for all
@@ -814,12 +826,16 @@ mod tests {
let lp = vec!["cpu,host=a val=23 1", "cpu,host=b val=2 1"].join("\n");
let mut chunk = write_lp_to_new_chunk(&lp).unwrap();
let s1 = chunk.snapshot();
let s2 = chunk.snapshot();
let (s1, c1) = chunk.snapshot();
assert!(c1);
let (s2, c2) = chunk.snapshot();
assert!(!c2);
write_lp_to_chunk(&lp, &mut chunk).unwrap();
let s3 = chunk.snapshot();
let s4 = chunk.snapshot();
let (s3, c3) = chunk.snapshot();
assert!(c3);
let (s4, c4) = chunk.snapshot();
assert!(!c4);
assert_eq!(Arc::as_ptr(&s1), Arc::as_ptr(&s2));
assert_ne!(Arc::as_ptr(&s1), Arc::as_ptr(&s3));
@@ -846,8 +862,12 @@ mod tests {
write_lp_to_chunk(&lp, &mut chunk).unwrap();
let s3 = chunk.size();
// Should increase by a constant amount each time
assert_eq!(s2 - s1, s3 - s2);
// Should increase or stay identical (if array capacities are sufficient) each time
assert!(s2 >= s1);
assert!(s3 >= s2);
// also assume that we wrote enough data to bump the capacity at least once
assert!(s3 > s1);
}
#[test]

View File

@@ -319,24 +319,29 @@ impl Column {
}
}
/// The approximate memory size of the data in the column. Note that
/// the space taken for the tag string values is represented in
/// the dictionary size in the chunk that holds the table that has this
/// column. The size returned here is only for their identifiers.
/// The approximate memory size of the data in the column.
///
/// This includes the size of `self`.
pub fn size(&self) -> usize {
let data_size = match &self.data {
ColumnData::F64(v, stats) => mem::size_of::<f64>() * v.len() + mem::size_of_val(&stats),
ColumnData::I64(v, stats) => mem::size_of::<i64>() * v.len() + mem::size_of_val(&stats),
ColumnData::U64(v, stats) => mem::size_of::<u64>() * v.len() + mem::size_of_val(&stats),
ColumnData::F64(v, stats) => {
mem::size_of::<f64>() * v.capacity() + mem::size_of_val(&stats)
}
ColumnData::I64(v, stats) => {
mem::size_of::<i64>() * v.capacity() + mem::size_of_val(&stats)
}
ColumnData::U64(v, stats) => {
mem::size_of::<u64>() * v.capacity() + mem::size_of_val(&stats)
}
ColumnData::Bool(v, stats) => v.byte_len() + mem::size_of_val(&stats),
ColumnData::Tag(v, dictionary, stats) => {
mem::size_of::<DID>() * v.len() + dictionary.size() + mem::size_of_val(&stats)
mem::size_of::<DID>() * v.capacity() + dictionary.size() + mem::size_of_val(&stats)
}
ColumnData::String(v, stats) => {
v.size() + mem::size_of_val(&stats) + stats.string_size()
}
};
data_size + self.valid.byte_len()
mem::size_of::<Self>() + data_size + self.valid.byte_len()
}
pub fn to_arrow(&self) -> Result<ArrayRef> {

View File

@@ -312,8 +312,8 @@ async fn sql_select_from_system_chunks() {
"+----+---------------+------------+-------------------+--------------+-----------+",
"| id | partition_key | table_name | storage | memory_bytes | row_count |",
"+----+---------------+------------+-------------------+--------------+-----------+",
"| 0 | 1970-01-01T00 | h2o | OpenMutableBuffer | 213 | 3 |",
"| 0 | 1970-01-01T00 | o2 | OpenMutableBuffer | 177 | 2 |",
"| 0 | 1970-01-01T00 | h2o | OpenMutableBuffer | 1639 | 3 |",
"| 0 | 1970-01-01T00 | o2 | OpenMutableBuffer | 1635 | 2 |",
"+----+---------------+------------+-------------------+--------------+-----------+",
];
run_sql_test_case!(
@@ -368,15 +368,15 @@ async fn sql_select_from_system_chunk_columns() {
"| 1970-01-01T00 | 0 | h2o | state | ReadBuffer | 2 | 0 | MA | MA | 347 |",
"| 1970-01-01T00 | 0 | h2o | temp | ReadBuffer | 2 | 1 | 70.4 | 70.4 | 471 |",
"| 1970-01-01T00 | 0 | h2o | time | ReadBuffer | 2 | 0 | 50 | 250 | 110 |",
"| 1970-01-01T00 | 0 | o2 | city | OpenMutableBuffer | 2 | 1 | Boston | Boston | 35 |",
"| 1970-01-01T00 | 0 | o2 | reading | OpenMutableBuffer | 2 | 1 | 51 | 51 | 25 |",
"| 1970-01-01T00 | 0 | o2 | state | OpenMutableBuffer | 2 | 0 | CA | MA | 41 |",
"| 1970-01-01T00 | 0 | o2 | temp | OpenMutableBuffer | 2 | 0 | 53.4 | 79 | 25 |",
"| 1970-01-01T00 | 0 | o2 | time | OpenMutableBuffer | 2 | 0 | 50 | 300 | 25 |",
"| 1970-01-01T00 | 1 | h2o | city | OpenMutableBuffer | 1 | 0 | Boston | Boston | 31 |",
"| 1970-01-01T00 | 1 | h2o | other_temp | OpenMutableBuffer | 1 | 0 | 72.4 | 72.4 | 17 |",
"| 1970-01-01T00 | 1 | h2o | state | OpenMutableBuffer | 1 | 0 | CA | CA | 27 |",
"| 1970-01-01T00 | 1 | h2o | time | OpenMutableBuffer | 1 | 0 | 350 | 350 | 17 |",
"| 1970-01-01T00 | 0 | o2 | city | OpenMutableBuffer | 2 | 1 | Boston | Boston | 309 |",
"| 1970-01-01T00 | 0 | o2 | reading | OpenMutableBuffer | 2 | 1 | 51 | 51 | 297 |",
"| 1970-01-01T00 | 0 | o2 | state | OpenMutableBuffer | 2 | 0 | CA | MA | 313 |",
"| 1970-01-01T00 | 0 | o2 | temp | OpenMutableBuffer | 2 | 0 | 53.4 | 79 | 297 |",
"| 1970-01-01T00 | 0 | o2 | time | OpenMutableBuffer | 2 | 0 | 50 | 300 | 297 |",
"| 1970-01-01T00 | 1 | h2o | city | OpenMutableBuffer | 1 | 0 | Boston | Boston | 309 |",
"| 1970-01-01T00 | 1 | h2o | other_temp | OpenMutableBuffer | 1 | 0 | 72.4 | 72.4 | 297 |",
"| 1970-01-01T00 | 1 | h2o | state | OpenMutableBuffer | 1 | 0 | CA | CA | 309 |",
"| 1970-01-01T00 | 1 | h2o | time | OpenMutableBuffer | 1 | 0 | 350 | 350 | 297 |",
"+---------------+----------+------------+-------------+-------------------+-----------+------------+-----------+-----------+--------------+",
];
run_sql_test_case!(

View File

@@ -2107,13 +2107,18 @@ mod tests {
assert_metric("catalog_loaded_rows", "object_store", 0.0);
// verify chunk size updated
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "mutable_buffer", 44).unwrap();
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "mutable_buffer", 700)
.unwrap();
// write into same chunk again.
write_lp(db.as_ref(), "cpu bar=2 20").await;
write_lp(db.as_ref(), "cpu bar=3 30").await;
write_lp(db.as_ref(), "cpu bar=4 40").await;
write_lp(db.as_ref(), "cpu bar=5 50").await;
// verify chunk size updated
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "mutable_buffer", 60).unwrap();
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "mutable_buffer", 764)
.unwrap();
// Still only one chunk open
test_db
@@ -2131,7 +2136,7 @@ mod tests {
assert_metric("catalog_loaded_chunks", "mutable_buffer", 1.0);
assert_metric("catalog_loaded_chunks", "read_buffer", 0.0);
assert_metric("catalog_loaded_chunks", "object_store", 0.0);
assert_metric("catalog_loaded_rows", "mutable_buffer", 2.0);
assert_metric("catalog_loaded_rows", "mutable_buffer", 5.0);
assert_metric("catalog_loaded_rows", "read_buffer", 0.0);
assert_metric("catalog_loaded_rows", "object_store", 0.0);
@@ -2153,7 +2158,7 @@ mod tests {
assert_metric("catalog_loaded_chunks", "mutable_buffer", 1.0);
assert_metric("catalog_loaded_chunks", "read_buffer", 0.0);
assert_metric("catalog_loaded_chunks", "object_store", 0.0);
assert_metric("catalog_loaded_rows", "mutable_buffer", 2.0);
assert_metric("catalog_loaded_rows", "mutable_buffer", 5.0);
assert_metric("catalog_loaded_rows", "read_buffer", 0.0);
assert_metric("catalog_loaded_rows", "object_store", 0.0);
@@ -2181,12 +2186,12 @@ mod tests {
assert_metric("catalog_loaded_chunks", "read_buffer", 1.0);
assert_metric("catalog_loaded_chunks", "object_store", 0.0);
assert_metric("catalog_loaded_rows", "mutable_buffer", 0.0);
assert_metric("catalog_loaded_rows", "read_buffer", 2.0);
assert_metric("catalog_loaded_rows", "read_buffer", 5.0);
assert_metric("catalog_loaded_rows", "object_store", 0.0);
// verify chunk size updated (chunk moved from closing to moving to moved)
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "mutable_buffer", 0).unwrap();
let expected_read_buffer_size = 1916;
let expected_read_buffer_size = 1922;
catalog_chunk_size_bytes_metric_eq(
&test_db.metric_registry,
"read_buffer",
@@ -2234,8 +2239,8 @@ mod tests {
assert_metric("catalog_loaded_chunks", "read_buffer", 1.0);
assert_metric("catalog_loaded_chunks", "object_store", 1.0);
assert_metric("catalog_loaded_rows", "mutable_buffer", 0.0);
assert_metric("catalog_loaded_rows", "read_buffer", 2.0);
assert_metric("catalog_loaded_rows", "object_store", 2.0);
assert_metric("catalog_loaded_rows", "read_buffer", 5.0);
assert_metric("catalog_loaded_rows", "object_store", 5.0);
db.unload_read_buffer("cpu", "1970-01-01T00", 1).unwrap();
@@ -2253,7 +2258,7 @@ mod tests {
assert_metric("catalog_loaded_chunks", "object_store", 1.0);
assert_metric("catalog_loaded_rows", "mutable_buffer", 0.0);
assert_metric("catalog_loaded_rows", "read_buffer", 0.0);
assert_metric("catalog_loaded_rows", "object_store", 2.0);
assert_metric("catalog_loaded_rows", "object_store", 5.0);
// verify chunk size not increased for OS (it was in OS before unload)
catalog_chunk_size_bytes_metric_eq(
@@ -2574,7 +2579,7 @@ mod tests {
("svr_id", "1"),
])
.histogram()
.sample_sum_eq(280.0)
.sample_sum_eq(5085.0)
.unwrap();
// RB chunk size
@@ -3161,7 +3166,7 @@ mod tests {
id: 0,
storage: ChunkStorage::OpenMutableBuffer,
lifecycle_action: None,
memory_bytes: 70, // memory_size
memory_bytes: 1006, // memory_size
object_store_bytes: 0, // os_size
row_count: 1,
time_of_last_access: None,
@@ -3479,7 +3484,7 @@ mod tests {
id: 1,
storage: ChunkStorage::OpenMutableBuffer,
lifecycle_action,
memory_bytes: 87,
memory_bytes: 1303,
object_store_bytes: 0, // no OS chunks
row_count: 1,
time_of_last_access: None,
@@ -3501,7 +3506,7 @@ mod tests {
);
}
assert_eq!(db.catalog.metrics().memory().mutable_buffer(), 2486 + 87);
assert_eq!(db.catalog.metrics().memory().mutable_buffer(), 2486 + 1303);
assert_eq!(db.catalog.metrics().memory().read_buffer(), 2766);
assert_eq!(db.catalog.metrics().memory().object_store(), 2007);
}

View File

@@ -276,7 +276,7 @@ impl CatalogChunk {
.state
.inc_with_attributes(&[KeyValue::new("state", "open")]);
let mut chunk = Self {
let chunk = Self {
addr,
stage,
lifecycle_action: None,
@@ -313,7 +313,7 @@ impl CatalogChunk {
.state
.inc_with_attributes(&[KeyValue::new("state", "compacted")]);
let mut chunk = Self {
let chunk = Self {
addr,
stage,
lifecycle_action: None,
@@ -350,7 +350,7 @@ impl CatalogChunk {
meta,
};
let mut chunk = Self {
let chunk = Self {
addr,
stage,
lifecycle_action: None,
@@ -412,7 +412,7 @@ impl CatalogChunk {
}
/// Updates `self.metrics` to match the contents of `self.stage`
fn update_metrics(&mut self) {
pub fn update_metrics(&self) {
match &self.stage {
ChunkStage::Open { mb_chunk } => {
self.metrics.memory_metrics.set_mub_only(mb_chunk.size());
@@ -627,7 +627,7 @@ impl CatalogChunk {
assert!(self.time_closed.is_none());
self.time_closed = Some(Utc::now());
let s = mb_chunk.snapshot();
let (s, _) = mb_chunk.snapshot();
self.metrics
.state
.inc_with_attributes(&[KeyValue::new("state", "closed")]);
@@ -880,9 +880,7 @@ impl CatalogChunk {
self.set_lifecycle_action(ChunkLifecycleAction::Dropping, registration)?;
// set memory metrics to 0 to stop accounting for this chunk within the catalog
self.metrics.memory_metrics.mutable_buffer.set(0);
self.metrics.memory_metrics.read_buffer.set(0);
self.metrics.memory_metrics.object_store.set(0);
self.metrics.memory_metrics.set_to_zero();
Ok(())
}

View File

@@ -214,82 +214,122 @@ impl PartitionMetrics {
///
/// This can then be used within each `CatalogChunk` to record its observations for
/// the different storages
#[derive(Debug)]
pub struct StorageGauge {
pub(super) mutable_buffer: GaugeValue,
pub(super) read_buffer: GaugeValue,
pub(super) object_store: GaugeValue,
inner: Mutex<StorageGaugeInner>,
}
impl std::fmt::Debug for StorageGauge {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("StorageGauge").finish_non_exhaustive()
}
}
struct StorageGaugeInner {
mutable_buffer: GaugeValue,
read_buffer: GaugeValue,
object_store: GaugeValue,
}
impl StorageGauge {
pub(super) fn new_unregistered() -> Self {
Self {
let inner = StorageGaugeInner {
mutable_buffer: GaugeValue::new_unregistered(),
read_buffer: GaugeValue::new_unregistered(),
object_store: GaugeValue::new_unregistered(),
};
Self {
inner: Mutex::new(inner),
}
}
pub(super) fn new(gauge: &Gauge) -> Self {
Self {
let inner = StorageGaugeInner {
mutable_buffer: gauge.gauge_value(&[KeyValue::new("location", "mutable_buffer")]),
read_buffer: gauge.gauge_value(&[KeyValue::new("location", "read_buffer")]),
object_store: gauge.gauge_value(&[KeyValue::new("location", "object_store")]),
};
Self {
inner: Mutex::new(inner),
}
}
pub(super) fn set_mub_only(&mut self, value: usize) {
self.mutable_buffer.set(value);
self.read_buffer.set(0);
self.object_store.set(0);
pub(super) fn set_mub_only(&self, value: usize) {
let mut guard = self.inner.lock();
guard.mutable_buffer.set(value);
guard.read_buffer.set(0);
guard.object_store.set(0);
}
pub(super) fn set_rub_only(&mut self, value: usize) {
self.mutable_buffer.set(0);
self.read_buffer.set(value);
self.object_store.set(0);
pub(super) fn set_rub_only(&self, value: usize) {
let mut guard = self.inner.lock();
guard.mutable_buffer.set(0);
guard.read_buffer.set(value);
guard.object_store.set(0);
}
pub(super) fn set_rub_and_object_store_only(&mut self, rub: usize, parquet: usize) {
self.mutable_buffer.set(0);
self.read_buffer.set(rub);
self.object_store.set(parquet);
pub(super) fn set_rub_and_object_store_only(&self, rub: usize, parquet: usize) {
let mut guard = self.inner.lock();
guard.mutable_buffer.set(0);
guard.read_buffer.set(rub);
guard.object_store.set(parquet);
}
pub(super) fn set_object_store_only(&mut self, value: usize) {
self.mutable_buffer.set(0);
self.read_buffer.set(0);
self.object_store.set(value);
pub(super) fn set_object_store_only(&self, value: usize) {
let mut guard = self.inner.lock();
guard.mutable_buffer.set(0);
guard.read_buffer.set(0);
guard.object_store.set(value);
}
pub(super) fn set_to_zero(&self) {
let mut guard = self.inner.lock();
guard.mutable_buffer.set(0);
guard.read_buffer.set(0);
guard.object_store.set(0);
}
fn clone_empty(&self) -> Self {
let guard = self.inner.lock();
let inner = StorageGaugeInner {
mutable_buffer: guard.mutable_buffer.clone_empty(),
read_buffer: guard.read_buffer.clone_empty(),
object_store: guard.object_store.clone_empty(),
};
Self {
mutable_buffer: self.mutable_buffer.clone_empty(),
read_buffer: self.read_buffer.clone_empty(),
object_store: self.object_store.clone_empty(),
inner: Mutex::new(inner),
}
}
/// Returns the total for the mutable buffer
pub fn mutable_buffer(&self) -> usize {
self.mutable_buffer.get_total()
let guard = self.inner.lock();
guard.mutable_buffer.get_total()
}
/// Returns the total for the read buffer
pub fn read_buffer(&self) -> usize {
self.read_buffer.get_total()
let guard = self.inner.lock();
guard.read_buffer.get_total()
}
/// Returns the total for object storage
pub fn object_store(&self) -> usize {
self.object_store.get_total()
let guard = self.inner.lock();
guard.object_store.get_total()
}
/// Returns the total over all storages
pub fn total(&self) -> usize {
self.mutable_buffer.get_total()
+ self.read_buffer.get_total()
+ self.object_store.get_total()
let guard = self.inner.lock();
guard.mutable_buffer.get_total()
+ guard.read_buffer.get_total()
+ guard.object_store.get_total()
}
}

View File

@@ -110,7 +110,13 @@ impl DbChunk {
let (state, meta) = match chunk.stage() {
ChunkStage::Open { mb_chunk, .. } => {
let snapshot = mb_chunk.snapshot();
let (snapshot, just_cached) = mb_chunk.snapshot();
// the snapshot might be cached, so we need to update the chunk metrics
if just_cached {
chunk.update_metrics();
}
let state = State::MutableBuffer {
chunk: Arc::clone(&snapshot),
};

View File

@@ -742,7 +742,7 @@ mod tests {
tokio::sync::Mutex::new(Box::new(write_buffer) as _),
)))
.lifecycle_rules(data_types::database_rules::LifecycleRules {
buffer_size_hard: Some(NonZeroUsize::new(10_000).unwrap()),
buffer_size_hard: Some(NonZeroUsize::new(12_000).unwrap()),
late_arrive_window_seconds: NonZeroU32::try_from(1).unwrap(),
catalog_transactions_until_checkpoint,
mub_row_threshold: NonZeroUsize::new(10).unwrap(),

View File

@@ -501,7 +501,7 @@ async fn test_chunk_get() {
id: 0,
storage: ChunkStorage::OpenMutableBuffer.into(),
lifecycle_action,
memory_bytes: 100,
memory_bytes: 1016,
object_store_bytes: 0,
row_count: 2,
time_of_last_access: None,
@@ -515,7 +515,7 @@ async fn test_chunk_get() {
id: 0,
storage: ChunkStorage::OpenMutableBuffer.into(),
lifecycle_action,
memory_bytes: 82,
memory_bytes: 1018,
object_store_bytes: 0,
row_count: 1,
time_of_last_access: None,
@@ -686,7 +686,7 @@ async fn test_list_partition_chunks() {
id: 0,
storage: ChunkStorage::OpenMutableBuffer.into(),
lifecycle_action: ChunkLifecycleAction::Unspecified.into(),
memory_bytes: 100,
memory_bytes: 1016,
object_store_bytes: 0,
row_count: 2,
time_of_last_access: None,

View File

@@ -271,7 +271,7 @@ async fn test_get_chunks() {
.and(predicate::str::contains(
r#""storage": "OpenMutableBuffer","#,
))
.and(predicate::str::contains(r#""memory_bytes": 100"#))
.and(predicate::str::contains(r#""memory_bytes": 1016"#))
// Check for a non empty timestamp such as
// "time_of_first_write": "2021-03-30T17:11:10.723866Z",
.and(predicate::str::contains(r#""time_of_first_write": "20"#));