Merge pull request #6426 from influxdata/dom/hot-partitions
feat(ingester2): hot partition persistence
pull/24376/head
commit
97d90f5615
|
@ -50,4 +50,14 @@ pub struct Ingester2Config {
|
|||
action
|
||||
)]
|
||||
pub persist_queue_depth: usize,
|
||||
|
||||
/// The limit at which a partition's estimated persistence cost causes it to
|
||||
/// be queued for persistence.
|
||||
#[clap(
|
||||
long = "persist-hot-partition-cost",
|
||||
env = "INFLUXDB_IOX_PERSIST_HOT_PARTITION_COST",
|
||||
default_value = "20000000", // 20,000,000
|
||||
action
|
||||
)]
|
||||
pub persist_hot_partition_cost: usize,
|
||||
}
|
||||
|
|
|
@ -6,3 +6,5 @@ pub(crate) mod table;
|
|||
mod root;
|
||||
#[allow(unused_imports)]
|
||||
pub(crate) use root::*;
|
||||
|
||||
pub(crate) mod post_write;
|
||||
|
|
|
@ -13,6 +13,7 @@ use trace::span::Span;
|
|||
|
||||
use super::{
|
||||
partition::resolver::PartitionProvider,
|
||||
post_write::PostWriteObserver,
|
||||
table::{name_resolver::TableNameProvider, TableData},
|
||||
};
|
||||
use crate::{
|
||||
|
@ -53,7 +54,7 @@ impl std::fmt::Display for NamespaceName {
|
|||
|
||||
/// Data of a Namespace that belongs to a given Shard
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct NamespaceData {
|
||||
pub(crate) struct NamespaceData<O> {
|
||||
namespace_id: NamespaceId,
|
||||
namespace_name: Arc<DeferredLoad<NamespaceName>>,
|
||||
|
||||
|
@ -64,7 +65,7 @@ pub(crate) struct NamespaceData {
|
|||
/// resolve the [`TableName`] for new [`TableData`] out of the hot path.
|
||||
///
|
||||
/// [`TableName`]: crate::buffer_tree::table::TableName
|
||||
tables: ArcMap<TableId, TableData>,
|
||||
tables: ArcMap<TableId, TableData<O>>,
|
||||
table_name_resolver: Arc<dyn TableNameProvider>,
|
||||
/// The count of tables initialised in this Ingester so far, across all
|
||||
/// namespaces.
|
||||
|
@ -74,15 +75,18 @@ pub(crate) struct NamespaceData {
|
|||
///
|
||||
/// [`PartitionData`]: super::partition::PartitionData
|
||||
partition_provider: Arc<dyn PartitionProvider>,
|
||||
|
||||
post_write_observer: Arc<O>,
|
||||
}
|
||||
|
||||
impl NamespaceData {
|
||||
impl<O> NamespaceData<O> {
|
||||
/// Initialize new tables with default partition template of daily
|
||||
pub(super) fn new(
|
||||
namespace_id: NamespaceId,
|
||||
namespace_name: DeferredLoad<NamespaceName>,
|
||||
table_name_resolver: Arc<dyn TableNameProvider>,
|
||||
partition_provider: Arc<dyn PartitionProvider>,
|
||||
post_write_observer: Arc<O>,
|
||||
metrics: &metric::Registry,
|
||||
) -> Self {
|
||||
let table_count = metrics
|
||||
|
@ -99,11 +103,12 @@ impl NamespaceData {
|
|||
table_name_resolver,
|
||||
table_count,
|
||||
partition_provider,
|
||||
post_write_observer,
|
||||
}
|
||||
}
|
||||
|
||||
/// Return the table data by ID.
|
||||
pub(crate) fn table(&self, table_id: TableId) -> Option<Arc<TableData>> {
|
||||
pub(crate) fn table(&self, table_id: TableId) -> Option<Arc<TableData<O>>> {
|
||||
self.tables.get(&table_id)
|
||||
}
|
||||
|
||||
|
@ -122,13 +127,16 @@ impl NamespaceData {
|
|||
/// NOTE: the snapshot is an atomic / point-in-time snapshot of the set of
|
||||
/// [`NamespaceData`], but the tables (and partitions) within them may
|
||||
/// change as they continue to buffer DML operations.
|
||||
pub(super) fn tables(&self) -> Vec<Arc<TableData>> {
|
||||
pub(super) fn tables(&self) -> Vec<Arc<TableData<O>>> {
|
||||
self.tables.values()
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl DmlSink for NamespaceData {
|
||||
impl<O> DmlSink for NamespaceData<O>
|
||||
where
|
||||
O: PostWriteObserver,
|
||||
{
|
||||
type Error = mutable_batch::Error;
|
||||
|
||||
async fn apply(&self, op: DmlOperation) -> Result<(), Self::Error> {
|
||||
|
@ -154,6 +162,7 @@ impl DmlSink for NamespaceData {
|
|||
self.namespace_id,
|
||||
Arc::clone(&self.namespace_name),
|
||||
Arc::clone(&self.partition_provider),
|
||||
Arc::clone(&self.post_write_observer),
|
||||
))
|
||||
});
|
||||
|
||||
|
@ -180,7 +189,10 @@ impl DmlSink for NamespaceData {
|
|||
}
|
||||
|
||||
#[async_trait]
|
||||
impl QueryExec for NamespaceData {
|
||||
impl<O> QueryExec for NamespaceData<O>
|
||||
where
|
||||
O: Send + Sync + std::fmt::Debug,
|
||||
{
|
||||
type Response = QueryResponse;
|
||||
|
||||
async fn query_exec(
|
||||
|
@ -222,6 +234,7 @@ mod tests {
|
|||
buffer_tree::{
|
||||
namespace::NamespaceData,
|
||||
partition::{resolver::mock::MockPartitionProvider, PartitionData, SortKeyState},
|
||||
post_write::mock::MockPostWriteObserver,
|
||||
table::{name_resolver::mock::MockTableNameProvider, TableName},
|
||||
},
|
||||
deferred_load::{self, DeferredLoad},
|
||||
|
@ -261,6 +274,7 @@ mod tests {
|
|||
DeferredLoad::new(Duration::from_millis(1), async { NAMESPACE_NAME.into() }),
|
||||
Arc::new(MockTableNameProvider::new(TABLE_NAME)),
|
||||
partition_provider,
|
||||
Arc::new(MockPostWriteObserver::default()),
|
||||
&metrics,
|
||||
);
|
||||
|
||||
|
|
|
@ -139,6 +139,10 @@ impl PartitionData {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
/// Return the persist cost estimate reported by this partition's
/// underlying buffer.
///
/// This is a read-only accessor that delegates directly to
/// `self.buffer.persist_cost_estimate()`.
pub(crate) fn persist_cost_estimate(&self) -> usize {
|
||||
self.buffer.persist_cost_estimate()
|
||||
}
|
||||
|
||||
/// Return all data for this partition, ordered by the calls to
|
||||
/// [`PartitionData::buffer_write()`].
|
||||
pub(crate) fn get_query_data(&mut self) -> Option<QueryAdaptor> {
|
||||
|
|
|
@ -55,6 +55,12 @@ impl DataBuffer {
|
|||
})
|
||||
}
|
||||
|
||||
/// Return the persist cost estimate of the buffer held by the current
/// [`FsmState`].
pub(crate) fn persist_cost_estimate(&self) -> usize {
|
||||
match self.0.get() {
|
||||
// Delegate to the buffering state's own estimate.
FsmState::Buffering(b) => b.persist_cost_estimate(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Return all data for this buffer, ordered by the [`SequenceNumber`] from
|
||||
/// which it was buffered with.
|
||||
pub(crate) fn get_query_data(&mut self) -> Vec<Arc<RecordBatch>> {
|
||||
|
|
|
@ -43,6 +43,11 @@ impl<T> AlwaysSome<T> {
|
|||
pub(crate) fn into_inner(self) -> T {
|
||||
self.0.unwrap()
|
||||
}
|
||||
|
||||
/// Return an immutable reference to the inner `T`.
///
/// # Panics
///
/// Panics if the wrapped `Option` is `None` (the `unwrap()` below).
|
||||
pub(crate) fn get(&self) -> &T {
|
||||
self.0.as_ref().unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
|
|
@ -54,4 +54,8 @@ impl Buffer {
|
|||
pub(super) fn buffer(&self) -> Option<&MutableBatch> {
|
||||
self.buffer.as_ref()
|
||||
}
|
||||
|
||||
/// Return an estimate of the cost of persisting this buffer.
///
/// The estimate is the value of `size_data()` on the buffered
/// `MutableBatch`, or `0` when `buffer()` returns `None`.
// NOTE(review): `size_data()` is presumably a size in bytes - confirm
// against the `MutableBatch` documentation.
pub(crate) fn persist_cost_estimate(&self) -> usize {
|
||||
self.buffer().map(|v| v.size_data()).unwrap_or_default()
|
||||
}
|
||||
}
|
||||
|
|
|
@ -76,6 +76,10 @@ impl BufferState<Buffering> {
|
|||
// And transition to the WithSnapshot state.
|
||||
Transition::ok(Snapshot::new(vec![snap]))
|
||||
}
|
||||
|
||||
/// Return the persist cost estimate of the buffer owned by this
/// `Buffering` state, by delegating to
/// `self.state.buffer.persist_cost_estimate()`.
pub(crate) fn persist_cost_estimate(&self) -> usize {
|
||||
self.state.buffer.persist_cost_estimate()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
|
|
@ -0,0 +1,22 @@
|
|||
use std::sync::Arc;
|
||||
|
||||
use parking_lot::Mutex;
|
||||
|
||||
use crate::buffer_tree::partition::PartitionData;
|
||||
|
||||
use super::PostWriteObserver;
|
||||
|
||||
/// A mock [`PostWriteObserver`] that records every partition handle passed
/// to `observe()`.
#[derive(Debug, Default)]
|
||||
pub(crate) struct MockPostWriteObserver {
|
||||
// The partition handles observed so far, appended in call order.
saw: Mutex<Vec<Arc<Mutex<PartitionData>>>>,
|
||||
}
|
||||
|
||||
impl PostWriteObserver for MockPostWriteObserver {
|
||||
/// Record `partition` in the list of observed partitions.
///
/// The partition lock guard is accepted but not inspected.
fn observe(
|
||||
&self,
|
||||
partition: Arc<Mutex<PartitionData>>,
|
||||
_guard: parking_lot::MutexGuard<'_, PartitionData>,
|
||||
) {
|
||||
// Lock the internal list and append the observed partition handle.
self.saw.lock().push(partition);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,5 @@
|
|||
mod r#trait;
|
||||
pub(crate) use r#trait::*;
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) mod mock;
|
|
@ -0,0 +1,9 @@
|
|||
use std::{fmt::Debug, sync::Arc};
|
||||
|
||||
use parking_lot::{Mutex, MutexGuard};
|
||||
|
||||
use crate::buffer_tree::partition::PartitionData;
|
||||
|
||||
/// An observer that is handed a partition so that it can be inspected
/// (and acted upon) while the partition lock is held.
///
/// Implementations must be `Send + Sync + Debug`.
// NOTE(review): the name suggests this is invoked after each successful
// write to the partition - confirm against the call site.
pub(crate) trait PostWriteObserver: Send + Sync + Debug {
|
||||
/// Observe `partition`.
///
/// `partition` is a shared handle to the partition data, and `guard` is a
/// lock guard over [`PartitionData`] passed by value, so the implementation
/// controls when the lock is released.
fn observe(&self, partition: Arc<Mutex<PartitionData>>, guard: MutexGuard<'_, PartitionData>);
|
||||
}
|
|
@ -1,4 +1,4 @@
|
|||
use std::sync::Arc;
|
||||
use std::{fmt::Debug, sync::Arc};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use data_types::{NamespaceId, TableId};
|
||||
|
@ -10,6 +10,7 @@ use trace::span::Span;
|
|||
use super::{
|
||||
namespace::{name_resolver::NamespaceNameProvider, NamespaceData},
|
||||
partition::{resolver::PartitionProvider, PartitionData},
|
||||
post_write::PostWriteObserver,
|
||||
table::name_resolver::TableNameProvider,
|
||||
};
|
||||
use crate::{
|
||||
|
@ -68,7 +69,7 @@ use crate::{
|
|||
/// [`TableData`]: crate::buffer_tree::table::TableData
|
||||
/// [`PartitionData`]: crate::buffer_tree::partition::PartitionData
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct BufferTree {
|
||||
pub(crate) struct BufferTree<O> {
|
||||
/// The resolver of `(table_id, partition_key)` to [`PartitionData`].
|
||||
///
|
||||
/// [`PartitionData`]: super::partition::PartitionData
|
||||
|
@ -83,7 +84,7 @@ pub(crate) struct BufferTree {
|
|||
///
|
||||
/// [`DeferredLoad`]: crate::deferred_load::DeferredLoad
|
||||
/// [`NamespaceName`]: data_types::NamespaceName
|
||||
namespaces: ArcMap<NamespaceId, NamespaceData>,
|
||||
namespaces: ArcMap<NamespaceId, NamespaceData<O>>,
|
||||
namespace_name_resolver: Arc<dyn NamespaceNameProvider>,
|
||||
/// The [`TableName`] provider used by [`NamespaceData`] to initialise a
|
||||
/// [`TableData`].
|
||||
|
@ -94,14 +95,20 @@ pub(crate) struct BufferTree {
|
|||
|
||||
metrics: Arc<metric::Registry>,
|
||||
namespace_count: U64Counter,
|
||||
|
||||
post_write_observer: Arc<O>,
|
||||
}
|
||||
|
||||
impl BufferTree {
|
||||
impl<O> BufferTree<O>
|
||||
where
|
||||
O: Send + Sync + Debug,
|
||||
{
|
||||
/// Initialise a new [`BufferTree`] that emits metrics to `metrics`.
|
||||
pub(crate) fn new(
|
||||
namespace_name_resolver: Arc<dyn NamespaceNameProvider>,
|
||||
table_name_resolver: Arc<dyn TableNameProvider>,
|
||||
partition_provider: Arc<dyn PartitionProvider>,
|
||||
post_write_observer: Arc<O>,
|
||||
metrics: Arc<metric::Registry>,
|
||||
) -> Self {
|
||||
let namespace_count = metrics
|
||||
|
@ -117,12 +124,13 @@ impl BufferTree {
|
|||
table_name_resolver,
|
||||
metrics,
|
||||
partition_provider,
|
||||
post_write_observer,
|
||||
namespace_count,
|
||||
}
|
||||
}
|
||||
|
||||
/// Gets the namespace data out of the map
|
||||
pub(crate) fn namespace(&self, namespace_id: NamespaceId) -> Option<Arc<NamespaceData>> {
|
||||
pub(crate) fn namespace(&self, namespace_id: NamespaceId) -> Option<Arc<NamespaceData<O>>> {
|
||||
self.namespaces.get(&namespace_id)
|
||||
}
|
||||
|
||||
|
@ -148,7 +156,10 @@ impl BufferTree {
|
|||
}
|
||||
|
||||
#[async_trait]
|
||||
impl DmlSink for BufferTree {
|
||||
impl<O> DmlSink for BufferTree<O>
|
||||
where
|
||||
O: PostWriteObserver,
|
||||
{
|
||||
type Error = mutable_batch::Error;
|
||||
|
||||
async fn apply(&self, op: DmlOperation) -> Result<(), Self::Error> {
|
||||
|
@ -163,6 +174,7 @@ impl DmlSink for BufferTree {
|
|||
self.namespace_name_resolver.for_namespace(namespace_id),
|
||||
Arc::clone(&self.table_name_resolver),
|
||||
Arc::clone(&self.partition_provider),
|
||||
Arc::clone(&self.post_write_observer),
|
||||
&self.metrics,
|
||||
))
|
||||
});
|
||||
|
@ -172,7 +184,10 @@ impl DmlSink for BufferTree {
|
|||
}
|
||||
|
||||
#[async_trait]
|
||||
impl QueryExec for BufferTree {
|
||||
impl<O> QueryExec for BufferTree<O>
|
||||
where
|
||||
O: Send + Sync + Debug,
|
||||
{
|
||||
type Response = QueryResponse;
|
||||
|
||||
async fn query_exec(
|
||||
|
@ -212,6 +227,7 @@ mod tests {
|
|||
name_resolver::mock::MockNamespaceNameProvider, NamespaceData, NamespaceName,
|
||||
},
|
||||
partition::{resolver::mock::MockPartitionProvider, PartitionData, SortKeyState},
|
||||
post_write::mock::MockPostWriteObserver,
|
||||
table::{name_resolver::mock::MockTableNameProvider, TableName},
|
||||
},
|
||||
deferred_load::{self, DeferredLoad},
|
||||
|
@ -252,6 +268,7 @@ mod tests {
|
|||
DeferredLoad::new(Duration::from_millis(1), async { NAMESPACE_NAME.into() }),
|
||||
Arc::new(MockTableNameProvider::new(TABLE_NAME)),
|
||||
partition_provider,
|
||||
Arc::new(MockPostWriteObserver::default()),
|
||||
&metrics,
|
||||
);
|
||||
|
||||
|
@ -320,6 +337,7 @@ mod tests {
|
|||
Arc::new(MockNamespaceNameProvider::default()),
|
||||
Arc::new(MockTableNameProvider::new(TABLE_NAME)),
|
||||
partition_provider,
|
||||
Arc::new(MockPostWriteObserver::default()),
|
||||
Arc::new(metric::Registry::default()),
|
||||
);
|
||||
|
||||
|
@ -651,6 +669,7 @@ mod tests {
|
|||
Arc::new(MockNamespaceNameProvider::default()),
|
||||
Arc::new(MockTableNameProvider::new(TABLE_NAME)),
|
||||
partition_provider,
|
||||
Arc::new(MockPostWriteObserver::default()),
|
||||
Arc::clone(&metrics),
|
||||
);
|
||||
|
||||
|
@ -755,6 +774,7 @@ mod tests {
|
|||
Arc::new(MockNamespaceNameProvider::default()),
|
||||
Arc::new(MockTableNameProvider::new(TABLE_NAME)),
|
||||
partition_provider,
|
||||
Arc::new(MockPostWriteObserver::default()),
|
||||
Arc::clone(&Arc::new(metric::Registry::default())),
|
||||
);
|
||||
|
||||
|
@ -837,6 +857,7 @@ mod tests {
|
|||
Arc::new(MockNamespaceNameProvider::default()),
|
||||
Arc::new(MockTableNameProvider::new(TABLE_NAME)),
|
||||
partition_provider,
|
||||
Arc::new(MockPostWriteObserver::default()),
|
||||
Arc::new(metric::Registry::default()),
|
||||
);
|
||||
|
||||
|
@ -932,6 +953,7 @@ mod tests {
|
|||
Arc::new(MockNamespaceNameProvider::default()),
|
||||
Arc::new(MockTableNameProvider::new(TABLE_NAME)),
|
||||
partition_provider,
|
||||
Arc::new(MockPostWriteObserver::default()),
|
||||
Arc::new(metric::Registry::default()),
|
||||
);
|
||||
|
||||
|
|
|
@ -2,7 +2,7 @@
|
|||
|
||||
pub(crate) mod name_resolver;
|
||||
|
||||
use std::sync::Arc;
|
||||
use std::{fmt::Debug, sync::Arc};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use data_types::{NamespaceId, PartitionId, PartitionKey, SequenceNumber, TableId};
|
||||
|
@ -15,6 +15,7 @@ use trace::span::{Span, SpanRecorder};
|
|||
use super::{
|
||||
namespace::NamespaceName,
|
||||
partition::{resolver::PartitionProvider, PartitionData},
|
||||
post_write::PostWriteObserver,
|
||||
};
|
||||
use crate::{
|
||||
arcmap::ArcMap,
|
||||
|
@ -79,7 +80,7 @@ impl From<TableName> for Arc<str> {
|
|||
|
||||
impl std::fmt::Display for TableName {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
self.0.fmt(f)
|
||||
std::fmt::Display::fmt(&self.0, f)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -99,7 +100,7 @@ impl PartialEq<str> for TableName {
|
|||
|
||||
/// Data of a Table in a given Namespace that belongs to a given Shard
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct TableData {
|
||||
pub(crate) struct TableData<O> {
|
||||
table_id: TableId,
|
||||
table_name: Arc<DeferredLoad<TableName>>,
|
||||
|
||||
|
@ -113,9 +114,11 @@ pub(crate) struct TableData {
|
|||
|
||||
// Map of partition key to its data
|
||||
partition_data: RwLock<DoubleRef>,
|
||||
|
||||
post_write_observer: Arc<O>,
|
||||
}
|
||||
|
||||
impl TableData {
|
||||
impl<O> TableData<O> {
|
||||
/// Initialize new table buffer identified by [`TableId`] in the catalog.
|
||||
///
|
||||
/// Optionally the given tombstone max [`SequenceNumber`] identifies the
|
||||
|
@ -132,6 +135,7 @@ impl TableData {
|
|||
namespace_id: NamespaceId,
|
||||
namespace_name: Arc<DeferredLoad<NamespaceName>>,
|
||||
partition_provider: Arc<dyn PartitionProvider>,
|
||||
post_write_observer: Arc<O>,
|
||||
) -> Self {
|
||||
Self {
|
||||
table_id,
|
||||
|
@ -140,44 +144,10 @@ impl TableData {
|
|||
namespace_name,
|
||||
partition_data: Default::default(),
|
||||
partition_provider,
|
||||
post_write_observer,
|
||||
}
|
||||
}
|
||||
|
||||
// buffers the table write and returns true if the lifecycle manager indicates that
|
||||
// ingest should be paused.
|
||||
pub(super) async fn buffer_table_write(
|
||||
&self,
|
||||
sequence_number: SequenceNumber,
|
||||
batch: MutableBatch,
|
||||
partition_key: PartitionKey,
|
||||
) -> Result<(), mutable_batch::Error> {
|
||||
let p = self.partition_data.read().by_key(&partition_key);
|
||||
let partition_data = match p {
|
||||
Some(p) => p,
|
||||
None => {
|
||||
let p = self
|
||||
.partition_provider
|
||||
.get_partition(
|
||||
partition_key.clone(),
|
||||
self.namespace_id,
|
||||
Arc::clone(&self.namespace_name),
|
||||
self.table_id,
|
||||
Arc::clone(&self.table_name),
|
||||
)
|
||||
.await;
|
||||
// Add the double-referenced partition to the map.
|
||||
//
|
||||
// This MAY return a different instance than `p` if another
|
||||
// thread has already initialised the partition.
|
||||
self.partition_data.write().try_insert(p)
|
||||
}
|
||||
};
|
||||
|
||||
partition_data.lock().buffer_write(batch, sequence_number)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Return a mutable reference to all partitions buffered for this table.
|
||||
///
|
||||
/// # Ordering
|
||||
|
@ -224,8 +194,59 @@ impl TableData {
|
|||
}
|
||||
}
|
||||
|
||||
impl<O> TableData<O>
|
||||
where
|
||||
O: PostWriteObserver,
|
||||
{
|
||||
// Buffers the table write into the partition identified by `partition_key`
|
||||
// and, if the write succeeds, passes the partition to the post-write observer.
|
||||
pub(super) async fn buffer_table_write(
|
||||
&self,
|
||||
sequence_number: SequenceNumber,
|
||||
batch: MutableBatch,
|
||||
partition_key: PartitionKey,
|
||||
) -> Result<(), mutable_batch::Error> {
|
||||
let p = self.partition_data.read().by_key(&partition_key);
|
||||
let partition_data = match p {
|
||||
Some(p) => p,
|
||||
None => {
|
||||
let p = self
|
||||
.partition_provider
|
||||
.get_partition(
|
||||
partition_key.clone(),
|
||||
self.namespace_id,
|
||||
Arc::clone(&self.namespace_name),
|
||||
self.table_id,
|
||||
Arc::clone(&self.table_name),
|
||||
)
|
||||
.await;
|
||||
// Add the double-referenced partition to the map.
|
||||
//
|
||||
// This MAY return a different instance than `p` if another
|
||||
// thread has already initialised the partition.
|
||||
self.partition_data.write().try_insert(p)
|
||||
}
|
||||
};
|
||||
|
||||
// Obtain the partition lock.
|
||||
let mut p = partition_data.lock();
|
||||
|
||||
// Enqueue the write, returning any error.
|
||||
p.buffer_write(batch, sequence_number)?;
|
||||
|
||||
// If successful, allow the observer to inspect the partition.
|
||||
self.post_write_observer
|
||||
.observe(Arc::clone(&partition_data), p);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl QueryExec for TableData {
|
||||
impl<O> QueryExec for TableData<O>
|
||||
where
|
||||
O: Send + Sync + Debug,
|
||||
{
|
||||
type Response = PartitionStream;
|
||||
|
||||
async fn query_exec(
|
||||
|
@ -288,8 +309,9 @@ mod tests {
|
|||
use mutable_batch_lp::lines_to_batches;
|
||||
|
||||
use super::*;
|
||||
use crate::buffer_tree::partition::{
|
||||
resolver::mock::MockPartitionProvider, PartitionData, SortKeyState,
|
||||
use crate::buffer_tree::{
|
||||
partition::{resolver::mock::MockPartitionProvider, PartitionData, SortKeyState},
|
||||
post_write::mock::MockPostWriteObserver,
|
||||
};
|
||||
|
||||
const TABLE_NAME: &str = "bananas";
|
||||
|
@ -328,6 +350,7 @@ mod tests {
|
|||
NamespaceName::from("platanos")
|
||||
})),
|
||||
partition_provider,
|
||||
Arc::new(MockPostWriteObserver::default()),
|
||||
);
|
||||
|
||||
let batch = lines_to_batches(r#"bananas,bat=man value=24 42"#, 0)
|
||||
|
|
|
@ -23,7 +23,7 @@ use crate::{
|
|||
table::name_resolver::{TableNameProvider, TableNameResolver},
|
||||
BufferTree,
|
||||
},
|
||||
persist::handle::PersistHandle,
|
||||
persist::{handle::PersistHandle, hot_partitions::HotPartitionPersister},
|
||||
server::grpc::GrpcDelegate,
|
||||
timestamp_oracle::TimestampOracle,
|
||||
wal::{rotate_task::periodic_rotation, wal_sink::WalSink},
|
||||
|
@ -155,6 +155,7 @@ pub async fn new(
|
|||
persist_executor: Arc<Executor>,
|
||||
persist_workers: usize,
|
||||
persist_queue_depth: usize,
|
||||
persist_hot_partition_cost: usize,
|
||||
object_store: ParquetStorage,
|
||||
) -> Result<IngesterGuard<impl IngesterRpcInterface>, InitError> {
|
||||
// Create the transition shard.
|
||||
|
@ -213,29 +214,6 @@ pub async fn new(
|
|||
);
|
||||
let partition_provider: Arc<dyn PartitionProvider> = Arc::new(partition_provider);
|
||||
|
||||
let buffer = Arc::new(BufferTree::new(
|
||||
namespace_name_provider,
|
||||
table_name_provider,
|
||||
partition_provider,
|
||||
Arc::clone(&metrics),
|
||||
));
|
||||
|
||||
// TODO: start hot-partition persist task before replaying the WAL
|
||||
//
|
||||
// By starting the persist task first, the ingester can persist files during
|
||||
// WAL replay if necessary. This could happen if the configuration of the
|
||||
// ingester was changed to persist smaller partitions in-between executions
|
||||
// (such as if the ingester was OOMing during WAL replay, and the
|
||||
// configuration was changed to mitigate it.)
|
||||
|
||||
// Initialise the WAL
|
||||
let wal = Wal::new(wal_directory).await.map_err(InitError::WalInit)?;
|
||||
|
||||
// Replay the WAL log files, if any.
|
||||
let max_sequence_number = wal_replay::replay(&wal, &buffer)
|
||||
.await
|
||||
.map_err(|e| InitError::WalReplay(e.into()))?;
|
||||
|
||||
// Spawn the persist workers to compact partition data, convert it into
|
||||
// Parquet files, and upload them to object storage.
|
||||
let (persist_handle, persist_state) = PersistHandle::new(
|
||||
|
@ -246,6 +224,34 @@ pub async fn new(
|
|||
Arc::clone(&catalog),
|
||||
);
|
||||
|
||||
// Instantiate a post-write observer for hot partition persistence.
|
||||
//
|
||||
// By enabling hot partition persistence before replaying the WAL, the
|
||||
// ingester can persist files during WAL replay.
|
||||
//
|
||||
// It is also important to respect potential configuration changes between
|
||||
// runs, such as if the configuration of the ingester was changed to persist
|
||||
// smaller partitions in-between executions because it was OOMing during WAL
|
||||
// replay (and the configuration was changed to mitigate it).
|
||||
let hot_partition_persister =
|
||||
HotPartitionPersister::new(persist_handle.clone(), persist_hot_partition_cost);
|
||||
|
||||
let buffer = Arc::new(BufferTree::new(
|
||||
namespace_name_provider,
|
||||
table_name_provider,
|
||||
partition_provider,
|
||||
Arc::new(hot_partition_persister),
|
||||
Arc::clone(&metrics),
|
||||
));
|
||||
|
||||
// Initialise the WAL
|
||||
let wal = Wal::new(wal_directory).await.map_err(InitError::WalInit)?;
|
||||
|
||||
// Replay the WAL log files, if any.
|
||||
let max_sequence_number = wal_replay::replay(&wal, &buffer)
|
||||
.await
|
||||
.map_err(|e| InitError::WalReplay(e.into()))?;
|
||||
|
||||
// Build the chain of DmlSink that forms the write path.
|
||||
let write_path = WalSink::new(Arc::clone(&buffer), Arc::clone(&wal));
|
||||
|
||||
|
|
|
@ -444,6 +444,7 @@ mod tests {
|
|||
buffer_tree::{
|
||||
namespace::{name_resolver::mock::MockNamespaceNameProvider, NamespaceName},
|
||||
partition::resolver::mock::MockPartitionProvider,
|
||||
post_write::mock::MockPostWriteObserver,
|
||||
table::{name_resolver::mock::MockTableNameProvider, TableName},
|
||||
BufferTree,
|
||||
},
|
||||
|
@ -493,6 +494,7 @@ mod tests {
|
|||
sort_key,
|
||||
)),
|
||||
),
|
||||
Arc::new(MockPostWriteObserver::default()),
|
||||
Default::default(),
|
||||
);
|
||||
|
||||
|
|
|
@ -0,0 +1,71 @@
|
|||
use std::{fmt::Debug, sync::Arc};
|
||||
|
||||
use observability_deps::tracing::info;
|
||||
use parking_lot::{Mutex, MutexGuard};
|
||||
|
||||
use crate::buffer_tree::{partition::PartitionData, post_write::PostWriteObserver};
|
||||
|
||||
use super::handle::PersistHandle;
|
||||
|
||||
/// A [`PostWriteObserver`] that marks a partition for persistence once its
/// persist cost estimate reaches a configured limit.
#[derive(Debug)]
|
||||
pub(crate) struct HotPartitionPersister {
|
||||
// Handle used to enqueue partition data for persistence.
persist_handle: PersistHandle,
|
||||
// The persist cost estimate at (or above) which a partition is marked
// as persisting and enqueued.
max_estimated_persist_cost: usize,
|
||||
}
|
||||
|
||||
impl HotPartitionPersister {
|
||||
/// Construct a [`HotPartitionPersister`] that enqueues partitions with
/// `persist_handle` once their persist cost estimate is greater than or
/// equal to `max_estimated_persist_cost`.
pub fn new(persist_handle: PersistHandle, max_estimated_persist_cost: usize) -> Self {
|
||||
Self {
|
||||
persist_handle,
|
||||
max_estimated_persist_cost,
|
||||
}
|
||||
}
|
||||
|
||||
/// Mark the partition behind `guard` as persisting and enqueue the
/// resulting data with the persist handle from a spawned task.
///
/// `cost_estimate` is only used for the log message.
///
/// # Panics
///
/// Panics if `mark_persisting()` does not yield the data to persist (see
/// the `expect()` below).
#[cold]
|
||||
fn persist(
|
||||
&self,
|
||||
cost_estimate: usize,
|
||||
partition: Arc<Mutex<PartitionData>>,
|
||||
mut guard: MutexGuard<'_, PartitionData>,
|
||||
) {
|
||||
info!(
|
||||
partition_id = guard.partition_id().get(),
|
||||
cost_estimate, "marking hot partition for persistence"
|
||||
);
|
||||
|
||||
// Transition the partition while the lock guard is still held.
let data = guard
|
||||
.mark_persisting()
|
||||
.expect("failed to transition buffer fsm to persisting state");
|
||||
|
||||
// Perform the enqueue in a separate task, to avoid blocking this
|
||||
// writer if the persist system is saturated.
|
||||
// Only the cloned handle, `partition` and `data` are moved into the
// task; `guard` is not, and is dropped when this function returns.
let persist_handle = self.persist_handle.clone();
|
||||
tokio::spawn(async move {
|
||||
// There is no need to await on the completion handle.
|
||||
let _ = persist_handle.queue_persist(partition, data).await;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
impl PostWriteObserver for HotPartitionPersister {
|
||||
/// Inspect the partition behind `guard` and, if its persist cost estimate
/// has reached the configured limit, mark it for persistence.
///
/// # Panics
///
/// Panics if the partition's persist cost estimate is `0`.
#[inline(always)]
|
||||
fn observe(&self, partition: Arc<Mutex<PartitionData>>, guard: MutexGuard<'_, PartitionData>) {
|
||||
// Without releasing the lock, obtain the new persist cost estimate.
|
||||
let cost_estimate = guard.persist_cost_estimate();
|
||||
|
||||
// This observer is called after a successful write, therefore
|
||||
// persisting the partition MUST have a non-zero cost.
|
||||
assert!(cost_estimate > 0);
|
||||
|
||||
// If the estimated persist cost is at or over the limit, mark the
|
||||
// partition as persisting.
|
||||
//
|
||||
// This SHOULD happen atomically with the above write, to ensure
|
||||
// accurate buffer costing - if the lock were to be released, more
|
||||
// writes could be added to the buffer in parallel, exceeding the
|
||||
// limit before it was marked as persisting.
|
||||
if cost_estimate >= self.max_estimated_persist_cost {
|
||||
self.persist(cost_estimate, partition, guard)
|
||||
}
|
||||
}
|
||||
}
|
|
@ -2,3 +2,4 @@ pub(crate) mod backpressure;
|
|||
pub(super) mod compact;
|
||||
mod context;
|
||||
pub(crate) mod handle;
|
||||
pub(crate) mod hot_partitions;
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
use futures::{stream, StreamExt};
|
||||
use observability_deps::tracing::*;
|
||||
use std::{future, sync::Arc, time::Duration};
|
||||
use std::{fmt::Debug, future, sync::Arc, time::Duration};
|
||||
use tokio::time::Instant;
|
||||
|
||||
use crate::{buffer_tree::BufferTree, persist::handle::PersistHandle};
|
||||
|
@ -10,12 +10,14 @@ use crate::{buffer_tree::BufferTree, persist::handle::PersistHandle};
|
|||
const PERSIST_ENQUEUE_CONCURRENCY: usize = 5;
|
||||
|
||||
/// Rotate the `wal` segment file every `period` duration of time.
|
||||
pub(crate) async fn periodic_rotation(
|
||||
pub(crate) async fn periodic_rotation<O>(
|
||||
wal: Arc<wal::Wal>,
|
||||
period: Duration,
|
||||
buffer: Arc<BufferTree>,
|
||||
buffer: Arc<BufferTree<O>>,
|
||||
persist: PersistHandle,
|
||||
) {
|
||||
) where
|
||||
O: Send + Sync + Debug,
|
||||
{
|
||||
let mut interval = tokio::time::interval(period);
|
||||
|
||||
loop {
|
||||
|
|
|
@ -157,6 +157,7 @@ pub async fn create_ingester_server_type(
|
|||
exec,
|
||||
ingester_config.persist_max_parallelism,
|
||||
ingester_config.persist_queue_depth,
|
||||
ingester_config.persist_hot_partition_cost,
|
||||
object_store,
|
||||
)
|
||||
.await?;
|
||||
|
|
Loading…
Reference in New Issue