feat: only store ChunkSnapshot in Closed state (#1560)

* feat: only store ChunkSnapshot in Closed state

* chore: review feedback

* feat: record MUB size as closed size

* chore: document column ordering assumption

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Raphael Taylor-Davies 2021-05-27 11:36:47 +01:00 committed by GitHub
parent 4fcc04e6c9
commit 792bff07d1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 151 additions and 60 deletions

View File

@ -180,7 +180,7 @@ impl ColumnSummary {
/// Return size in bytes of this Column metadata (not the underlying column)
pub fn size(&self) -> usize {
mem::size_of::<Self>() + self.name.len() + mem::size_of_val(&self.stats)
mem::size_of::<Self>() + self.name.len() + self.stats.size()
}
// Updates statistics from other if the same type, otherwise a noop
@ -280,6 +280,14 @@ impl Statistics {
Self::String(v) => v.max.as_deref().map(|x| Cow::Borrowed(x)),
}
}
/// Return the size in bytes of this stats instance
pub fn size(&self) -> usize {
match self {
Self::String(v) => std::mem::size_of::<Self>() + v.string_size(),
_ => std::mem::size_of::<Self>(),
}
}
}
/// Summary statistics for a column.
@ -397,6 +405,14 @@ impl<T> StatValues<T> {
}
}
impl StatValues<String> {
/// Returns the bytes associated by storing min/max string values
pub fn string_size(&self) -> usize {
self.min.as_ref().map(|x| x.len()).unwrap_or(0)
+ self.max.as_ref().map(|x| x.len()).unwrap_or(0)
}
}
pub trait IsNan {
fn is_nan(&self) -> bool;
}

View File

@ -157,8 +157,10 @@ impl Chunk {
return Arc::clone(snapshot);
}
// TODO: Incremental snapshot generation
let snapshot = Arc::new(ChunkSnapshot::new(self));
let snapshot = Arc::new(ChunkSnapshot::new(
self,
self.metrics.memory_bytes.clone_empty(),
));
*guard = Some(Arc::clone(&snapshot));
snapshot
}
@ -166,7 +168,10 @@ impl Chunk {
/// Returns a queryable snapshot of this chunk
#[cfg(feature = "nocache")]
pub fn snapshot(&self) -> Arc<ChunkSnapshot> {
Arc::new(ChunkSnapshot::new(self))
Arc::new(ChunkSnapshot::new(
self,
self.metrics.memory_bytes.clone_empty(),
))
}
/// Return the name of the table in this chunk
@ -201,6 +206,8 @@ impl Chunk {
/// Return the approximate memory size of the chunk, in bytes including the
/// dictionary, tables, and their rows.
///
/// Note: This does not include the size of any cached ChunkSnapshot
pub fn size(&self) -> usize {
self.table.size() + self.dictionary.size()
}

View File

@ -1,14 +1,23 @@
use std::collections::BTreeSet;
use std::sync::Arc;
use arrow::record_batch::RecordBatch;
use data_types::timestamp::TimestampRange;
use internal_types::schema::{Schema, TIME_COLUMN_NAME};
use internal_types::selection::Selection;
use arrow::{
array::DictionaryArray,
datatypes::{DataType, Int32Type},
record_batch::RecordBatch,
};
use snafu::{ensure, ResultExt, Snafu};
use data_types::partition_metadata::TableSummary;
use data_types::timestamp::TimestampRange;
use data_types::{
error::ErrorLogger,
partition_metadata::{ColumnSummary, Statistics},
};
use internal_types::schema::{Schema, TIME_COLUMN_NAME};
use internal_types::selection::Selection;
use super::Chunk;
use data_types::{error::ErrorLogger, partition_metadata::Statistics};
#[derive(Debug, Snafu)]
pub enum Error {
@ -29,12 +38,12 @@ pub struct ChunkSnapshot {
schema: Schema,
batch: RecordBatch,
table_name: Arc<str>,
timestamp_range: Option<TimestampRange>,
// TODO: Memory tracking
stats: Vec<ColumnSummary>,
memory: metrics::GaugeValue,
}
impl ChunkSnapshot {
pub fn new(chunk: &Chunk) -> Self {
pub(crate) fn new(chunk: &Chunk, memory: metrics::GaugeValue) -> Self {
let table = &chunk.table;
let schema = table
@ -47,29 +56,19 @@ impl ChunkSnapshot {
.log_if_error("ChunkSnapshot converting table to arrow")
.unwrap();
let timestamp_range =
chunk
.dictionary
.lookup_value(TIME_COLUMN_NAME)
.and_then(|column_id| {
table
.column(column_id)
.ok()
.and_then(|column| match column.stats() {
Statistics::I64(stats) => match (stats.min, stats.max) {
(Some(min), Some(max)) => Some(TimestampRange::new(min, max)),
_ => None,
},
_ => None,
})
});
// The returned record batch has its columns sorted by name so must also sort the stats
let mut stats = table.stats(&chunk.dictionary);
stats.sort_by(|a, b| a.name.cmp(&b.name));
Self {
let mut s = Self {
schema,
batch,
table_name: Arc::clone(&chunk.table_name),
timestamp_range,
}
stats,
memory,
};
s.memory.set(s.size());
s
}
/// returns true if there is no data in this snapshot
@ -158,12 +157,85 @@ impl ChunkSnapshot {
})
}
fn matches_predicate(&self, timestamp_range: &Option<TimestampRange>) -> bool {
match (self.timestamp_range, timestamp_range) {
(Some(a), Some(b)) => !a.disjoint(b),
(None, Some(_)) => false, /* If this chunk doesn't have a time column it can't match */
// the predicate
(_, None) => true,
/// Returns a vec of the summary statistics of the tables in this chunk
pub fn table_summary(&self) -> TableSummary {
TableSummary {
name: self.table_name.to_string(),
columns: self.stats.clone(),
}
}
/// Return the approximate memory size of the chunk, in bytes including the
/// dictionary, tables, statistics and their rows.
pub fn size(&self) -> usize {
let columns = self.column_sizes().map(|(_, size)| size).sum::<usize>();
let stats = self.stats.iter().map(|c| c.size()).sum::<usize>();
columns + stats + std::mem::size_of::<Self>()
}
/// Returns the number of bytes taken up by the shared dictionary
pub fn dictionary_size(&self) -> usize {
self.batch
.columns()
.iter()
.filter_map(|array| {
let dict = array
.as_any()
.downcast_ref::<DictionaryArray<Int32Type>>()?;
let values = dict.values();
Some(values.get_buffer_memory_size() + values.get_array_memory_size())
})
.next()
.unwrap_or(0)
}
/// Returns an iterator over (column_name, estimated_size) for all
/// columns in this chunk.
///
/// Dictionary-encoded columns do not include the size of the shared dictionary
/// in their reported total
///
/// This is instead returned as a special "__dictionary" column
pub fn column_sizes(&self) -> impl Iterator<Item = (&str, usize)> + '_ {
let dictionary_size = self.dictionary_size();
self.batch
.columns()
.iter()
.zip(self.stats.iter())
.map(move |(array, summary)| {
let size = match array.data_type() {
// Dictionary is only encoded once for all columns
DataType::Dictionary(_, _) => {
array.get_array_memory_size() + array.get_buffer_memory_size()
- dictionary_size
}
_ => array.get_array_memory_size() + array.get_buffer_memory_size(),
};
(summary.name.as_str(), size)
})
.chain(std::iter::once(("__dictionary", dictionary_size)))
}
/// Return the number of rows in this chunk
pub fn rows(&self) -> usize {
self.batch.num_rows()
}
fn matches_predicate(&self, timestamp_range: &Option<TimestampRange>) -> bool {
let timestamp_range = match timestamp_range {
Some(t) => t,
None => return true,
};
self.schema
.find_index_of(TIME_COLUMN_NAME)
.and_then(|idx| match &self.stats[idx].stats {
Statistics::I64(stats) => Some(
!TimestampRange::new(stats.min? as _, stats.max? as _)
.disjoint(timestamp_range),
),
_ => panic!("invalid statistics for time column"),
})
.unwrap_or(false) // If no time column or no time column values - cannot match
}
}

View File

@ -276,9 +276,7 @@ impl Column {
ColumnData::Bool(v, stats) => v.byte_len() + mem::size_of_val(&stats),
ColumnData::Tag(v, stats) => mem::size_of::<DID>() * v.len() + mem::size_of_val(&stats),
ColumnData::String(v, stats) => {
let string_bytes_size = v.iter().fold(0, |acc, val| acc + val.len());
let vec_pointer_sizes = mem::size_of::<String>() * v.len();
string_bytes_size + vec_pointer_sizes + mem::size_of_val(&stats)
v.size() + mem::size_of_val(&stats) + stats.string_size()
}
};
data_size + self.valid.byte_len()

View File

@ -189,8 +189,8 @@ impl Table {
Ok(())
}
/// Returns the column selection for all the columns in this table, orderd
/// by table name
/// Returns the column selection for all the columns in this table, ordered
/// by column name
fn all_columns_selection<'a>(
&self,
dictionary: &'a Dictionary,

View File

@ -588,7 +588,6 @@ impl Db {
info!(%partition_key, %table_name, %chunk_id, "chunk marked MOVING, loading tables into read buffer");
let mut batches = Vec::new();
let table_summary = mb_chunk.table_summary();
// create a new read buffer chunk with memory tracking
@ -602,17 +601,15 @@ impl Db {
// load table into the new chunk one by one.
debug!(%partition_key, %table_name, %chunk_id, table=%table_summary.name, "loading table to read buffer");
mb_chunk
.table_to_arrow(&mut batches, Selection::All)
let batch = mb_chunk
.read_filter(table_name, Selection::All)
// It is probably reasonable to recover from this error
// (reset the chunk state to Open) but until that is
// implemented (and tested) just panic
.expect("Loading chunk to mutable buffer");
for batch in batches.drain(..) {
let sorted = sort_record_batch(batch).expect("failed to sort");
rb_chunk.upsert_table(&table_summary.name, sorted)
}
let sorted = sort_record_batch(batch).expect("failed to sort");
rb_chunk.upsert_table(&table_summary.name, sorted);
// Relock the chunk again (nothing else should have been able
// to modify the chunk state while we were moving it
@ -1428,7 +1425,8 @@ mod tests {
.eq(1.0)
.unwrap();
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "mutable_buffer", 88).unwrap();
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "mutable_buffer", 1111)
.unwrap();
db.load_chunk_to_read_buffer("1970-01-01T00", "cpu", 0)
.await
@ -2338,7 +2336,7 @@ mod tests {
Arc::from("cpu"),
0,
ChunkStorage::ClosedMutableBuffer,
129,
2126,
1,
),
ChunkSummary::new_without_timestamps(
@ -2364,7 +2362,7 @@ mod tests {
.memory()
.mutable_buffer()
.get_total(),
100 + 129 + 131
100 + 2126 + 131
);
assert_eq!(
db.catalog

View File

@ -5,7 +5,7 @@ use data_types::{
chunk_metadata::{ChunkColumnSummary, ChunkStorage, ChunkSummary, DetailedChunkSummary},
partition_metadata::TableSummary,
};
use mutable_buffer::chunk::Chunk as MBChunk;
use mutable_buffer::chunk::{snapshot::ChunkSnapshot as MBChunkSnapshot, Chunk as MBChunk};
use parquet_file::chunk::Chunk as ParquetChunk;
use read_buffer::Chunk as ReadBufferChunk;
@ -48,7 +48,7 @@ pub enum ChunkState {
Open(MBChunk),
/// Chunk is closed for new writes
Closed(Arc<MBChunk>),
Closed(Arc<MBChunkSnapshot>),
/// Chunk has been completely loaded in the read buffer
Moved(Arc<ReadBufferChunk>),
@ -384,15 +384,15 @@ impl Chunk {
}
/// Set the chunk to the Closed state
pub fn set_closed(&mut self) -> Result<Arc<MBChunk>> {
pub fn set_closed(&mut self) -> Result<Arc<MBChunkSnapshot>> {
let mut s = ChunkState::Invalid;
std::mem::swap(&mut s, &mut self.state);
match s {
ChunkState::Open(s) => {
ChunkState::Open(chunk) => {
assert!(self.time_closed.is_none());
self.time_closed = Some(Utc::now());
let s = Arc::new(s);
let s = chunk.snapshot();
self.state = ChunkState::Closed(Arc::clone(&s));
self.metrics
.state
@ -400,7 +400,7 @@ impl Chunk {
self.metrics
.immutable_chunk_size
.observe_with_labels(s.size() as f64, &[KeyValue::new("state", "closed")]);
.observe_with_labels(chunk.size() as f64, &[KeyValue::new("state", "closed")]);
Ok(s)
}
@ -415,7 +415,7 @@ impl Chunk {
/// storage
///
/// If called on an open chunk will first close the chunk
pub fn set_moving(&mut self) -> Result<Arc<MBChunk>> {
pub fn set_moving(&mut self) -> Result<Arc<MBChunkSnapshot>> {
// This ensures the closing logic is consistent but doesn't break code that
// assumes a chunk can be moved from open
if matches!(self.state, ChunkState::Open(_)) {

View File

@ -100,7 +100,7 @@ impl DbChunk {
chunk: chunk.snapshot(),
},
ChunkState::Closed(chunk) => State::MutableBuffer {
chunk: chunk.snapshot(),
chunk: Arc::clone(chunk),
},
ChunkState::Moved(chunk) => State::ReadBuffer {
chunk: Arc::clone(chunk),