feat(ingester2): persist back-pressure

This commit causes an ingester2 instance to stop accepting new writes
when at least one persist queue is full. Writes continue to be rejected
until the persist workers have processed enough outstanding persist
tasks to drain the queues to half of their capacity, at which point
writes are accepted again.

When a write is rejected, the ingester returns a "resource exhausted"
RPC code to the caller.

Checking whether the system is healthy enough to accept a write is
extremely cheap, as the check sits on the hot path for all writes.
pull/24376/head
Dom Dwyer 2022-12-13 14:36:18 +01:00
parent 6c555600e0
commit e76b107332
8 changed files with 727 additions and 42 deletions
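In practice, the change boils down to a single cheap state check before any write is admitted. Below is a minimal sketch of that caller-side pattern, assuming the crate-internal `PersistState` type added by this commit; the helper name `check_ingest_allowed` is illustrative only, and the real wiring lives in the `RpcWrite` changes further down.

    // Sketch only: gate the write path on the shared persist saturation state.
    // `PersistState::is_saturated()` is a relaxed atomic load, so this check is
    // effectively free on the hot path.
    fn check_ingest_allowed(persist_state: &PersistState) -> Result<(), tonic::Status> {
        if persist_state.is_saturated() {
            // Reject the write until the persist workers drain their queues to
            // at least half capacity.
            return Err(tonic::Status::resource_exhausted("ingester overloaded"));
        }
        Ok(())
    }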


@ -241,7 +241,7 @@ pub async fn new(
// Spawn the persist workers to compact partition data, convert it into
// Parquet files, and upload them to object storage.
let persist_handle = PersistHandle::new(
let (persist_handle, persist_state) = PersistHandle::new(
persist_workers,
persist_worker_queue_depth,
persist_executor,
@ -273,7 +273,14 @@ pub async fn new(
));
Ok(IngesterGuard {
rpc: GrpcDelegate::new(Arc::new(write_path), buffer, timestamp, catalog, metrics),
rpc: GrpcDelegate::new(
Arc::new(write_path),
buffer,
timestamp,
persist_state,
catalog,
metrics,
),
rotation_task: handle,
})
}


@ -95,7 +95,7 @@ where
"dropping empty wal segment",
);
// TODO(dom:test): empty WAL replay
// TODO(test): empty WAL replay
// A failure to delete an empty file should not prevent WAL
// replay from continuing.


@ -0,0 +1,521 @@
use std::{
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
time::Duration,
};
use crossbeam_utils::CachePadded;
use observability_deps::tracing::*;
use parking_lot::Mutex;
use tokio::{
sync::mpsc,
task::JoinHandle,
time::{Interval, MissedTickBehavior},
};
/// The interval of time between evaluations of the state of the persist system
/// when [`CurrentState::Saturated`].
const EVALUATE_SATURATION_INTERVAL: Duration = Duration::from_secs(1);
/// A state of the persist system.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum CurrentState {
/// The system is operating normally.
Ok,
/// The persist system is overloaded.
Saturated,
}
/// A handle to read (and set, within the persist module) the state of the
/// persist system.
///
/// Clone operations are cheap, and state read operations are very cheap.
///
/// # Saturation Recovery
///
/// Once the persist system is marked as [`CurrentState::Saturated`], it remains
/// in that state until the following conditions are satisfied:
///
/// * There are no outstanding enqueue operations (no thread is blocked adding
/// an item to any work queue).
///
/// * All queues have at least half of their capacity free (that is, each
/// queue is at most half full).
///
/// These conditions are evaluated periodically, at the interval specified in
/// [`EVALUATE_SATURATION_INTERVAL`].
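///
/// For example, with a per-worker queue depth of 10, each queue must have at
/// least (10 + 1) / 2 = 5 free slots before the second condition is met.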
#[derive(Debug)]
pub(crate) struct PersistState {
/// The actual state value.
///
/// The value of this variable is the [`CurrentState`] discriminant of the
/// state most recently passed to [`PersistState::set()`].
///
/// This is cache padded due to the high read volume, preventing any
/// unfortunate false-sharing of cache lines from impacting the hot-path
/// reads.
state: CachePadded<AtomicUsize>,
/// Tracks the number of async tasks waiting within
/// [`PersistHandle::queue_persist()`], asynchronously blocking to enqueue a
/// persist job.
///
/// This is modified using [`Ordering::SeqCst`] as performance is not a
/// priority for code paths that modify it.
///
/// [`PersistHandle::queue_persist()`]:
/// super::handle::PersistHandle::queue_persist()
waiting_to_enqueue: Arc<AtomicUsize>,
/// The handle to the current saturation evaluation/recovery task, if any.
recovery_handle: Mutex<Option<JoinHandle<()>>>,
}
/// Initialise a [`PersistState`] with [`CurrentState::Ok`].
impl Default for PersistState {
fn default() -> Self {
let s = Self {
state: Default::default(),
waiting_to_enqueue: Arc::new(AtomicUsize::new(0)),
recovery_handle: Default::default(),
};
s.set(CurrentState::Ok);
s
}
}
impl PersistState {
/// Set the reported state of the [`PersistState`], returning true if this
/// call changed the state (i.e. `s` differs from the previous state).
fn set(&self, s: CurrentState) -> bool {
// Set the new state, retaining the previous state for comparison below.
//
// SeqCst is absolute overkill, but is used here due to the strong
// ordering guarantees providing minimal risk of bugs. The low volume of
// writes to this variable means the overhead is more than acceptable.
let last = self.state.swap(s as usize, Ordering::SeqCst);
// If "s" does not match the old state, this is the first thread to
// switch the state from "last", to "s", since setting it to "last".
//
// Subsequent calls setting the state to "s" will return false, until a
// different state is set.
s as usize != last
}
/// Get the current reported state of the [`PersistState`].
///
/// Reading this value is extremely cheap and can be done without
/// performance concern.
///
/// This value is eventually consistent, with a presumption that any state
/// change becomes visible to readers within a short period of time.
pub(crate) fn get(&self) -> CurrentState {
// Correctness: relaxed as reading the current state is allowed to be
// racy for performance reasons; this call should be as cheap as
// possible due to it being squarely in the hot path.
//
// Any value change will "eventually" be made visible to all threads, at
// which point this read converges to the latest value. A potential
// extra write or two arriving before this value is visible to all
// threads is acceptable in the "saturated" cold path, prioritising
// latency of the hot path.
match self.state.load(Ordering::Relaxed) {
v if v == CurrentState::Ok as usize => CurrentState::Ok,
v if v == CurrentState::Saturated as usize => CurrentState::Saturated,
_ => unreachable!(),
}
}
/// A convenience method that returns true if `self` is
/// [`CurrentState::Saturated`].
pub(crate) fn is_saturated(&self) -> bool {
self.get() == CurrentState::Saturated
}
/// Mark the persist system as saturated, returning a [`WaitGuard`] that
/// MUST be held during any subsequent async-blocking enqueue request
/// ([`mpsc::Sender::send()`] and the like).
///
/// Holding the guard over the `send()` await allows the saturation
/// evaluation to track the number of threads with an ongoing enqueue wait.
pub(super) fn set_saturated<T>(s: Arc<Self>, persist_queues: Vec<mpsc::Sender<T>>) -> WaitGuard
where
T: Send + 'static,
{
// Increment the number of tasks waiting to push into a queue.
//
// INVARIANT: this increment MUST happen-before returning the guard, and
// waiting on the queue send(), and before starting the saturation
// monitor task so that it observes this waiter.
let _ = s.waiting_to_enqueue.fetch_add(1, Ordering::SeqCst);
// Attempt to set the system to "saturated".
let first = s.set(CurrentState::Saturated);
if first {
// This is the first thread to mark the system as saturated.
warn!("persist queues saturated, blocking ingest");
// Always evaluate the state of the system EVALUATE_SATURATION_INTERVAL
// after the last completed evaluation - do not attempt to catch up on
// missed ticks should the evaluation fall behind the ticker.
let mut interval = tokio::time::interval(EVALUATE_SATURATION_INTERVAL);
interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
// Spawn a task that marks the system as not saturated after the queues
// have processed some of the backlog.
let h = tokio::spawn(saturation_monitor_task(
interval,
Arc::clone(&s),
persist_queues,
));
// Retain the task handle so the recovery task can be aborted if the
// PersistState is dropped.
*s.recovery_handle.lock() = Some(h);
}
WaitGuard(Arc::clone(&s.waiting_to_enqueue))
}
/// A test-only helper that sets the state of `self` only. It does not spawn
/// a recovery task.
#[cfg(test)]
pub(crate) fn test_set_state(&self, s: CurrentState) {
self.set(s);
}
}
impl Drop for PersistState {
fn drop(&mut self) {
if let Some(h) = self.recovery_handle.lock().as_ref() {
h.abort();
}
}
}
/// A guard that decrements the number of writers waiting to enqueue an item
/// into the persistence queue when dropped.
///
/// This MUST be held whilst calling [`mpsc::Sender::send()`].
#[must_use = "must hold wait guard while waiting for enqueue"]
pub(super) struct WaitGuard(Arc<AtomicUsize>);
impl Drop for WaitGuard {
fn drop(&mut self) {
let _ = self.0.fetch_sub(1, Ordering::SeqCst);
}
}
/// A task that monitors the `waiters` and `queues` to determine when the
/// persist system is no longer saturated.
///
/// Once the system is no longer saturated (as determined according to the
/// documentation for [`PersistState`]), the [`PersistState`] is set to
/// [`CurrentState::Ok`].
async fn saturation_monitor_task<T>(
mut interval: Interval,
state: Arc<PersistState>,
queues: Vec<mpsc::Sender<T>>,
) where
T: Send,
{
loop {
// Wait before evaluating the state of the system.
interval.tick().await;
// INVARIANT: this task only ever runs when the system is saturated.
assert!(state.is_saturated());
// First check if any tasks are waiting to enqueue an item (an
// indication that one or more queues is full).
let n_waiting = state.waiting_to_enqueue.load(Ordering::SeqCst);
if n_waiting > 0 {
debug!(
n_waiting,
"waiting for outstanding persist jobs to be enqueued"
);
continue;
}
// No async task was waiting to enqueue a persist job at the time of the
// check above, but one may begin waiting at any moment (now or later).
//
// In order to minimise health flip-flopping, only mark the persist
// system as healthy once there is some capacity in the queues to accept
// new persist jobs. This avoids a queue having 1 slot free, only to be
// immediately filled and the system pause again.
//
// This check below ensures that all queues are at least half empty
// before marking the system as recovered.
let n_queues = queues
.iter()
.filter(|q| !has_sufficient_capacity(q.capacity(), q.max_capacity()))
.count();
if n_queues != 0 {
debug!(n_queues, "waiting for queues to drain");
continue;
}
// There are no outstanding enqueue waiters, and all queues are at half
// capacity or better.
info!("persist queue saturation reduced, resuming ingest");
// INVARIANT: there is only ever one task that monitors the queue state
// and transitions the persist state to OK, therefore this task is
// always the first to set the state to OK.
assert!(state.set(CurrentState::Ok));
// The task MUST immediately stop so any subsequent saturation is
// handled by the newly spawned task, upholding the above invariant.
return;
}
}
/// Returns true if the free `capacity` of a queue with `max_capacity` total
/// slots is sufficient for it to be considered ready to accept more requests.
fn has_sufficient_capacity(capacity: usize, max_capacity: usize) -> bool {
// Did this fire? You have your arguments the wrong way around.
assert!(capacity <= max_capacity);
let want_at_least = (max_capacity + 1) / 2;
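// For example: max_capacity = 5 => want_at_least = 3 free slots;
// max_capacity = 10 => want_at_least = 5 free slots. Odd depths round the
// "half" threshold up.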
trace!(
available = capacity,
max = max_capacity,
want_at_least,
"evaluating queue backlog"
);
capacity >= want_at_least
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use test_helpers::timeout::FutureTimeout;
use super::*;
const POLL_INTERVAL: Duration = Duration::from_millis(5);
#[test]
fn test_has_sufficient_capacity() {
// A queue of minimal depth (1).
//
// Validates there are no off-by-one errors.
assert!(!has_sufficient_capacity(0, 1));
assert!(has_sufficient_capacity(1, 1));
// Even queues
assert!(!has_sufficient_capacity(0, 2));
assert!(has_sufficient_capacity(1, 2));
assert!(has_sufficient_capacity(2, 2));
// Odd queues
assert!(!has_sufficient_capacity(0, 3));
assert!(!has_sufficient_capacity(1, 3));
assert!(has_sufficient_capacity(2, 3));
assert!(has_sufficient_capacity(3, 3));
}
/// Validate the state setters and getters are correct, and that only the
/// first thread that changes the state observes the "first=true" response.
#[test]
fn test_state_transitions() {
let s = PersistState::default();
assert_eq!(s.get(), CurrentState::Ok);
assert!(!s.is_saturated());
assert!(!s.set(CurrentState::Ok)); // Already OK
assert_eq!(s.get(), CurrentState::Ok);
assert!(!s.is_saturated());
assert!(!s.set(CurrentState::Ok)); // Already OK
assert_eq!(s.get(), CurrentState::Ok);
assert!(!s.is_saturated());
assert!(s.set(CurrentState::Saturated)); // First to change
assert!(s.is_saturated());
assert!(!s.set(CurrentState::Saturated)); // Not first
assert!(s.is_saturated());
assert_eq!(s.get(), CurrentState::Saturated);
assert!(s.is_saturated());
assert!(!s.set(CurrentState::Saturated)); // Not first
assert_eq!(s.get(), CurrentState::Saturated);
assert!(s.is_saturated());
assert!(s.set(CurrentState::Ok)); // First to change
assert_eq!(s.get(), CurrentState::Ok);
assert!(!s.is_saturated());
}
/// Ensure that the saturation evaluation checks for outstanding enqueue
/// waiters (as tracked by the [`WaitGuard`]).
#[tokio::test]
async fn test_saturation_recovery_enqueue_waiters() {
let s = Arc::new(PersistState::default());
// Use no queues to ensure only the waiters are blocking recovery.
assert!(!s.is_saturated());
let w1 = PersistState::set_saturated::<()>(Arc::clone(&s), vec![]);
let w2 = PersistState::set_saturated::<()>(Arc::clone(&s), vec![]);
assert!(s.is_saturated());
// Kill the actual recovery task (there must be one running at this
// point).
s.recovery_handle.lock().take().unwrap().abort();
// Spawn a replacement that ticks way more often to speed up the test.
let h = tokio::spawn(saturation_monitor_task::<()>(
tokio::time::interval(POLL_INTERVAL),
Arc::clone(&s),
vec![],
));
// Drop a waiter and ensure the system is still saturated.
drop(w1);
assert!(s.is_saturated());
// Sleep a little to ensure it remains saturated with 1 outstanding
// waiter.
//
// This is false-negative racy - if this assert fires, there is a
// legitimate problem - one outstanding waiter should prevent the system
// from ever transitioning to a healthy state.
tokio::time::sleep(POLL_INTERVAL * 4).await;
assert!(s.is_saturated());
// Drop the other waiter.
drop(w2);
// Wait up to 5 seconds to observe the system recovery.
async {
loop {
if !s.is_saturated() {
return;
}
tokio::time::sleep(POLL_INTERVAL).await;
}
}
.with_timeout_panic(Duration::from_secs(5))
.await;
// Wait up to 60 seconds to observe the recovery task finish.
//
// The recovery task sets the system state as healthy, and THEN exits,
// so there exists a window of time where the system has passed the
// saturation check above, but the recovery task MAY still be running.
//
// By waiting an excessive duration of time, we ensure the task does
// indeed finish.
async {
loop {
if h.is_finished() {
return;
}
tokio::time::sleep(POLL_INTERVAL).await;
}
}
.with_timeout_panic(Duration::from_secs(60))
.await;
// No task panic occurred.
assert!(h.with_timeout_panic(Duration::from_secs(5)).await.is_ok());
assert!(!s.is_saturated());
}
/// Ensure that the saturation evaluation checks for free queue slots before
/// marking the system as healthy.
#[tokio::test]
async fn test_saturation_recovery_queue_capacity() {
let s = Arc::new(PersistState::default());
async fn fill(q: &mpsc::Sender<()>, times: usize) {
for _ in 0..times {
q.send(()).await.unwrap();
}
}
// Use no waiters to ensure only the queue slots are blocking recovery.
let (tx1, mut rx1) = mpsc::channel(5);
let (tx2, mut rx2) = mpsc::channel(5);
// Place some items in the queues
fill(&tx1, 3).await; // Over the threshold of 5/2 = 2.5, rounded down to 2.
fill(&tx2, 3).await; // Over the threshold of 5/2 = 2.5, rounded down to 2.
assert!(!s.is_saturated());
assert!(s.set(CurrentState::Saturated));
assert!(s.is_saturated());
// Spawn the recovery task directly, not via set_saturated() for
// simplicity - the test above asserts the task is started by a call to
// set_saturated().
let h = tokio::spawn(saturation_monitor_task::<()>(
tokio::time::interval(POLL_INTERVAL),
Arc::clone(&s),
vec![tx1, tx2],
));
// Wait a little and ensure the state hasn't changed.
//
// While this could be a false negative, if this assert fires there is a
// legitimate problem.
tokio::time::sleep(POLL_INTERVAL * 4).await;
assert!(s.is_saturated());
// Drain one of the queues to below the saturation point.
rx1.recv().await.expect("no recovery task running");
// Wait a little and ensure the state still hasn't changed.
//
// While this could also be a false negative, if this assert fires there
// is a legitimate problem.
tokio::time::sleep(POLL_INTERVAL * 4).await;
assert!(s.is_saturated());
// Drain the remaining queue below the threshold for recovery.
rx2.recv().await.expect("no recovery task running");
// Wait up to 5 seconds to observe the system recovery.
async {
loop {
if !s.is_saturated() {
return;
}
tokio::time::sleep(POLL_INTERVAL).await;
}
}
.with_timeout_panic(Duration::from_secs(5))
.await;
// Wait up to 60 seconds to observe the recovery task finish.
//
// The recovery task sets the system state as healthy, and THEN exits,
// so there exists a window of time where the system has passed the
// saturation check above, but the recovery task MAY still be running.
//
// By waiting an excessive duration of time, we ensure the task does
// indeed finish.
async {
loop {
if h.is_finished() {
return;
}
tokio::time::sleep(POLL_INTERVAL).await;
}
}
.with_timeout_panic(Duration::from_secs(60))
.await;
// No task panic occurred.
assert!(h.with_timeout_panic(Duration::from_secs(5)).await.is_ok());
assert!(!s.is_saturated());
}
}


@ -2,22 +2,21 @@ use std::sync::Arc;
use iox_catalog::interface::Catalog;
use iox_query::exec::Executor;
use observability_deps::tracing::{debug, info};
use observability_deps::tracing::*;
use parking_lot::Mutex;
use parquet_file::storage::ParquetStorage;
use sharder::JumpHash;
use thiserror::Error;
use tokio::sync::{mpsc, oneshot};
use tokio::sync::{
mpsc::{self, error::TrySendError},
oneshot,
};
use crate::buffer_tree::partition::{persisting::PersistingData, PartitionData};
use super::context::{Context, PersistRequest};
#[derive(Debug, Error)]
pub(crate) enum PersistError {
#[error("persist queue is full")]
QueueFull,
}
use super::{
backpressure::PersistState,
context::{Context, PersistRequest},
};
/// A persistence task submission handle.
///
@ -90,6 +89,28 @@ pub(crate) enum PersistError {
/// always placed in the same worker queue, ensuring they execute sequentially.
///
/// [`SortKey`]: schema::sort::SortKey
///
/// # Overload & Back-pressure
///
/// The persist queue is bounded, but this bound is only effective if the
/// caller stops generating new persist jobs while existing jobs are blocked
/// waiting to be added to the queue; otherwise the system is effectively
/// unbounded. If an unbounded number of tasks block on
/// [`PersistHandle::queue_persist()`] waiting to successfully enqueue a job,
/// then there is no bound on outstanding persist jobs at all.
///
/// To prevent this, the persistence system exposes an indicator of saturation
/// (readable via the [`PersistState`]) that the caller MUST use to prevent the
/// generation of new persist tasks (for example, by blocking any further
/// ingest) on a best-effort basis.
///
/// When the persist queue is saturated, [`PersistState::is_saturated()`]
/// returns true. Once the backlog of persist jobs is reduced, the
/// [`PersistState`] is switched back to a healthy state and new persist jobs
/// may be generated as normal.
///
/// For details of the exact saturation detection & recovery logic, see
/// [`PersistState`].
#[derive(Debug, Clone)]
pub(crate) struct PersistHandle {
/// The state/dependencies shared across all worker tasks.
@ -105,6 +126,9 @@ pub(crate) struct PersistHandle {
/// Task handles for the worker tasks, aborted on drop of all
/// [`PersistHandle`] instances.
tasks: Arc<Vec<AbortOnDrop<()>>>,
/// Records the saturation state of the persist system.
persist_state: Arc<PersistState>,
}
impl PersistHandle {
@ -115,7 +139,7 @@ impl PersistHandle {
exec: Arc<Executor>,
store: ParquetStorage,
catalog: Arc<dyn Catalog>,
) -> Self {
) -> (Self, Arc<PersistState>) {
assert_ne!(n_workers, 0, "must run at least 1 persist worker");
assert_ne!(worker_queue_depth, 0, "worker queue depth must be non-zero");
@ -143,11 +167,18 @@ impl PersistHandle {
assert!(!tasks.is_empty());
Self {
inner,
persist_queues: Arc::new(JumpHash::new(tx_handles)),
tasks: Arc::new(tasks),
}
// Initialise the saturation state as "not saturated".
let persist_state = Default::default();
(
Self {
inner,
persist_queues: Arc::new(JumpHash::new(tx_handles)),
tasks: Arc::new(tasks),
persist_state: Arc::clone(&persist_state),
},
persist_state,
)
}
/// Place `data` from `partition` into the persistence queue.
@ -159,12 +190,12 @@ impl PersistHandle {
/// key will be updated, and [`PartitionData::mark_persisted()`] is called
/// with `data`.
///
/// Once all persistence related tasks are complete, the returned channel
/// publishes a notification.
/// Once all persistence related tasks for `data` are complete, the returned
/// channel publishes a notification.
///
/// # Panics
///
/// Panics if one or more worker threads have stopped.
/// Panics if the assigned persist worker task has stopped.
///
/// Panics (asynchronously) if the [`PartitionData`]'s sort key is updated
/// between persistence starting and ending.
@ -180,14 +211,40 @@ impl PersistHandle {
"enqueuing persistence task"
);
// Build the persist task request
// Build the persist task request.
let (r, notify) = PersistRequest::new(partition, data);
self.persist_queues
.hash(r.partition_id())
.send(r)
.await
.expect("persist worker has stopped");
// Select a worker to dispatch this request to.
let queue = self.persist_queues.hash(r.partition_id());
// Try and enqueue the persist task immediately.
match queue.try_send(r) {
Ok(()) => {} // Success!
Err(TrySendError::Closed(_)) => panic!("persist worker has stopped"),
Err(TrySendError::Full(r)) => {
// The worker's queue is full. Mark the persist system as being
// saturated, requiring some time to clear outstanding persist
// operations.
//
// The returned guard MUST be held during the send() await
// below.
let _guard = PersistState::set_saturated(
Arc::clone(&self.persist_state),
self.persist_queues.shards().to_owned(),
);
// TODO(test): the guard is held over the await point below
// Park this task waiting to enqueue the persist whilst holding
// the guard above.
//
// If this send() is aborted, the guard is dropped and the
// number of waiters is decremented. If the send() is
// successful, the guard is dropped immediately when leaving
// this scope.
queue.send(r).await.expect("persist worker stopped");
}
};
notify
}


@ -1,3 +1,4 @@
pub(crate) mod backpressure;
pub(super) mod compact;
mod context;
pub(crate) mod handle;


@ -16,6 +16,7 @@ use service_grpc_catalog::CatalogService;
use crate::{
dml_sink::DmlSink,
init::IngesterRpcInterface,
persist::backpressure::PersistState,
query::{response::QueryResponse, QueryExec},
timestamp_oracle::TimestampOracle,
};
@ -32,6 +33,7 @@ pub(crate) struct GrpcDelegate<D, Q> {
dml_sink: Arc<D>,
query_exec: Arc<Q>,
timestamp: Arc<TimestampOracle>,
persist_state: Arc<PersistState>,
catalog: Arc<dyn Catalog>,
metrics: Arc<metric::Registry>,
}
@ -46,6 +48,7 @@ where
dml_sink: Arc<D>,
query_exec: Arc<Q>,
timestamp: Arc<TimestampOracle>,
persist_state: Arc<PersistState>,
catalog: Arc<dyn Catalog>,
metrics: Arc<metric::Registry>,
) -> Self {
@ -53,6 +56,7 @@ where
dml_sink,
query_exec,
timestamp,
persist_state,
catalog,
metrics,
}
@ -84,6 +88,7 @@ where
WriteServiceServer::new(RpcWrite::new(
Arc::clone(&self.dml_sink),
Arc::clone(&self.timestamp),
Arc::clone(&self.persist_state),
))
}


@ -9,10 +9,11 @@ use mutable_batch::writer;
use mutable_batch_pb::decode::decode_database_batch;
use observability_deps::tracing::*;
use thiserror::Error;
use tonic::{Request, Response};
use tonic::{Code, Request, Response};
use crate::{
dml_sink::{DmlError, DmlSink},
persist::backpressure::PersistState,
timestamp_oracle::TimestampOracle,
TRANSITION_SHARD_INDEX,
};
@ -36,15 +37,25 @@ enum RpcError {
/// The serialised write payload could not be read.
#[error(transparent)]
Decode(mutable_batch_pb::decode::Error),
/// The ingester's [`PersistState`] is marked as
/// [`CurrentState::Saturated`]. See [`PersistHandle`] for documentation.
///
/// [`PersistHandle`]: crate::persist::handle::PersistHandle
/// [`CurrentState::Saturated`]:
/// crate::persist::backpressure::CurrentState::Saturated
#[error("ingester overloaded")]
PersistSaturated,
}
impl From<RpcError> for tonic::Status {
fn from(e: RpcError) -> Self {
match e {
RpcError::Decode(_) | RpcError::NoPayload | RpcError::NoTables => {
Self::invalid_argument(e.to_string())
}
}
let code = match e {
RpcError::Decode(_) | RpcError::NoPayload | RpcError::NoTables => Code::InvalidArgument,
RpcError::PersistSaturated => Code::ResourceExhausted,
};
Self::new(code, e.to_string())
}
}
@ -94,14 +105,22 @@ fn map_write_error(e: mutable_batch::Error) -> tonic::Status {
pub(crate) struct RpcWrite<T> {
sink: T,
timestamp: Arc<TimestampOracle>,
persist_state: Arc<PersistState>,
}
impl<T> RpcWrite<T> {
/// Instantiate a new [`RpcWrite`] that pushes [`DmlOperation`] instances
/// into `sink`.
#[allow(dead_code)]
pub(crate) fn new(sink: T, timestamp: Arc<TimestampOracle>) -> Self {
Self { sink, timestamp }
pub(crate) fn new(
sink: T,
timestamp: Arc<TimestampOracle>,
persist_state: Arc<PersistState>,
) -> Self {
Self {
sink,
timestamp,
persist_state,
}
}
}
@ -115,6 +134,27 @@ where
&self,
request: Request<proto::WriteRequest>,
) -> Result<Response<proto::WriteResponse>, tonic::Status> {
// Drop writes if the persist system is saturated.
//
// This gives the ingester a chance to reduce the backlog of persist
// tasks, which in turn reduces the memory usage of the ingester. If
// ingest were to continue unabated, an OOM would be inevitable.
//
// If you're seeing these error responses in RPC requests, you need to
// do one of the following:
//
// * Increase the persist queue depth if there is decent headroom of
// unused RAM allocated to the ingester.
// * Increase the RAM allocation, and increase the persist queue
// depth proportionally.
// * Deploy more ingesters to reduce the request load on any single
// ingester.
//
if self.persist_state.is_saturated() {
return Err(RpcError::PersistSaturated)?;
}
// Extract the remote address for debugging.
let remote_addr = request
.remote_addr()
.map(|v| v.to_string())
@ -188,7 +228,7 @@ mod tests {
};
use super::*;
use crate::dml_sink::mock_sink::MockDmlSink;
use crate::{dml_sink::mock_sink::MockDmlSink, persist::backpressure::CurrentState};
const NAMESPACE_ID: NamespaceId = NamespaceId::new(42);
const PARTITION_KEY: &str = "bananas";
@ -208,7 +248,7 @@ mod tests {
MockDmlSink::default().with_apply_return(vec![$sink_ret]),
);
let timestamp = Arc::new(TimestampOracle::new(0));
let handler = RpcWrite::new(Arc::clone(&mock), timestamp);
let handler = RpcWrite::new(Arc::clone(&mock), timestamp, Default::default());
let ret = handler
.write(Request::new($request))
@ -319,7 +359,7 @@ mod tests {
async fn test_rpc_write_ordered_timestamps() {
let mock = Arc::new(MockDmlSink::default().with_apply_return(vec![Ok(()), Ok(())]));
let timestamp = Arc::new(TimestampOracle::new(0));
let handler = RpcWrite::new(Arc::clone(&mock), timestamp);
let handler = RpcWrite::new(Arc::clone(&mock), timestamp, Default::default());
let req = proto::WriteRequest {
payload: Some(DatabaseBatch {
@ -366,4 +406,58 @@ mod tests {
}
);
}
/// Validate that the persist system being marked as saturated prevents the
/// ingester from accepting new writes.
#[tokio::test]
async fn test_rpc_write_persist_saturation() {
let mock = Arc::new(MockDmlSink::default().with_apply_return(vec![Ok(()), Ok(())]));
let timestamp = Arc::new(TimestampOracle::new(0));
let persist_state = Default::default();
let handler = RpcWrite::new(Arc::clone(&mock), timestamp, Arc::clone(&persist_state));
let req = proto::WriteRequest {
payload: Some(DatabaseBatch {
database_id: NAMESPACE_ID.get(),
partition_key: PARTITION_KEY.to_string(),
table_batches: vec![TableBatch {
table_id: 42,
columns: vec![Column {
column_name: "time".to_string(),
semantic_type: SemanticType::Time.into(),
values: Some(Values {
i64_values: vec![4242],
f64_values: vec![],
u64_values: vec![],
string_values: vec![],
bool_values: vec![],
bytes_values: vec![],
packed_string_values: None,
interned_string_values: None,
}),
null_mask: vec![0],
}],
row_count: 1,
}],
}),
};
handler
.write(Request::new(req.clone()))
.await
.expect("write should succeed");
persist_state.test_set_state(CurrentState::Saturated);
let err = handler
.write(Request::new(req))
.await
.expect_err("write should fail");
// Validate the error code returned to the user.
assert_eq!(err.code(), Code::ResourceExhausted);
// One write should have been passed through to the DML sinks.
assert_matches!(*mock.get_calls(), [DmlOperation::Write(_)]);
}
}


@ -386,7 +386,7 @@ pub fn compute_sort_key<'a>(
builder = builder.with_col(TIME_COLUMN_NAME);
let sort_key = builder.build();
debug!(?primary_key, ?sort_key, "Computed sort key");
debug!(?primary_key, ?sort_key, "computed sort key");
sort_key
}
@ -519,7 +519,7 @@ pub fn adjust_sort_key_columns(
input_catalog_sort_key=?catalog_sort_key,
output_chunk_sort_key=?metadata_sort_key,
output_catalog_sort_key=?catalog_update,
"Adjusted sort key");
"adjusted sort key");
(metadata_sort_key, catalog_update)
}