diff --git a/data_types/src/partition_metadata.rs b/data_types/src/partition_metadata.rs
index eb9a5c1545..d766f0cea0 100644
--- a/data_types/src/partition_metadata.rs
+++ b/data_types/src/partition_metadata.rs
@@ -180,7 +180,7 @@ impl ColumnSummary {
 
     /// Return size in bytes of this Column metadata (not the underlying column)
     pub fn size(&self) -> usize {
-        mem::size_of::<Self>() + self.name.len() + mem::size_of_val(&self.stats)
+        mem::size_of::<Self>() + self.name.len() + self.stats.size()
     }
 
     // Updates statistics from other if the same type, otherwise a noop
@@ -280,6 +280,14 @@ impl Statistics {
             Self::String(v) => v.max.as_deref().map(|x| Cow::Borrowed(x)),
         }
     }
+
+    /// Return the size in bytes of this stats instance
+    pub fn size(&self) -> usize {
+        match self {
+            Self::String(v) => std::mem::size_of::<Self>() + v.string_size(),
+            _ => std::mem::size_of::<Self>(),
+        }
+    }
 }
 
 /// Summary statistics for a column.
@@ -397,6 +405,14 @@ impl<T> StatValues<T> {
     }
 }
 
+impl StatValues<String> {
+    /// Returns the bytes associated with storing min/max string values
+    pub fn string_size(&self) -> usize {
+        self.min.as_ref().map(|x| x.len()).unwrap_or(0)
+            + self.max.as_ref().map(|x| x.len()).unwrap_or(0)
+    }
+}
+
 pub trait IsNan {
     fn is_nan(&self) -> bool;
 }
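Worth spelling out why `mem::size_of_val(&self.stats)` had to go: it measures only the inline enum representation, so the heap bytes behind `String` min/max statistics were never counted. A minimal standalone sketch of the corrected accounting (the type below is an illustrative stand-in, not the IOx `StatValues<String>`):

```rust
use std::mem;

/// Illustrative stand-in for string min/max statistics.
struct StringStats {
    min: Option<String>,
    max: Option<String>,
}

impl StringStats {
    /// Heap bytes behind min/max; `mem::size_of::<Self>()` already
    /// covers the inline `Option<String>` headers.
    fn string_size(&self) -> usize {
        self.min.as_ref().map(|s| s.len()).unwrap_or(0)
            + self.max.as_ref().map(|s| s.len()).unwrap_or(0)
    }

    /// Total footprint: fixed struct size plus string heap bytes.
    fn size(&self) -> usize {
        mem::size_of::<Self>() + self.string_size()
    }
}

fn main() {
    let stats = StringStats {
        min: Some("aaa".to_string()),
        max: Some("zzzzz".to_string()),
    };
    // 3 + 5 heap bytes on top of the fixed struct size
    assert_eq!(stats.size(), mem::size_of::<StringStats>() + 8);
}
```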
diff --git a/mutable_buffer/src/chunk.rs b/mutable_buffer/src/chunk.rs
index 504f42b755..f19079cf6d 100644
--- a/mutable_buffer/src/chunk.rs
+++ b/mutable_buffer/src/chunk.rs
@@ -157,8 +157,10 @@ impl Chunk {
             return Arc::clone(snapshot);
         }
 
-        // TODO: Incremental snapshot generation
-        let snapshot = Arc::new(ChunkSnapshot::new(self));
+        let snapshot = Arc::new(ChunkSnapshot::new(
+            self,
+            self.metrics.memory_bytes.clone_empty(),
+        ));
         *guard = Some(Arc::clone(&snapshot));
         snapshot
     }
@@ -166,7 +168,10 @@ impl Chunk {
     /// Returns a queryable snapshot of this chunk
     #[cfg(feature = "nocache")]
     pub fn snapshot(&self) -> Arc<ChunkSnapshot> {
-        Arc::new(ChunkSnapshot::new(self))
+        Arc::new(ChunkSnapshot::new(
+            self,
+            self.metrics.memory_bytes.clone_empty(),
+        ))
     }
 
     /// Return the name of the table in this chunk
@@ -201,6 +206,8 @@ impl Chunk {
 
     /// Return the approximate memory size of the chunk, in bytes including the
     /// dictionary, tables, and their rows.
+    ///
+    /// Note: This does not include the size of any cached ChunkSnapshot
    pub fn size(&self) -> usize {
         self.table.size() + self.dictionary.size()
     }
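The `self.metrics.memory_bytes.clone_empty()` call hands each snapshot its own handle on the chunk's memory gauge. A rough sketch of how such a gauge can behave, under the assumption (taken from the calls visible in this diff) that `clone_empty` yields a handle contributing zero to a shared total and `set` replaces that handle's contribution; the real `metrics::GaugeValue` differs in detail (for instance, it should also retract its contribution on drop):

```rust
use std::sync::{
    atomic::{AtomicUsize, Ordering},
    Arc,
};

/// Sketch of a gauge whose handles all feed one shared total.
struct GaugeValue {
    total: Arc<AtomicUsize>,
    local: usize,
}

impl GaugeValue {
    fn new() -> Self {
        Self {
            total: Arc::new(AtomicUsize::new(0)),
            local: 0,
        }
    }

    /// A new handle on the same total, initially contributing nothing.
    fn clone_empty(&self) -> Self {
        Self {
            total: Arc::clone(&self.total),
            local: 0,
        }
    }

    /// Replace this handle's contribution, adjusting the shared total.
    fn set(&mut self, value: usize) {
        if value >= self.local {
            self.total.fetch_add(value - self.local, Ordering::Relaxed);
        } else {
            self.total.fetch_sub(self.local - value, Ordering::Relaxed);
        }
        self.local = value;
    }

    fn get_total(&self) -> usize {
        self.total.load(Ordering::Relaxed)
    }
}

fn main() {
    let mut chunk = GaugeValue::new();
    chunk.set(1000); // bytes held by the chunk itself

    // The snapshot reports its own size through a separate handle
    let mut snapshot = chunk.clone_empty();
    snapshot.set(250);

    assert_eq!(chunk.get_total(), 1250);
}
```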
diff --git a/mutable_buffer/src/chunk/snapshot.rs b/mutable_buffer/src/chunk/snapshot.rs
index b8e44b3873..72d391007e 100644
--- a/mutable_buffer/src/chunk/snapshot.rs
+++ b/mutable_buffer/src/chunk/snapshot.rs
@@ -1,14 +1,23 @@
 use std::collections::BTreeSet;
 use std::sync::Arc;
 
-use arrow::record_batch::RecordBatch;
-use data_types::timestamp::TimestampRange;
-use internal_types::schema::{Schema, TIME_COLUMN_NAME};
-use internal_types::selection::Selection;
+use arrow::{
+    array::DictionaryArray,
+    datatypes::{DataType, Int32Type},
+    record_batch::RecordBatch,
+};
 use snafu::{ensure, ResultExt, Snafu};
 
+use data_types::partition_metadata::TableSummary;
+use data_types::timestamp::TimestampRange;
+use data_types::{
+    error::ErrorLogger,
+    partition_metadata::{ColumnSummary, Statistics},
+};
+use internal_types::schema::{Schema, TIME_COLUMN_NAME};
+use internal_types::selection::Selection;
+
 use super::Chunk;
-use data_types::{error::ErrorLogger, partition_metadata::Statistics};
 
 #[derive(Debug, Snafu)]
 pub enum Error {
@@ -29,12 +38,12 @@ pub struct ChunkSnapshot {
     schema: Schema,
     batch: RecordBatch,
     table_name: Arc<str>,
-    timestamp_range: Option<TimestampRange>,
-    // TODO: Memory tracking
+    stats: Vec<ColumnSummary>,
+    memory: metrics::GaugeValue,
 }
 
 impl ChunkSnapshot {
-    pub fn new(chunk: &Chunk) -> Self {
+    pub(crate) fn new(chunk: &Chunk, memory: metrics::GaugeValue) -> Self {
         let table = &chunk.table;
 
         let schema = table
@@ -47,29 +56,19 @@ impl ChunkSnapshot {
             .log_if_error("ChunkSnapshot converting table to arrow")
             .unwrap();
 
-        let timestamp_range =
-            chunk
-                .dictionary
-                .lookup_value(TIME_COLUMN_NAME)
-                .and_then(|column_id| {
-                    table
-                        .column(column_id)
-                        .ok()
-                        .and_then(|column| match column.stats() {
-                            Statistics::I64(stats) => match (stats.min, stats.max) {
-                                (Some(min), Some(max)) => Some(TimestampRange::new(min, max)),
-                                _ => None,
-                            },
-                            _ => None,
-                        })
-                });
+        // The returned record batch has its columns sorted by name so must also sort the stats
+        let mut stats = table.stats(&chunk.dictionary);
+        stats.sort_by(|a, b| a.name.cmp(&b.name));
 
-        Self {
+        let mut s = Self {
             schema,
             batch,
             table_name: Arc::clone(&chunk.table_name),
-            timestamp_range,
-        }
+            stats,
+            memory,
+        };
+        s.memory.set(s.size());
+        s
     }
 
     /// returns true if there is no data in this snapshot
@@ -158,12 +157,85 @@ impl ChunkSnapshot {
         })
     }
 
-    fn matches_predicate(&self, timestamp_range: &Option<TimestampRange>) -> bool {
-        match (self.timestamp_range, timestamp_range) {
-            (Some(a), Some(b)) => !a.disjoint(b),
-            (None, Some(_)) => false, /* If this chunk doesn't have a time column it can't match */
-            // the predicate
-            (_, None) => true,
+    /// Returns a summary of the statistics for the table in this chunk
+    pub fn table_summary(&self) -> TableSummary {
+        TableSummary {
+            name: self.table_name.to_string(),
+            columns: self.stats.clone(),
         }
     }
+
+    /// Return the approximate memory size of the chunk, in bytes including the
+    /// dictionary, tables, statistics and their rows.
+    pub fn size(&self) -> usize {
+        let columns = self.column_sizes().map(|(_, size)| size).sum::<usize>();
+        let stats = self.stats.iter().map(|c| c.size()).sum::<usize>();
+        columns + stats + std::mem::size_of::<Self>()
+    }
+
+    /// Returns the number of bytes taken up by the shared dictionary
+    pub fn dictionary_size(&self) -> usize {
+        self.batch
+            .columns()
+            .iter()
+            .filter_map(|array| {
+                let dict = array
+                    .as_any()
+                    .downcast_ref::<DictionaryArray<Int32Type>>()?;
+                let values = dict.values();
+                Some(values.get_buffer_memory_size() + values.get_array_memory_size())
+            })
+            .next()
+            .unwrap_or(0)
+    }
+
+    /// Returns an iterator over (column_name, estimated_size) for all
+    /// columns in this chunk.
+    ///
+    /// Dictionary-encoded columns do not include the size of the shared dictionary
+    /// in their reported total
+    ///
+    /// This is instead returned as a special "__dictionary" column
+    pub fn column_sizes(&self) -> impl Iterator<Item = (&str, usize)> + '_ {
+        let dictionary_size = self.dictionary_size();
+        self.batch
+            .columns()
+            .iter()
+            .zip(self.stats.iter())
+            .map(move |(array, summary)| {
+                let size = match array.data_type() {
+                    // Dictionary is only encoded once for all columns
+                    DataType::Dictionary(_, _) => {
+                        array.get_array_memory_size() + array.get_buffer_memory_size()
+                            - dictionary_size
+                    }
+                    _ => array.get_array_memory_size() + array.get_buffer_memory_size(),
+                };
+                (summary.name.as_str(), size)
+            })
+            .chain(std::iter::once(("__dictionary", dictionary_size)))
+    }
+
+    /// Return the number of rows in this chunk
+    pub fn rows(&self) -> usize {
+        self.batch.num_rows()
+    }
+
+    fn matches_predicate(&self, timestamp_range: &Option<TimestampRange>) -> bool {
+        let timestamp_range = match timestamp_range {
+            Some(t) => t,
+            None => return true,
+        };
+
+        self.schema
+            .find_index_of(TIME_COLUMN_NAME)
+            .and_then(|idx| match &self.stats[idx].stats {
+                Statistics::I64(stats) => Some(
+                    !TimestampRange::new(stats.min? as _, stats.max? as _)
+                        .disjoint(timestamp_range),
+                ),
+                _ => panic!("invalid statistics for time column"),
+            })
+            .unwrap_or(false) // If no time column or no time column values - cannot match
+    }
 }
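The rewritten `matches_predicate` recovers the time range from the (name-sorted) column statistics on demand instead of a cached field, and keeps the conservative behavior: no predicate matches everything, while a missing time column or missing stats means the chunk cannot match. A self-contained sketch of that pruning logic (closed-interval semantics assumed here; the real `TimestampRange` has its own bound semantics):

```rust
#[derive(Clone, Copy)]
struct TimestampRange {
    start: i64,
    end: i64,
}

impl TimestampRange {
    fn new(start: i64, end: i64) -> Self {
        Self { start, end }
    }

    /// True when the two (closed) ranges cannot overlap.
    fn disjoint(&self, other: &Self) -> bool {
        self.end < other.start || other.end < self.start
    }
}

struct I64Stats {
    min: Option<i64>,
    max: Option<i64>,
}

/// A chunk may match only if its [min, max] can overlap the predicate;
/// absent stats conservatively report "no match".
fn matches_predicate(time_stats: Option<&I64Stats>, predicate: &Option<TimestampRange>) -> bool {
    let predicate = match predicate {
        Some(p) => p,
        None => return true, // no time predicate - everything matches
    };
    time_stats
        .and_then(|stats| Some(!TimestampRange::new(stats.min?, stats.max?).disjoint(predicate)))
        .unwrap_or(false)
}

fn main() {
    let stats = I64Stats {
        min: Some(100),
        max: Some(200),
    };
    assert!(matches_predicate(Some(&stats), &Some(TimestampRange::new(150, 300))));
    assert!(!matches_predicate(Some(&stats), &Some(TimestampRange::new(300, 400))));
    assert!(!matches_predicate(None, &Some(TimestampRange::new(0, 1))));
    assert!(matches_predicate(None, &None));
}
```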
diff --git a/mutable_buffer/src/column.rs b/mutable_buffer/src/column.rs
index 2eca3c8e69..87c04e9e3a 100644
--- a/mutable_buffer/src/column.rs
+++ b/mutable_buffer/src/column.rs
@@ -276,9 +276,7 @@ impl Column {
             ColumnData::Bool(v, stats) => v.byte_len() + mem::size_of_val(&stats),
             ColumnData::Tag(v, stats) => mem::size_of::<DID>() * v.len() + mem::size_of_val(&stats),
             ColumnData::String(v, stats) => {
-                let string_bytes_size = v.iter().fold(0, |acc, val| acc + val.len());
-                let vec_pointer_sizes = mem::size_of::<String>() * v.len();
-                string_bytes_size + vec_pointer_sizes + mem::size_of_val(&stats)
+                v.size() + mem::size_of_val(&stats) + stats.string_size()
             }
         };
         data_size + self.valid.byte_len()
diff --git a/mutable_buffer/src/table.rs b/mutable_buffer/src/table.rs
index d0ec784e4c..870f0b15c7 100644
--- a/mutable_buffer/src/table.rs
+++ b/mutable_buffer/src/table.rs
@@ -189,8 +189,8 @@ impl Table {
         Ok(())
     }
 
-    /// Returns the column selection for all the columns in this table, orderd
-    /// by table name
+    /// Returns the column selection for all the columns in this table, ordered
+    /// by column name
     fn all_columns_selection<'a>(
         &self,
         dictionary: &'a Dictionary,
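In the column.rs hunk above, the removed code counted the string heap bytes and the `String` headers by hand, while `mem::size_of_val(&stats)` still ignored the heap behind the min/max statistics — hence the added `stats.string_size()` term. The arithmetic for a plain `Vec<String>`, with a hypothetical helper name:

```rust
use std::mem;

/// Footprint of a Vec<String>'s contents: one String header per element
/// (24 bytes on 64-bit targets) plus each element's heap bytes.
/// (Hypothetical helper, for illustration only.)
fn vec_string_size(v: &[String]) -> usize {
    let headers = mem::size_of::<String>() * v.len();
    let heap: usize = v.iter().map(|s| s.len()).sum();
    headers + heap
}

fn main() {
    let v = vec!["cpu".to_string(), "memory_usage".to_string()];
    // 2 headers + 3 + 12 heap bytes
    assert_eq!(vec_string_size(&v), 2 * mem::size_of::<String>() + 15);
}
```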
diff --git a/server/src/db.rs b/server/src/db.rs
index de727fa97a..b405f0a2c8 100644
--- a/server/src/db.rs
+++ b/server/src/db.rs
@@ -588,7 +588,6 @@ impl Db {
         info!(%partition_key, %table_name, %chunk_id, "chunk marked MOVING, loading tables into read buffer");
 
-        let mut batches = Vec::new();
         let table_summary = mb_chunk.table_summary();
 
         // create a new read buffer chunk with memory tracking
@@ -602,17 +601,15 @@ impl Db {
         // load table into the new chunk one by one.
         debug!(%partition_key, %table_name, %chunk_id, table=%table_summary.name, "loading table to read buffer");
 
-        mb_chunk
-            .table_to_arrow(&mut batches, Selection::All)
+        let batch = mb_chunk
+            .read_filter(table_name, Selection::All)
             // It is probably reasonable to recover from this error
             // (reset the chunk state to Open) but until that is
             // implemented (and tested) just panic
             .expect("Loading chunk to mutable buffer");
 
-        for batch in batches.drain(..) {
-            let sorted = sort_record_batch(batch).expect("failed to sort");
-            rb_chunk.upsert_table(&table_summary.name, sorted)
-        }
+        let sorted = sort_record_batch(batch).expect("failed to sort");
+        rb_chunk.upsert_table(&table_summary.name, sorted);
 
         // Relock the chunk again (nothing else should have been able
         // to modify the chunk state while we were moving it
@@ -1428,7 +1425,8 @@ mod tests {
             .eq(1.0)
             .unwrap();
 
-        catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "mutable_buffer", 88).unwrap();
+        catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "mutable_buffer", 1111)
+            .unwrap();
 
         db.load_chunk_to_read_buffer("1970-01-01T00", "cpu", 0)
             .await
@@ -2338,7 +2336,7 @@ mod tests {
                 Arc::from("cpu"),
                 0,
                 ChunkStorage::ClosedMutableBuffer,
-                129,
+                2126,
                 1,
             ),
             ChunkSummary::new_without_timestamps(
@@ -2364,7 +2362,7 @@ mod tests {
                 .memory()
                 .mutable_buffer()
                 .get_total(),
-            100 + 129 + 131
+            100 + 2126 + 131
         );
         assert_eq!(
             db.catalog
diff --git a/server/src/db/catalog/chunk.rs b/server/src/db/catalog/chunk.rs
index 3abb0c9b0f..6c2975784a 100644
--- a/server/src/db/catalog/chunk.rs
+++ b/server/src/db/catalog/chunk.rs
@@ -5,7 +5,7 @@ use data_types::{
     chunk_metadata::{ChunkColumnSummary, ChunkStorage, ChunkSummary, DetailedChunkSummary},
     partition_metadata::TableSummary,
 };
-use mutable_buffer::chunk::Chunk as MBChunk;
+use mutable_buffer::chunk::{snapshot::ChunkSnapshot as MBChunkSnapshot, Chunk as MBChunk};
 use parquet_file::chunk::Chunk as ParquetChunk;
 use read_buffer::Chunk as ReadBufferChunk;
 
@@ -48,7 +48,7 @@ pub enum ChunkState {
     Open(MBChunk),
 
     /// Chunk is closed for new writes
-    Closed(Arc<MBChunk>),
+    Closed(Arc<MBChunkSnapshot>),
 
     /// Chunk has been completely loaded in the read buffer
     Moved(Arc<ReadBufferChunk>),
@@ -384,15 +384,15 @@ impl Chunk {
     }
 
     /// Set the chunk to the Closed state
-    pub fn set_closed(&mut self) -> Result<Arc<MBChunk>> {
+    pub fn set_closed(&mut self) -> Result<Arc<MBChunkSnapshot>> {
         let mut s = ChunkState::Invalid;
         std::mem::swap(&mut s, &mut self.state);
 
         match s {
-            ChunkState::Open(s) => {
+            ChunkState::Open(chunk) => {
                 assert!(self.time_closed.is_none());
                 self.time_closed = Some(Utc::now());
-                let s = Arc::new(s);
+                let s = chunk.snapshot();
                 self.state = ChunkState::Closed(Arc::clone(&s));
                 self.metrics
                     .state
@@ -400,7 +400,7 @@ impl Chunk {
 
                 self.metrics
                     .immutable_chunk_size
-                    .observe_with_labels(s.size() as f64, &[KeyValue::new("state", "closed")]);
+                    .observe_with_labels(chunk.size() as f64, &[KeyValue::new("state", "closed")]);
 
                 Ok(s)
             }
@@ -415,7 +415,7 @@ impl Chunk {
     /// storage
     ///
    /// If called on an open chunk will first close the chunk
-    pub fn set_moving(&mut self) -> Result<Arc<MBChunk>> {
+    pub fn set_moving(&mut self) -> Result<Arc<MBChunkSnapshot>> {
         // This ensures the closing logic is consistent but doesn't break code that
         // assumes a chunk can be moved from open
         if matches!(self.state, ChunkState::Open(_)) {
diff --git a/server/src/db/chunk.rs b/server/src/db/chunk.rs
index f6b053de41..65db43c885 100644
--- a/server/src/db/chunk.rs
+++ b/server/src/db/chunk.rs
@@ -100,7 +100,7 @@ impl DbChunk {
                 chunk: chunk.snapshot(),
             },
             ChunkState::Closed(chunk) => State::MutableBuffer {
-                chunk: chunk.snapshot(),
+                chunk: Arc::clone(chunk),
             },
             ChunkState::Moved(chunk) => State::ReadBuffer {
                 chunk: Arc::clone(chunk),
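One pattern worth highlighting in `set_closed`: the `Open` variant has to be consumed by value even though the method only has `&mut self`, which is why the state is first swapped out for a `ChunkState::Invalid` placeholder. A distilled sketch of that take-and-replace transition, with stand-in types:

```rust
use std::sync::Arc;

#[derive(Debug)]
struct Snapshot;

#[derive(Debug)]
struct MBChunk;

impl MBChunk {
    fn snapshot(&self) -> Arc<Snapshot> {
        Arc::new(Snapshot)
    }
}

#[derive(Debug)]
enum ChunkState {
    Invalid,
    Open(MBChunk),
    Closed(Arc<Snapshot>),
}

fn set_closed(state: &mut ChunkState) -> Result<Arc<Snapshot>, String> {
    // Move the current state out, leaving a placeholder behind
    let mut s = ChunkState::Invalid;
    std::mem::swap(&mut s, state);

    match s {
        ChunkState::Open(chunk) => {
            // Consume the owned chunk: snapshot it, then store the snapshot
            let snapshot = chunk.snapshot();
            *state = ChunkState::Closed(Arc::clone(&snapshot));
            Ok(snapshot)
        }
        other => {
            // Restore whatever was there and report the invalid transition
            *state = other;
            Err("can only close an open chunk".to_string())
        }
    }
}

fn main() {
    let mut state = ChunkState::Open(MBChunk);
    let snapshot = set_closed(&mut state).unwrap();
    println!("{snapshot:?}, state now {state:?}");
}
```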