Merge branch 'main' into cn/exploration

commit 833debd5b5

@@ -3807,6 +3807,7 @@ version = "0.1.0"
 dependencies = [
  "arrow_util",
  "criterion",
+ "data_types",
  "datafusion 0.1.0",
  "entry",
  "flate2",

@@ -207,10 +207,10 @@ impl GaugeValue {
     /// Sets the local value for this GaugeValue
     pub fn set(&mut self, new: usize) {
-        if new > self.local {
-            self.inc(new - self.local)
-        } else {
-            self.decr(self.local - new)
+        match new.cmp(&self.local) {
+            std::cmp::Ordering::Less => self.decr(self.local - new),
+            std::cmp::Ordering::Equal => {}
+            std::cmp::Ordering::Greater => self.inc(new - self.local),
         }
     }
 }

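The hunk above changes GaugeValue::set from a two-way if/else (which issued a spurious decr(0) whenever the new value equalled the current one) to an explicit three-way comparison. A minimal standalone sketch of the same pattern follows; the LocalGauge type is a simplified stand-in for illustration, not the metrics crate's actual GaugeValue:

    use std::cmp::Ordering;

    /// Simplified stand-in gauge that only tracks a locally observed value.
    /// (The real GaugeValue also pushes the delta into a shared total.)
    struct LocalGauge {
        local: usize,
    }

    impl LocalGauge {
        fn inc(&mut self, delta: usize) {
            self.local += delta;
        }

        fn decr(&mut self, delta: usize) {
            self.local -= delta;
        }

        /// Set the gauge to `new`, emitting only the delta in the right direction.
        fn set(&mut self, new: usize) {
            match new.cmp(&self.local) {
                Ordering::Less => self.decr(self.local - new),
                Ordering::Equal => {} // unchanged: no spurious decr(0)
                Ordering::Greater => self.inc(new - self.local),
            }
        }
    }

    fn main() {
        let mut g = LocalGauge { local: 10 };
        g.set(10); // Equal: no-op
        g.set(4);  // Less: decr(6)
        g.set(12); // Greater: inc(8)
        assert_eq!(g.local, 12);
    }
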
@@ -11,7 +11,6 @@ use internal_types::{
     schema::{builder::SchemaBuilder, InfluxColumnType, Schema},
     selection::Selection,
 };
-use metrics::GaugeValue;
 use parking_lot::Mutex;
 use snafu::{ensure, OptionExt, ResultExt, Snafu};
 use std::{collections::BTreeSet, sync::Arc};

@@ -48,9 +47,9 @@ pub enum Error {
 pub type Result<T, E = Error> = std::result::Result<T, E>;

 #[derive(Debug)]
 #[allow(missing_copy_implementations)]
 pub struct ChunkMetrics {
-    /// keep track of memory used by chunk
-    memory_bytes: GaugeValue,
+    // Placeholder
 }

 impl ChunkMetrics {

@@ -59,13 +58,11 @@ impl ChunkMetrics {
     /// will therefore not be visible to other ChunkMetrics instances or metric instruments
     /// created on a metrics domain, and vice versa
     pub fn new_unregistered() -> Self {
-        Self {
-            memory_bytes: GaugeValue::new_unregistered(),
-        }
+        Self {}
     }

-    pub fn new(_metrics: &metrics::Domain, memory_bytes: GaugeValue) -> Self {
-        Self { memory_bytes }
+    pub fn new(_metrics: &metrics::Domain) -> Self {
+        Self {}
     }
 }

@@ -122,8 +119,6 @@ impl MBChunk {
         let columns = batch.columns();
         chunk.write_columns(sequence, columns)?;

-        chunk.metrics.memory_bytes.set(chunk.size());
-
         Ok(chunk)
     }

@@ -151,7 +146,6 @@ impl MBChunk {
             .try_lock()
             .expect("concurrent readers/writers to MBChunk") = None;

-        self.metrics.memory_bytes.set(self.size());
         self.time_of_last_write = Utc::now();

         Ok(())

@@ -165,10 +159,7 @@ impl MBChunk {
             return Arc::clone(snapshot);
         }

-        let snapshot = Arc::new(ChunkSnapshot::new(
-            self,
-            self.metrics.memory_bytes.clone_empty(),
-        ));
+        let snapshot = Arc::new(ChunkSnapshot::new(self));
         *guard = Some(Arc::clone(&snapshot));
         snapshot
     }

@@ -35,11 +35,10 @@ pub struct ChunkSnapshot {
     batch: RecordBatch,
     table_name: Arc<str>,
     stats: Vec<ColumnSummary>,
-    memory: metrics::GaugeValue,
 }

 impl ChunkSnapshot {
-    pub(crate) fn new(chunk: &MBChunk, memory: metrics::GaugeValue) -> Self {
+    pub(crate) fn new(chunk: &MBChunk) -> Self {
         let schema = chunk
             .schema(Selection::All)
             .log_if_error("ChunkSnapshot getting table schema")

@@ -52,15 +51,12 @@ impl ChunkSnapshot {
         let summary = chunk.table_summary();

-        let mut s = Self {
+        Self {
             schema: Arc::new(schema),
             batch,
             table_name: Arc::clone(&chunk.table_name),
             stats: summary.columns,
-            memory,
-        };
-        s.memory.set(s.size());
-        s
+        }
     }

     /// Return Schema for all columns in this snapshot

@@ -14,7 +14,6 @@ use internal_types::{
 use object_store::{path::Path, ObjectStore};
 use query::predicate::Predicate;

-use metrics::GaugeValue;
 use std::mem;

 #[derive(Debug, Snafu)]

@@ -61,9 +60,9 @@ pub enum Error {
 pub type Result<T, E = Error> = std::result::Result<T, E>;

 #[derive(Debug)]
 #[allow(missing_copy_implementations)]
 pub struct ChunkMetrics {
-    /// keep track of memory used by chunk
-    memory_bytes: GaugeValue,
+    // Placeholder
 }

 impl ChunkMetrics {

@@ -72,13 +71,11 @@ impl ChunkMetrics {
     /// will therefore not be visible to other ChunkMetrics instances or metric instruments
     /// created on a metrics domain, and vice versa
     pub fn new_unregistered() -> Self {
-        Self {
-            memory_bytes: GaugeValue::new_unregistered(),
-        }
+        Self {}
     }

-    pub fn new(_metrics: &metrics::Domain, memory_bytes: GaugeValue) -> Self {
-        Self { memory_bytes }
+    pub fn new(_metrics: &metrics::Domain) -> Self {
+        Self {}
     }
 }

@@ -156,7 +153,7 @@ impl ParquetChunk {
     ) -> Self {
         let timestamp_range = extract_range(&table_summary);

-        let mut chunk = Self {
+        Self {
             partition_key,
             table_summary,
             schema,

@@ -165,10 +162,7 @@
             object_store_path: file_location,
             parquet_metadata,
             metrics,
-        };
-
-        chunk.metrics.memory_bytes.set(chunk.size());
-        chunk
+        }
     }

     /// Return the chunk's partition key

@@ -197,6 +197,11 @@ impl PersistenceWindows {
         // Verify no active flush handles
         self.persistable.get_mut()?;

+        // Close current open window if any
+        if let Some(open) = self.open.take() {
+            self.closed.push_back(open)
+        }
+
         // Rotate into persistable window
         self.rotate(now);

@@ -229,18 +234,19 @@ impl PersistenceWindows {
             persistable.max_time, timestamp,
             "persistable max time doesn't match handle"
         );

         // Everything up to and including persistable max time will have been persisted
         let new_min = Utc.timestamp_nanos(persistable.max_time.timestamp_nanos() + 1);
         for w in self.closed.iter_mut().take(closed_count) {
             if w.min_time < new_min {
                 w.min_time = new_min;
                 if w.max_time < new_min {
                     w.max_time = new_min;
                     w.row_count = 0;
                 }
             }
         }

+        // Drop any now empty windows
+        self.closed.retain(|x| x.row_count > 0);
     }

     /// Returns an iterator over the windows starting with the oldest

@@ -826,13 +832,17 @@ mod tests {
         let flush = w.flush_handle(end_at).unwrap();
         assert_eq!(flush.timestamp(), first_end);
+        assert!(w.open.is_none());
+        let flushed_time = flush.timestamp() + chrono::Duration::nanoseconds(1);
+
         w.flush(flush);
         assert!(w.persistable.is_none());

         let mins = w.closed[0].sequencer_numbers.clone();
         assert_eq!(mins, w.minimum_unpersisted_sequence().unwrap());

+        assert_eq!(w.closed.len(), 2);
+
         // the closed window should have a min time equal to the flush
         let c = &w.closed[0];
         assert_eq!(c.row_count, 3);

@@ -840,10 +850,11 @@ mod tests {
         assert_eq!(c.max_time, second_end);
         assert_eq!(c.created_at, second_created_at);

-        // the open window should not have been modified by the flush
-        let c = w.open.as_ref().unwrap();
+        // the open window should have been closed as part of creating the flush
+        // handle and then truncated by the flush timestamp
+        let c = &w.closed[1];
         assert_eq!(c.row_count, 2);
-        assert_eq!(c.min_time, third_start);
+        assert_eq!(c.min_time, flushed_time);
         assert_eq!(c.max_time, third_end);
         assert_eq!(c.created_at, third_created_at);
     }

@@ -907,15 +918,16 @@ mod tests {
         // after flush we should see no more persistable window and the closed windows
         // should have min timestamps equal to the previous flush end.
         let flush = w.flush_handle(end_at).unwrap();

         assert_eq!(flush.timestamp(), first_end);
+        assert!(w.open.is_none());
+        let flushed_time = flush.timestamp() + chrono::Duration::nanoseconds(1);

         w.flush(flush);
         assert!(w.persistable.is_none());
         let mins = w.closed[0].sequencer_numbers.clone();
         assert_eq!(mins, w.minimum_unpersisted_sequence().unwrap());

         assert_eq!(w.closed.len(), 2);

         // the closed window should have a min time equal to the flush
         let c = &w.closed[0];
         assert_eq!(c.row_count, 3);

@@ -923,10 +935,11 @@ mod tests {
         assert_eq!(c.max_time, second_end);
         assert_eq!(c.created_at, second_created_at);

-        // the open window should not have been modified by the flush
-        let c = w.open.as_ref().unwrap();
+        // the open window should have been closed as part of creating the flush
+        // handle and then truncated by the flush timestamp
+        let c = &w.closed[1];
         assert_eq!(c.row_count, 2);
-        assert_eq!(c.min_time, third_start);
+        assert_eq!(c.min_time, flushed_time);
         assert_eq!(c.max_time, third_end);
         assert_eq!(c.created_at, third_created_at);
     }

@@ -1068,7 +1081,8 @@ mod tests {
         let flush_t = flush.timestamp();

-        assert_eq!(flush.closed_count, 2);
+        assert!(w.open.is_none());
+        assert_eq!(flush.closed_count, 3);
         assert_eq!(flush_t, start + chrono::Duration::seconds(2));
         let truncated_time = flush_t + chrono::Duration::nanoseconds(1);

@@ -1090,25 +1104,31 @@ mod tests {
         w.flush(flush);

+        // Windows from writes at
+        //
+        // - `instant + DEFAULT_CLOSED_WINDOW_PERIOD * 2`
+        // - `instant + DEFAULT_CLOSED_WINDOW_PERIOD * 3`
+        //
+        // have been completely persisted by the flush
+
         assert!(w.persistable.is_none());
-        assert_eq!(w.closed.len(), 4);
+        assert_eq!(w.closed.len(), 2);

         assert_eq!(
             w.closed[0].created_at,
             instant + DEFAULT_CLOSED_WINDOW_PERIOD
         );
         assert_eq!(w.closed[0].min_time, truncated_time);
         assert_eq!(w.closed[0].max_time, start + chrono::Duration::seconds(4));
         assert_eq!(w.closed[0].row_count, 5);

-        assert_eq!(w.closed[1].min_time, truncated_time);
-        assert_eq!(w.closed[1].max_time, truncated_time);
-        assert_eq!(w.closed[1].row_count, 0); // Entirely flushed window
-
-        // Window closed after flush handle - should be left alone
-        assert_eq!(w.closed[2].min_time, start);
-        assert_eq!(w.closed[2].max_time, start + chrono::Duration::seconds(2));
-        assert_eq!(w.closed[2].row_count, 17); // Entirely flushed window
-
-        // Window created after flush handle - should be left alone
-        assert_eq!(w.closed[3].min_time, start);
-        assert_eq!(w.closed[3].max_time, start + chrono::Duration::seconds(2));
-        assert_eq!(w.closed[3].row_count, 11);
+        assert_eq!(
+            w.closed[1].created_at,
+            instant + DEFAULT_CLOSED_WINDOW_PERIOD * 4
+        );
+        assert_eq!(w.closed[1].min_time, start);
+        assert_eq!(w.closed[1].max_time, start + chrono::Duration::seconds(2));
+        assert_eq!(w.closed[1].row_count, 11);
     }
 }

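Taken together, the PersistenceWindows hunks above make creating a flush handle close the currently open window, and make flush truncate any closed windows that begin before the persisted boundary, dropping the ones left empty. A simplified sketch of that truncate-and-retain step, using a hypothetical Window record in place of the real types (which use DateTime<Utc> and also track sequencer numbers):

    use std::collections::VecDeque;

    /// Hypothetical, simplified closed-window record for illustration only.
    #[derive(Debug)]
    struct Window {
        min_time: i64,
        max_time: i64,
        row_count: usize,
    }

    /// Truncate closed windows so they start after `persisted_max`, and drop
    /// any that end up empty - mirroring the logic shown in `flush` above.
    fn truncate_persisted(closed: &mut VecDeque<Window>, persisted_max: i64) {
        // Everything up to and including persisted_max has been persisted
        let new_min = persisted_max + 1;
        for w in closed.iter_mut() {
            if w.min_time < new_min {
                w.min_time = new_min;
                if w.max_time < new_min {
                    // The window was entirely persisted
                    w.max_time = new_min;
                    w.row_count = 0;
                }
            }
        }
        // Drop any now empty windows
        closed.retain(|w| w.row_count > 0);
    }

    fn main() {
        let mut closed = VecDeque::from(vec![
            Window { min_time: 0, max_time: 5, row_count: 3 },  // fully persisted
            Window { min_time: 3, max_time: 20, row_count: 7 }, // partially persisted
        ]);
        truncate_persisted(&mut closed, 10);
        assert_eq!(closed.len(), 1);
        assert_eq!(closed[0].min_time, 11);
    }
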
@@ -510,7 +510,9 @@ impl<C: QueryChunk + 'static> Deduplicater<C> {
         let plan = Arc::new(SortPreservingMergeExec::new(
             sort_exprs.clone(),
             Arc::new(plan),
-            1024,
+            // TODO(edd): temp experiment - should wire in `_batch_size` in the
+            // table provider.
+            1024 * 25,
         ));

         // Add DeduplicateExc

@@ -3,7 +3,7 @@ use std::{
     convert::TryFrom,
 };

-use metrics::{Gauge, GaugeValue, KeyValue};
+use metrics::{Gauge, KeyValue};
 use snafu::{ResultExt, Snafu};

 use arrow::record_batch::RecordBatch;

@@ -100,10 +100,6 @@ impl Chunk {
         self.table.add_row_group(row_group);

-        // Get and set new size of chunk on memory tracker
-        let size = Self::base_size() + self.table.size();
-        self.metrics.memory_bytes.set(size);
-
         // update column metrics associated with column storage
         self.metrics
             .update_column_storage_statistics(&storage_statistics);

@@ -325,9 +321,6 @@ impl std::fmt::Debug for Chunk {
 #[derive(Debug)]
 pub struct ChunkMetrics {
-    /// keep track of memory used by table data in chunk
-    memory_bytes: GaugeValue,
-
     // This metric tracks the total number of columns in read buffer.
     columns_total: Gauge,

@@ -345,9 +338,8 @@ pub struct ChunkMetrics {
 }

 impl ChunkMetrics {
-    pub fn new(domain: &metrics::Domain, memory_bytes: GaugeValue) -> Self {
+    pub fn new(domain: &metrics::Domain) -> Self {
         Self {
-            memory_bytes,
             columns_total: domain.register_gauge_metric(
                 "column",
                 Some("total"),

@@ -377,7 +369,6 @@ impl ChunkMetrics {
     /// created on a metrics domain, and vice versa
     pub fn new_unregistered() -> Self {
         Self {
-            memory_bytes: GaugeValue::new_unregistered(),
             columns_total: Gauge::new_unregistered(),
             column_values_total: Gauge::new_unregistered(),
             column_bytes_total: Gauge::new_unregistered(),

@@ -633,10 +624,7 @@ mod test {
         let domain =
             registry.register_domain_with_labels("read_buffer", vec![KeyValue::new("db", "mydb")]);

-        let mut chunk = Chunk::new(
-            "a_table",
-            ChunkMetrics::new(&domain, GaugeValue::new_unregistered()),
-        );
+        let mut chunk = Chunk::new("a_table", ChunkMetrics::new(&domain));

         // Add a new table to the chunk.
         chunk.upsert_table(gen_recordbatch());

@@ -775,10 +775,7 @@ impl Db {
             self.metric_labels.clone(),
         );
         let chunk_result = MBChunk::new(
-            MutableBufferChunkMetrics::new(
-                &metrics,
-                self.catalog.metrics().memory().mutable_buffer(),
-            ),
+            MutableBufferChunkMetrics::new(&metrics),
             sequence,
             table_batch,
         )

@@ -960,6 +957,7 @@ mod tests {
     use arrow::record_batch::RecordBatch;
     use arrow_util::{assert_batches_eq, assert_batches_sorted_eq};
     use bytes::Bytes;
+    use data_types::database_rules::LifecycleRules;
     use data_types::{
         chunk_metadata::ChunkStorage,
         database_rules::{PartitionTemplate, TemplatePart},

@@ -1345,7 +1343,7 @@ mod tests {
            .eq(1.0)
            .unwrap();

-        catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "mutable_buffer", 1255)
+        catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "mutable_buffer", 1239)
            .unwrap();

         db.move_chunk_to_read_buffer("cpu", "1970-01-01T00", 0)

@@ -1367,7 +1365,7 @@ mod tests {
         // verify chunk size updated (chunk moved from closing to moving to moved)
         catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "mutable_buffer", 0).unwrap();
-        let expected_read_buffer_size = 1484;
+        let expected_read_buffer_size = 1468;
         catalog_chunk_size_bytes_metric_eq(
             &test_db.metric_registry,
             "read_buffer",

@@ -1392,7 +1390,7 @@ mod tests {
            .eq(1.0)
            .unwrap();

-        let expected_parquet_size = 655;
+        let expected_parquet_size = 639;
         catalog_chunk_size_bytes_metric_eq(
             &test_db.metric_registry,
             "read_buffer",

@@ -1566,7 +1564,7 @@ mod tests {
            .unwrap();

         // verify chunk size updated (chunk moved from moved to writing to written)
-        catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "read_buffer", 1486).unwrap();
+        catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "read_buffer", 1470).unwrap();

         // drop, the chunk from the read buffer
         db.drop_chunk("cpu", partition_key, mb_chunk.id()).unwrap();

@@ -1575,8 +1573,8 @@ mod tests {
             vec![] as Vec<u32>
         );

-        // verify size is reported until chunk dropped
-        catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "read_buffer", 1486).unwrap();
+        // verify size is not accounted even though a reference to the RubChunk still exists
+        catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "read_buffer", 0).unwrap();
         std::mem::drop(rb_chunk);

         // verify chunk size updated (chunk dropped from moved state)

@@ -1694,7 +1692,7 @@ mod tests {
                 ("svr_id", "1"),
             ])
             .histogram()
-            .sample_sum_eq(3042.0)
+            .sample_sum_eq(3026.0)
             .unwrap();

         let rb = collect_read_filter(&rb_chunk).await;

@@ -1796,7 +1794,7 @@ mod tests {
                 ("svr_id", "10"),
             ])
             .histogram()
-            .sample_sum_eq(2141.0)
+            .sample_sum_eq(2109.0)
             .unwrap();

         // it should be the same chunk!

@@ -1904,7 +1902,7 @@ mod tests {
                 ("svr_id", "10"),
             ])
             .histogram()
-            .sample_sum_eq(2141.0)
+            .sample_sum_eq(2109.0)
             .unwrap();

         // Unload RB chunk but keep it in OS

@@ -1931,7 +1929,7 @@ mod tests {
                 ("svr_id", "10"),
             ])
             .histogram()
-            .sample_sum_eq(655.0)
+            .sample_sum_eq(639.0)
             .unwrap();

         // Verify data written to the parquet file in object store

@@ -2275,10 +2273,7 @@ mod tests {
             .map(|x| x.estimated_bytes)
             .sum();

-        assert_eq!(
-            db.catalog.metrics().memory().mutable_buffer().get_total(),
-            size
-        );
+        assert_eq!(db.catalog.metrics().memory().mutable_buffer(), size);

         assert_eq!(
             expected, chunk_summaries,

@@ -2376,7 +2371,7 @@ mod tests {
                 0,
                 ChunkStorage::ReadBufferAndObjectStore,
                 lifecycle_action,
-                2139, // size of RB and OS chunks
+                2107, // size of RB and OS chunks
                 1,
             ),
             ChunkSummary::new_without_timestamps(

@@ -2394,7 +2389,7 @@ mod tests {
                 0,
                 ChunkStorage::ClosedMutableBuffer,
                 lifecycle_action,
-                2414,
+                2398,
                 1,
             ),
             ChunkSummary::new_without_timestamps(

@@ -2415,14 +2410,11 @@ mod tests {
         );

         assert_eq!(
-            db.catalog.metrics().memory().mutable_buffer().get_total(),
-            64 + 2414 + 87
+            db.catalog.metrics().memory().mutable_buffer(),
+            64 + 2398 + 87
         );
-        assert_eq!(
-            db.catalog.metrics().memory().read_buffer().get_total(),
-            1484
-        );
-        assert_eq!(db.catalog.metrics().memory().parquet().get_total(), 655);
+        assert_eq!(db.catalog.metrics().memory().read_buffer(), 1468);
+        assert_eq!(db.catalog.metrics().memory().parquet(), 639);
     }

     #[tokio::test]

@@ -3012,7 +3004,10 @@ mod tests {
             .object_store(Arc::clone(&object_store))
             .server_id(server_id)
             .db_name(db_name)
-            .catalog_transactions_until_checkpoint(NonZeroU64::try_from(2).unwrap())
+            .lifecycle_rules(LifecycleRules {
+                catalog_transactions_until_checkpoint: NonZeroU64::try_from(2).unwrap(),
+                ..Default::default()
+            })
             .build()
             .await;
         let db = Arc::new(test_db.db);

@@ -1,3 +1,4 @@
+use crate::db::catalog::metrics::MemoryMetrics;
 use chrono::{DateTime, Utc};
 use data_types::{
     chunk_metadata::{

@@ -220,6 +221,7 @@ macro_rules! unexpected_state {
 pub struct ChunkMetrics {
     pub(super) state: Counter,
     pub(super) immutable_chunk_size: Histogram,
+    pub(super) memory_metrics: MemoryMetrics,
 }

 impl ChunkMetrics {

@@ -231,6 +233,7 @@ impl ChunkMetrics {
         Self {
             state: Counter::new_unregistered(),
             immutable_chunk_size: Histogram::new_unregistered(),
+            memory_metrics: MemoryMetrics::new_unregistered(),
         }
     }
 }

@@ -255,7 +258,7 @@ impl CatalogChunk {
             .state
             .inc_with_labels(&[KeyValue::new("state", "open")]);

-        Self {
+        let mut chunk = Self {
             addr,
             stage,
             lifecycle_action: None,

@@ -263,7 +266,9 @@ impl CatalogChunk {
             time_of_first_write: Some(first_write),
             time_of_last_write: Some(last_write),
             time_closed: None,
-        }
+        };
+        chunk.update_memory_metrics();
+        chunk
     }

     /// Creates a new RUB chunk from the provided RUB chunk and metadata

@@ -298,7 +303,7 @@ impl CatalogChunk {
             time_of_last_write: None,
             time_closed: None,
         };
-        chunk.record_write(); // The creation is considered the first and only "write"
+        chunk.update_memory_metrics();
         chunk
     }

@@ -323,7 +328,7 @@ impl CatalogChunk {
             meta,
         };

-        Self {
+        let mut chunk = Self {
             addr,
             stage,
             lifecycle_action: None,

@@ -331,7 +336,9 @@ impl CatalogChunk {
             time_of_first_write: None,
             time_of_last_write: None,
             time_closed: None,
-        }
+        };
+        chunk.update_memory_metrics();
+        chunk
     }

     pub fn addr(&self) -> &ChunkAddr {

@@ -376,13 +383,54 @@ impl CatalogChunk {
         self.time_closed
     }

-    /// Update the write timestamps for this chunk
+    /// Updates `self.memory_metrics` to match the contents of `self.stage`
+    fn update_memory_metrics(&mut self) {
+        match &self.stage {
+            ChunkStage::Open { mb_chunk } => {
+                self.metrics
+                    .memory_metrics
+                    .mutable_buffer
+                    .set(mb_chunk.size());
+                self.metrics.memory_metrics.read_buffer.set(0);
+                self.metrics.memory_metrics.parquet.set(0);
+            }
+            ChunkStage::Frozen { representation, .. } => match representation {
+                ChunkStageFrozenRepr::MutableBufferSnapshot(snapshot) => {
+                    self.metrics
+                        .memory_metrics
+                        .mutable_buffer
+                        .set(snapshot.size());
+                    self.metrics.memory_metrics.read_buffer.set(0);
+                    self.metrics.memory_metrics.parquet.set(0);
+                }
+                ChunkStageFrozenRepr::ReadBuffer(rb_chunk) => {
+                    self.metrics.memory_metrics.mutable_buffer.set(0);
+                    self.metrics.memory_metrics.read_buffer.set(rb_chunk.size());
+                    self.metrics.memory_metrics.parquet.set(0);
+                }
+            },
+            ChunkStage::Persisted {
+                parquet,
+                read_buffer,
+                ..
+            } => {
+                let rub_size = read_buffer.as_ref().map(|x| x.size()).unwrap_or(0);
+
+                self.metrics.memory_metrics.mutable_buffer.set(0);
+                self.metrics.memory_metrics.read_buffer.set(rub_size);
+                self.metrics.memory_metrics.parquet.set(parquet.size());
+            }
+        }
+    }
+
+    /// Update the metrics for this chunk
     pub fn record_write(&mut self) {
         let now = Utc::now();
         if self.time_of_first_write.is_none() {
             self.time_of_first_write = Some(now);
         }
         self.time_of_last_write = Some(now);
+        self.update_memory_metrics();
     }

     /// Returns the storage and the number of rows

@@ -539,6 +587,8 @@ impl CatalogChunk {
                     representation: ChunkStageFrozenRepr::MutableBufferSnapshot(Arc::clone(&s)),
                     meta: Arc::new(metadata),
                 };
+                self.update_memory_metrics();
+
                 Ok(())
             }
             &ChunkStage::Frozen { .. } => {

@@ -632,6 +682,7 @@ impl CatalogChunk {
                     &[KeyValue::new("state", "moved")],
                 );
                 *representation = ChunkStageFrozenRepr::ReadBuffer(chunk);
+                self.update_memory_metrics();
                 self.finish_lifecycle_action(ChunkLifecycleAction::Moving)?;
                 Ok(())
             }

@@ -710,6 +761,7 @@ impl CatalogChunk {
                     parquet: chunk,
                     read_buffer: Some(db),
                 };
+                self.update_memory_metrics();
                 Ok(())
             }
         }

@@ -742,6 +794,8 @@ impl CatalogChunk {
                     &[KeyValue::new("state", "os")],
                 );

+                self.update_memory_metrics();
+
                 Ok(rub_chunk)
             } else {
                 // TODO: do we really need to error here or should unloading an unloaded chunk

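The new update_memory_metrics above derives all three memory gauges from whichever representation the chunk currently holds, so the gauges cannot drift from the chunk's actual contents. A reduced sketch of the same dispatch-on-stage idea, with stand-in types (the real code matches on ChunkStage/ChunkStageFrozenRepr and sets GaugeValues):

    /// Stand-in gauges; the real code uses the catalog's MemoryMetrics.
    #[derive(Debug, Default)]
    struct Gauges {
        mutable_buffer: usize,
        read_buffer: usize,
        parquet: usize,
    }

    /// Simplified stage enum for illustration; the Frozen read-buffer variant
    /// and other details of the real ChunkStage are omitted.
    enum Stage {
        Open { mb_size: usize },
        FrozenSnapshot { snapshot_size: usize },
        Persisted { parquet_size: usize, read_buffer_size: Option<usize> },
    }

    /// Recompute every gauge from the current stage, as update_memory_metrics does.
    fn update(gauges: &mut Gauges, stage: &Stage) {
        let (mb, rb, pq) = match stage {
            Stage::Open { mb_size } => (*mb_size, 0, 0),
            Stage::FrozenSnapshot { snapshot_size } => (*snapshot_size, 0, 0),
            Stage::Persisted { parquet_size, read_buffer_size } => {
                (0, read_buffer_size.unwrap_or(0), *parquet_size)
            }
        };
        gauges.mutable_buffer = mb;
        gauges.read_buffer = rb;
        gauges.parquet = pq;
    }

    fn main() {
        let mut g = Gauges::default();
        update(&mut g, &Stage::Persisted { parquet_size: 639, read_buffer_size: Some(1468) });
        assert_eq!((g.mutable_buffer, g.read_buffer, g.parquet), (0, 1468, 639));
    }
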
@@ -8,7 +8,7 @@ pub struct CatalogMetrics {
     /// Metrics domain
     metrics_domain: Arc<metrics::Domain>,

-    /// Memory registries
+    /// Catalog memory metrics
     memory_metrics: MemoryMetrics,
 }

@@ -58,6 +58,7 @@ impl CatalogMetrics {
         TableMetrics {
             metrics_domain: Arc::clone(&self.metrics_domain),
+            memory_metrics: self.memory_metrics.clone_empty(),
             table_lock_tracker,
             partition_lock_tracker,
             chunk_lock_tracker,

@@ -70,6 +71,9 @@ pub struct TableMetrics {
     /// Metrics domain
     metrics_domain: Arc<metrics::Domain>,

+    /// Catalog memory metrics
+    memory_metrics: MemoryMetrics,
+
     /// Lock tracker for table-level locks
     table_lock_tracker: LockTracker,

@@ -92,6 +96,7 @@ impl TableMetrics {
     pub(super) fn new_partition_metrics(&self) -> PartitionMetrics {
         // Lock tracker for chunk-level locks
         PartitionMetrics {
+            memory_metrics: self.memory_metrics.clone_empty(),
             chunk_state: self.metrics_domain.register_counter_metric_with_labels(
                 "chunks",
                 None,

@@ -114,6 +119,9 @@ impl TableMetrics {
 #[derive(Debug)]
 pub struct PartitionMetrics {
+    /// Catalog memory metrics
+    memory_metrics: MemoryMetrics,
+
     chunk_state: Counter,

     immutable_chunk_size: Histogram,

@@ -131,19 +139,28 @@ impl PartitionMetrics {
         ChunkMetrics {
             state: self.chunk_state.clone(),
             immutable_chunk_size: self.immutable_chunk_size.clone(),
+            memory_metrics: self.memory_metrics.clone_empty(),
         }
     }
 }

 #[derive(Debug)]
 pub struct MemoryMetrics {
-    mutable_buffer: GaugeValue,
-    read_buffer: GaugeValue,
-    parquet: GaugeValue,
+    pub(super) mutable_buffer: GaugeValue,
+    pub(super) read_buffer: GaugeValue,
+    pub(super) parquet: GaugeValue,
 }

 impl MemoryMetrics {
-    fn new(metrics_domain: &metrics::Domain) -> Self {
+    pub fn new_unregistered() -> Self {
+        Self {
+            mutable_buffer: GaugeValue::new_unregistered(),
+            read_buffer: GaugeValue::new_unregistered(),
+            parquet: GaugeValue::new_unregistered(),
+        }
+    }
+
+    pub fn new(metrics_domain: &metrics::Domain) -> Self {
         let gauge = metrics_domain.register_gauge_metric(
             "chunks_mem_usage",
             Some("bytes"),

@@ -157,19 +174,26 @@
         }
     }

+    fn clone_empty(&self) -> Self {
+        Self {
+            mutable_buffer: self.mutable_buffer.clone_empty(),
+            read_buffer: self.read_buffer.clone_empty(),
+            parquet: self.parquet.clone_empty(),
+        }
+    }
+
     /// Returns the size of the mutable buffer
-    pub fn mutable_buffer(&self) -> GaugeValue {
-        self.mutable_buffer.clone_empty()
+    pub fn mutable_buffer(&self) -> usize {
+        self.mutable_buffer.get_total()
     }

     /// Returns the size of the mutable buffer
-    pub fn read_buffer(&self) -> GaugeValue {
-        self.read_buffer.clone_empty()
+    pub fn read_buffer(&self) -> usize {
+        self.read_buffer.get_total()
     }

     /// Returns the amount of data in parquet
-    pub fn parquet(&self) -> GaugeValue {
-        self.parquet.clone_empty()
+    pub fn parquet(&self) -> usize {
+        self.parquet.get_total()
     }

     /// Total bytes over all registries.

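With this change the catalog owns the per-tier GaugeValues: each chunk is handed its own child gauge via clone_empty, while mutable_buffer() / read_buffer() / parquet() now report aggregated totals as plain usize through get_total(). A minimal sketch of one way such a parent/child gauge can work; this is a simplified reimplementation for illustration, not the metrics crate's actual GaugeValue:

    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;

    /// Simplified shared-total gauge: children observe into the parent's total,
    /// so setting a child's local value adjusts the aggregate seen by get_total().
    struct SharedGauge {
        total: Arc<AtomicUsize>,
        local: usize,
    }

    impl SharedGauge {
        fn new_unregistered() -> Self {
            Self { total: Arc::new(AtomicUsize::new(0)), local: 0 }
        }

        /// New child observing into the same total, starting from zero.
        fn clone_empty(&self) -> Self {
            Self { total: Arc::clone(&self.total), local: 0 }
        }

        fn set(&mut self, new: usize) {
            if new >= self.local {
                self.total.fetch_add(new - self.local, Ordering::Relaxed);
            } else {
                self.total.fetch_sub(self.local - new, Ordering::Relaxed);
            }
            self.local = new;
        }

        fn get_total(&self) -> usize {
            self.total.load(Ordering::Relaxed)
        }
    }

    fn main() {
        let parent = SharedGauge::new_unregistered();
        let mut chunk_a = parent.clone_empty();
        let mut chunk_b = parent.clone_empty();
        chunk_a.set(100);
        chunk_b.set(250);
        chunk_b.set(200); // shrinking a chunk lowers the aggregate
        assert_eq!(parent.get_total(), 300);
    }
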
@@ -148,11 +148,8 @@ impl Partition {
             chunk_id,
         };

-        let chunk = Arc::new(self.metrics.new_chunk_lock(CatalogChunk::new_open(
-            addr,
-            chunk,
-            self.metrics.new_chunk_metrics(),
-        )));
+        let chunk = CatalogChunk::new_open(addr, chunk, self.metrics.new_chunk_metrics());
+        let chunk = Arc::new(self.metrics.new_chunk_lock(chunk));

         if self.chunks.insert(chunk_id, Arc::clone(&chunk)).is_some() {
             // A fundamental invariant has been violated - abort

@@ -371,10 +371,7 @@ fn new_rub_chunk(db: &Db, table_name: &str) -> read_buffer::RBChunk {
         .metrics_registry
         .register_domain_with_labels("read_buffer", db.metric_labels.clone());

-    read_buffer::RBChunk::new(
-        table_name,
-        read_buffer::ChunkMetrics::new(&metrics, db.catalog.metrics().memory().read_buffer()),
-    )
+    read_buffer::RBChunk::new(table_name, read_buffer::ChunkMetrics::new(&metrics))
 }

 /// Executes a plan and collects the results into a read buffer chunk

@@ -66,10 +66,7 @@ pub(crate) fn compact_chunks(
         .metrics_registry
         .register_domain_with_labels("read_buffer", db.metric_labels.clone());

-    let mut rb_chunk = RBChunk::new(
-        &table_name,
-        ChunkMetrics::new(&metrics, db.catalog.metrics().memory().read_buffer()),
-    );
+    let mut rb_chunk = RBChunk::new(&table_name, ChunkMetrics::new(&metrics));

     let ctx = db.exec.new_context(ExecutorType::Reorg);

@@ -6,6 +6,7 @@ use std::sync::Arc;
 use data_types::job::Job;
 use lifecycle::{LifecycleWriteGuard, LockableChunk};
 use observability_deps::tracing::info;
+use persistence_windows::persistence_windows::FlushHandle;
 use query::exec::ExecutorType;
 use query::frontend::reorg::ReorgPlanner;
 use query::QueryChunkMeta;

@@ -19,7 +20,6 @@ use crate::db::lifecycle::{
 use crate::db::DbChunk;

 use super::{LockableCatalogChunk, LockableCatalogPartition, Result};
-use persistence_windows::persistence_windows::FlushHandle;

 /// Split and then persist the provided chunks
 ///

@@ -156,3 +156,75 @@ pub(super) fn persist_chunks(

     Ok((tracker, fut.track(registration)))
 }
+
+#[cfg(test)]
+mod tests {
+    use std::num::{NonZeroU32, NonZeroU64};
+    use std::time::Instant;
+
+    use chrono::{TimeZone, Utc};
+
+    use data_types::database_rules::LifecycleRules;
+    use lifecycle::{LockableChunk, LockablePartition};
+    use query::QueryDatabase;
+
+    use crate::db::test_helpers::write_lp;
+    use crate::utils::TestDb;
+
+    use super::*;
+
+    #[tokio::test]
+    async fn test_flush_overlapping() {
+        let test_db = TestDb::builder()
+            .lifecycle_rules(LifecycleRules {
+                late_arrive_window_seconds: NonZeroU32::new(1).unwrap(),
+                // Disable lifecycle manager - TODO: Better way to do this, as this will still run the loop once
+                worker_backoff_millis: NonZeroU64::new(u64::MAX).unwrap(),
+                ..Default::default()
+            })
+            .build()
+            .await;
+
+        let db = test_db.db;
+
+        write_lp(db.as_ref(), "cpu,tag1=cupcakes bar=1 10").await;
+
+        let partition_keys = db.partition_keys().unwrap();
+        assert_eq!(partition_keys.len(), 1);
+        let db_partition = db.partition("cpu", &partition_keys[0]).unwrap();
+
+        // Wait for the persistence window to be closed
+        tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
+
+        write_lp(db.as_ref(), "cpu,tag1=lagged bar=1 10").await;
+
+        let partition = LockableCatalogPartition::new(Arc::clone(&db), Arc::clone(&db_partition));
+        let partition = partition.read();
+
+        let chunks = LockablePartition::chunks(&partition);
+        let chunks: Vec<_> = chunks.iter().map(|x| x.1.read()).collect();
+
+        let mut partition = partition.upgrade();
+
+        let handle = LockablePartition::prepare_persist(&mut partition, Instant::now())
+            .unwrap()
+            .0;
+
+        assert_eq!(handle.timestamp(), Utc.timestamp_nanos(10));
+        let chunks: Vec<_> = chunks.into_iter().map(|x| x.upgrade()).collect();
+
+        persist_chunks(partition, chunks, handle)
+            .unwrap()
+            .1
+            .await
+            .unwrap()
+            .unwrap();
+
+        assert!(db_partition
+            .read()
+            .persistence_windows()
+            .unwrap()
+            .minimum_unpersisted_age()
+            .is_none());
+    }
+}

@@ -123,8 +123,7 @@ pub fn write_chunk_to_object_store(
                 .catalog
                 .metrics_registry
                 .register_domain_with_labels("parquet", db.catalog.metric_labels.clone());
-            let metrics =
-                ParquetChunkMetrics::new(&metrics, db.catalog.metrics().memory().parquet());
+            let metrics = ParquetChunkMetrics::new(&metrics);
             let parquet_chunk = Arc::new(
                 ParquetChunk::new(
                     path.clone(),

@@ -155,7 +155,7 @@ impl CatalogState for Catalog {
             .metrics_registry
             .register_domain_with_labels("parquet", self.metric_labels.clone());

-        let metrics = ParquetChunkMetrics::new(&metrics, self.metrics().memory().parquet());
+        let metrics = ParquetChunkMetrics::new(&metrics);
         let parquet_chunk = ParquetChunk::new(
             object_store.path_from_dirs_and_filename(info.path.clone()),
             object_store,

@@ -1,4 +1,4 @@
-use std::{borrow::Cow, convert::TryFrom, num::NonZeroU64, sync::Arc, time::Duration};
+use std::{borrow::Cow, convert::TryFrom, sync::Arc, time::Duration};

 use data_types::{
     chunk_metadata::{ChunkStorage, ChunkSummary},

@@ -14,6 +14,7 @@ use crate::{
     db::{load::load_or_create_preserved_catalog, DatabaseToCommit, Db},
     JobRegistry,
 };
+use data_types::database_rules::LifecycleRules;

 // A wrapper around a Db and a metrics registry allowing for isolated testing
 // of a Db and its metrics.

@@ -36,7 +37,7 @@ pub struct TestDbBuilder {
     db_name: Option<DatabaseName<'static>>,
     worker_cleanup_avg_sleep: Option<Duration>,
     write_buffer: Option<WriteBufferConfig>,
-    catalog_transactions_until_checkpoint: Option<NonZeroU64>,
+    lifecycle_rules: Option<LifecycleRules>,
     partition_template: Option<PartitionTemplate>,
 }

@@ -76,10 +77,7 @@ impl TestDbBuilder {
             .worker_cleanup_avg_sleep
             .unwrap_or_else(|| Duration::from_secs(1));

-        // enable checkpointing
-        if let Some(v) = self.catalog_transactions_until_checkpoint {
-            rules.lifecycle_rules.catalog_transactions_until_checkpoint = v;
-        }
+        rules.lifecycle_rules = self.lifecycle_rules.unwrap_or_default();

         // set partion template
         if let Some(partition_template) = self.partition_template {

@@ -132,8 +130,8 @@ impl TestDbBuilder {
         self
     }

-    pub fn catalog_transactions_until_checkpoint(mut self, interval: NonZeroU64) -> Self {
-        self.catalog_transactions_until_checkpoint = Some(interval);
+    pub fn lifecycle_rules(mut self, lifecycle_rules: LifecycleRules) -> Self {
+        self.lifecycle_rules = Some(lifecycle_rules);
         self
     }

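After the builder change above, checkpointing (and any other lifecycle setting) is configured by passing a whole LifecycleRules value instead of a dedicated setter. An illustrative helper mirroring the updated call sites elsewhere in this diff; the function name is hypothetical:

    use std::convert::TryFrom;
    use std::num::NonZeroU64;
    use std::sync::Arc;

    use data_types::database_rules::LifecycleRules;
    use object_store::ObjectStore;
    use server::utils::TestDb;

    /// Hypothetical helper: build a TestDb that checkpoints every 2 catalog transactions.
    async fn checkpointing_test_db(object_store: Arc<ObjectStore>) -> TestDb {
        TestDb::builder()
            .object_store(object_store)
            .lifecycle_rules(LifecycleRules {
                catalog_transactions_until_checkpoint: NonZeroU64::try_from(2).unwrap(),
                ..Default::default()
            })
            .build()
            .await
    }
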
@@ -15,6 +15,7 @@ arrow_util = { path = "../arrow_util" }
 entry = { path = "../entry" }
 criterion = { version = "0.3.4", features = ["async_tokio"] }
 datafusion = { path = "../datafusion" }
+data_types = { path = "../data_types" }
 flate2 = "1.0.20"
 influxdb_tsm = { path = "../influxdb_tsm" }
 influxdb_line_protocol = { path = "../influxdb_line_protocol" }

@@ -1,4 +1,5 @@
 use criterion::{criterion_group, criterion_main, BatchSize, Criterion, SamplingMode};
+use data_types::database_rules::LifecycleRules;
 use object_store::{ObjectStore, ThrottleConfig};
 use server::{db::test_helpers::write_lp, utils::TestDb};
 use std::{convert::TryFrom, num::NonZeroU64, sync::Arc, time::Duration};

@@ -101,7 +102,11 @@ async fn setup(object_store: Arc<ObjectStore>, done: &Mutex<bool>) {
 async fn create_persisted_db(object_store: Arc<ObjectStore>) -> TestDb {
     TestDb::builder()
         .object_store(object_store)
-        .catalog_transactions_until_checkpoint(NonZeroU64::try_from(CHECKPOINT_INTERVAL).unwrap())
+        .lifecycle_rules(LifecycleRules {
+            catalog_transactions_until_checkpoint: NonZeroU64::try_from(CHECKPOINT_INTERVAL)
+                .unwrap(),
+            ..Default::default()
+        })
         .build()
         .await
 }
