diff --git a/ingester2/src/init.rs b/ingester2/src/init.rs index 23b3d443bd..9bb01621af 100644 --- a/ingester2/src/init.rs +++ b/ingester2/src/init.rs @@ -241,7 +241,7 @@ pub async fn new( // Spawn the persist workers to compact partition data, convert it into // Parquet files, and upload them to object storage. - let persist_handle = PersistHandle::new( + let (persist_handle, persist_state) = PersistHandle::new( persist_workers, persist_worker_queue_depth, persist_executor, @@ -273,7 +273,14 @@ pub async fn new( )); Ok(IngesterGuard { - rpc: GrpcDelegate::new(Arc::new(write_path), buffer, timestamp, catalog, metrics), + rpc: GrpcDelegate::new( + Arc::new(write_path), + buffer, + timestamp, + persist_state, + catalog, + metrics, + ), rotation_task: handle, }) } diff --git a/ingester2/src/init/wal_replay.rs b/ingester2/src/init/wal_replay.rs index e826ac2280..a049ca306f 100644 --- a/ingester2/src/init/wal_replay.rs +++ b/ingester2/src/init/wal_replay.rs @@ -95,7 +95,7 @@ where "dropping empty wal segment", ); - // TODO(dom:test): empty WAL replay + // TODO(test): empty WAL replay // A failure to delete an empty file should not prevent WAL // replay from continuing. diff --git a/ingester2/src/persist/backpressure.rs b/ingester2/src/persist/backpressure.rs new file mode 100644 index 0000000000..a996d9e905 --- /dev/null +++ b/ingester2/src/persist/backpressure.rs @@ -0,0 +1,521 @@ +use std::{ + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, + time::Duration, +}; + +use crossbeam_utils::CachePadded; +use observability_deps::tracing::*; +use parking_lot::Mutex; +use tokio::{ + sync::mpsc, + task::JoinHandle, + time::{Interval, MissedTickBehavior}, +}; + +/// The interval of time between evaluations of the state of the persist system +/// when [`CurrentState::Saturated`]. +const EVALUATE_SATURATION_INTERVAL: Duration = Duration::from_secs(1); + +/// A state of the persist system. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(crate) enum CurrentState { + /// The system is operating normally. + Ok, + /// The persist system is overloaded. + Saturated, +} + +/// A handle to read (and set, within the persist module) the state of the +/// persist system. +/// +/// Clone operations are cheap, and state read operations are very cheap. +/// +/// # Saturation Recovery +/// +/// Once the persist system is marked as [`CurrentState::Saturated`], it remains +/// in that state until the following conditions are satisfied: +/// +/// * There are no outstanding enqueue operations (no thread is blocked adding +/// an item to any work queue). +/// +/// * All queues have at least half of their capacity free (being at most, +/// half full). +/// +/// These conditions are evaluated periodically, at the interval specified in +/// [`EVALUATE_SATURATION_INTERVAL`]. +#[derive(Debug)] +pub(crate) struct PersistState { + /// The actual state value. + /// + /// The value of this variable is set to the [`CurrentState`] discriminant + /// for the respective state that was [`PersistState::set()`] in it. + /// + /// This is cache padded due to the high read volume, preventing any + /// unfortunate false-sharing of cache lines from impacting the hot-path + /// reads. + state: CachePadded, + + /// Tracks the number of async tasks waiting within + /// [`PersistHandle::queue_persist()`], asynchronously blocking to enqueue a + /// persist job. + /// + /// This is modified using [`Ordering::SeqCst`] as performance is not a + /// priority for code paths that modify it. 
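+    ///
+    /// The counter is incremented in [`PersistState::set_saturated()`] before
+    /// an enqueue wait begins, and decremented when the corresponding
+    /// [`WaitGuard`] is dropped, so a non-zero value indicates at least one
+    /// caller still holds a [`WaitGuard`] over an enqueue attempt.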
+ /// + /// [`PersistHandle::queue_persist()`]: + /// super::handle::PersistHandle::queue_persist() + waiting_to_enqueue: Arc, + + /// The handle to the current saturation evaluation/recovery task, if any. + recovery_handle: Mutex>>, +} + +/// Initialise a [`PersistState`] with [`CurrentState::Ok`]. +impl Default for PersistState { + fn default() -> Self { + let s = Self { + state: Default::default(), + waiting_to_enqueue: Arc::new(AtomicUsize::new(0)), + recovery_handle: Default::default(), + }; + s.set(CurrentState::Ok); + s + } +} + +impl PersistState { + /// Set the reported state of the [`PersistState`]. + fn set(&self, s: CurrentState) -> bool { + // Set the new state, retaining the most recent state. + // + // SeqCst is absolute overkill, but is used here due to the strong + // ordering guarantees providing minimal risk of bugs. The low volume of + // writes to this variable means the overhead is more than acceptable. + let last = self.state.swap(s as usize, Ordering::SeqCst); + + // If "s" does not match the old state, this is the first thread to + // switch the state from "last", to "s", since setting it to "last". + // + // Subsequent calls setting the state to "s" will return false, until a + // different state is set. + s as usize != last + } + + /// Get the current reported state of the [`PersistState`]. + /// + /// Reading this value is extremely cheap and can be done without + /// performance concern. + /// + /// This value is eventually consistent, with a presumption of + pub(crate) fn get(&self) -> CurrentState { + // Correctness: relaxed as reading the current state is allowed to be + // racy for performance reasons; this call should be as cheap as + // possible due to it being squarely in the hot path. + // + // Any value change will "eventually" be made visible to all threads, at + // which point this read converges to the latest value. A potential + // extra write or two arriving before this value is visible to all + // threads is acceptable in the "saturated" cold path, prioritising + // latency of the hot path. + match self.state.load(Ordering::Relaxed) { + v if v == CurrentState::Ok as usize => CurrentState::Ok, + v if v == CurrentState::Saturated as usize => CurrentState::Saturated, + _ => unreachable!(), + } + } + + /// A convenience method that returns true if `self` is + /// [`CurrentState::Saturated`]. + pub(crate) fn is_saturated(&self) -> bool { + self.get() == CurrentState::Saturated + } + + /// Mark the persist system as saturated, returning a [`WaitGuard`] that + /// MUST be held during any subsequent async-blocking enqueue request + /// ([`mpsc::Sender::send()`] and the like). + /// + /// Holding the guard over the `send()` await allows the saturation + /// evaluation to track the number of threads with an ongoing enqueue wait. + pub(super) fn set_saturated(s: Arc, persist_queues: Vec>) -> WaitGuard + where + T: Send + 'static, + { + // Increment the number of tasks waiting to push into a queue. + // + // INVARIANT: this increment MUST happen-before returning the guard, and + // waiting on the queue send(), and before starting the saturation + // monitor task so that it observes this waiter. + let _ = s.waiting_to_enqueue.fetch_add(1, Ordering::SeqCst); + + // Attempt to set the system to "saturated". + let first = s.set(CurrentState::Saturated); + if first { + // This is the first thread to mark the system as saturated. 
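+            // Concurrent callers racing this one observe `first == false`,
+            // skip this branch, and only take out a `WaitGuard` below.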
+ warn!("persist queues saturated, blocking ingest"); + + // Always check the state of the system EVALUATE_SATURATION_INTERVAL + // duration of time after the last completed evaluation - do not + // attempt to check continuously should the check fall behind the + // ticker. + let mut interval = tokio::time::interval(EVALUATE_SATURATION_INTERVAL); + interval.set_missed_tick_behavior(MissedTickBehavior::Delay); + + // Spawn a task that marks the system as not saturated after the queues + // have processed some of the backlog. + let h = tokio::spawn(saturation_monitor_task( + interval, + Arc::clone(&s), + persist_queues, + )); + // Retain the task handle to avoid leaking it if dropped. + *s.recovery_handle.lock() = Some(h); + } + + WaitGuard(Arc::clone(&s.waiting_to_enqueue)) + } + + /// A test-only helper that sets the state of `self` only. It does not spawn + /// a recovery task. + #[cfg(test)] + pub(crate) fn test_set_state(&self, s: CurrentState) { + self.set(s); + } +} + +impl Drop for PersistState { + fn drop(&mut self) { + if let Some(h) = self.recovery_handle.lock().as_ref() { + h.abort(); + } + } +} + +/// A guard that decrements the number of writers waiting to enqueue an item +/// into the persistence queue when dropped. +/// +/// This MUST be held whilst calling [`mpsc::Sender::send()`]. +#[must_use = "must hold wait guard while waiting for enqueue"] +pub(super) struct WaitGuard(Arc); + +impl Drop for WaitGuard { + fn drop(&mut self) { + let _ = self.0.fetch_sub(1, Ordering::SeqCst); + } +} + +/// A task that monitors the `waiters` and `queues` to determine when the +/// persist system is no longer saturated. +/// +/// Once the system is no longer saturated (as determined according to the +/// documentation for [`PersistState`]), the [`PersistState`] is set to +/// [`CurrentState::Ok`]. +async fn saturation_monitor_task( + mut interval: Interval, + state: Arc, + queues: Vec>, +) where + T: Send, +{ + loop { + // Wait before evaluating the state of the system. + interval.tick().await; + + // INVARIANT: this task only ever runs when the system is saturated. + assert!(state.is_saturated()); + + // First check if any tasks are waiting to enqueue an item (an + // indication that one or more queues is full). + let n_waiting = state.waiting_to_enqueue.load(Ordering::SeqCst); + if n_waiting > 0 { + debug!( + n_waiting, + "waiting for outstanding persist jobs to be enqueued" + ); + continue; + } + + // No async task WAS currently waiting to enqueue a persist job when + // checking above, but one may want to immediately enqueue one now (or + // later). + // + // In order to minimise health flip-flopping, only mark the persist + // system as healthy once there is some capacity in the queues to accept + // new persist jobs. This avoids a queue having 1 slot free, only to be + // immediately filled and the system pause again. + // + // This check below ensures that all queues are at least half empty + // before marking the system as recovered. + let n_queues = queues + .iter() + .filter(|q| !has_sufficient_capacity(q.capacity(), q.max_capacity())) + .count(); + if n_queues != 0 { + debug!(n_queues, "waiting for queues to drain"); + continue; + } + + // There are no outstanding enqueue waiters, and all queues are at half + // capacity or better. + info!("persist queue saturation reduced, resuming ingest"); + + // INVARIANT: there is only ever one task that monitors the queue state + // and transitions the persist state to OK, therefore this task is + // always the first to set the state to OK. 
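+        // The return value of set() is asserted so that any violation of this
+        // invariant is caught loudly rather than silently ignored.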
+ assert!(state.set(CurrentState::Ok)); + + // The task MUST immediately stop so any subsequent saturation is + // handled by the newly spawned task, upholding the above invariant. + return; + } +} + +/// Returns true if `capacity` is sufficient to be considered ready for more +/// requests to be enqueued. +fn has_sufficient_capacity(capacity: usize, max_capacity: usize) -> bool { + // Did this fire? You have your arguments the wrong way around. + assert!(capacity <= max_capacity); + + let want_at_least = (max_capacity + 1) / 2; + trace!( + available = capacity, + max = max_capacity, + want_at_least, + "evaluating queue backlog" + ); + + capacity >= want_at_least +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use test_helpers::timeout::FutureTimeout; + + use super::*; + + const POLL_INTERVAL: Duration = Duration::from_millis(5); + + #[test] + fn test_has_sufficient_capacity() { + // A queue of minimal depth (1). + // + // Validates there are no off-by-one errors. + assert!(!has_sufficient_capacity(0, 1)); + assert!(has_sufficient_capacity(1, 1)); + + // Even queues + assert!(!has_sufficient_capacity(0, 2)); + assert!(has_sufficient_capacity(1, 2)); + assert!(has_sufficient_capacity(2, 2)); + + // Odd queues + assert!(!has_sufficient_capacity(0, 3)); + assert!(!has_sufficient_capacity(1, 3)); + assert!(has_sufficient_capacity(2, 3)); + assert!(has_sufficient_capacity(3, 3)); + } + + /// Validate the state setters and getters are correct, and that only the + /// first thread that changes the state observes the "first=true" response. + #[test] + fn test_state_transitions() { + let s = PersistState::default(); + assert_eq!(s.get(), CurrentState::Ok); + assert!(!s.is_saturated()); + + assert!(!s.set(CurrentState::Ok)); // Already OK + assert_eq!(s.get(), CurrentState::Ok); + assert!(!s.is_saturated()); + + assert!(!s.set(CurrentState::Ok)); // Already OK + assert_eq!(s.get(), CurrentState::Ok); + assert!(!s.is_saturated()); + + assert!(s.set(CurrentState::Saturated)); // First to change + assert!(s.is_saturated()); + assert!(!s.set(CurrentState::Saturated)); // Not first + assert!(s.is_saturated()); + assert_eq!(s.get(), CurrentState::Saturated); + assert!(s.is_saturated()); + + assert!(!s.set(CurrentState::Saturated)); // Not first + assert_eq!(s.get(), CurrentState::Saturated); + assert!(s.is_saturated()); + + assert!(s.set(CurrentState::Ok)); // First to change + assert_eq!(s.get(), CurrentState::Ok); + assert!(!s.is_saturated()); + } + + /// Ensure that the saturation evaluation checks for outstanding enqueue + /// waiters (as tracked by the [`WaitGuard`]). + #[tokio::test] + async fn test_saturation_recovery_enqueue_waiters() { + let s = Arc::new(PersistState::default()); + + // Use no queues to ensure only the waiters are blocking recovery. + + assert!(!s.is_saturated()); + + let w1 = PersistState::set_saturated::<()>(Arc::clone(&s), vec![]); + let w2 = PersistState::set_saturated::<()>(Arc::clone(&s), vec![]); + + assert!(s.is_saturated()); + + // Kill the actual recovery task (there must be one running at this + // point). + s.recovery_handle.lock().take().unwrap().abort(); + + // Spawn a replacement that ticks way more often to speed up the test. + let h = tokio::spawn(saturation_monitor_task::<()>( + tokio::time::interval(POLL_INTERVAL), + Arc::clone(&s), + vec![], + )); + + // Drop a waiter and ensure the system is still saturated. + drop(w1); + assert!(s.is_saturated()); + + // Sleep a little to ensure it remains saturated with 1 outstanding + // waiter. 
+ // + // This is false-negative racy - if this assert fires, there is a + // legitimate problem - one outstanding waiter should prevent the system + // from ever transitioning to a healthy state. + tokio::time::sleep(POLL_INTERVAL * 4).await; + assert!(s.is_saturated()); + + // Drop the other waiter. + drop(w2); + + // Wait up to 5 seconds to observe the system recovery. + async { + loop { + if !s.is_saturated() { + return; + } + tokio::time::sleep(POLL_INTERVAL).await; + } + } + .with_timeout_panic(Duration::from_secs(5)) + .await; + + // Wait up to 60 seconds to observe the recovery task finish. + // + // The recovery task sets the system state as healthy, and THEN exits, + // so there exists a window of time where the system has passed the + // saturation check above, but the recovery task MAY still be running. + // + // By waiting an excessive duration of time, we ensure the task does + // indeed finish. + async { + loop { + if h.is_finished() { + return; + } + tokio::time::sleep(POLL_INTERVAL).await; + } + } + .with_timeout_panic(Duration::from_secs(60)) + .await; + + // No task panic occurred. + assert!(h.with_timeout_panic(Duration::from_secs(5)).await.is_ok()); + assert!(!s.is_saturated()); + } + + /// Ensure that the saturation evaluation checks for free queue slots before + /// marking the system as healthy. + #[tokio::test] + async fn test_saturation_recovery_queue_capacity() { + let s = Arc::new(PersistState::default()); + + async fn fill(q: &mpsc::Sender<()>, times: usize) { + for _ in 0..times { + q.send(()).await.unwrap(); + } + } + + // Use no waiters to ensure only the queue slots are blocking recovery. + + let (tx1, mut rx1) = mpsc::channel(5); + let (tx2, mut rx2) = mpsc::channel(5); + + // Place some items in the queues + fill(&tx1, 3).await; // Over the threshold of 5/2 = 2.5, rounded down to 2. + fill(&tx2, 3).await; // Over the threshold of 5/2 = 2.5, rounded down to 2. + + assert!(!s.is_saturated()); + assert!(s.set(CurrentState::Saturated)); + assert!(s.is_saturated()); + + // Spawn the recovery task directly, not via set_saturated() for + // simplicity - the test above asserts the task is started by a call to + // set_saturated(). + let h = tokio::spawn(saturation_monitor_task::<()>( + tokio::time::interval(POLL_INTERVAL), + Arc::clone(&s), + vec![tx1, tx2], + )); + + // Wait a little and ensure the state hasn't changed. + // + // While this could be a false negative, if this assert fires there is a + // legitimate problem. + tokio::time::sleep(POLL_INTERVAL * 4).await; + assert!(s.is_saturated()); + + // Drain one of the queues to below the saturation point. + rx1.recv().await.expect("no recovery task running"); + + // Wait a little and ensure the state still hasn't changed. + // + // While this could also be a false negative, if this assert fires there + // is a legitimate problem. + tokio::time::sleep(POLL_INTERVAL * 4).await; + assert!(s.is_saturated()); + + // Drain the remaining queue below the threshold for recovery. + rx2.recv().await.expect("no recovery task running"); + + // Wait up to 5 seconds to observe the system recovery. + async { + loop { + if !s.is_saturated() { + return; + } + tokio::time::sleep(POLL_INTERVAL).await; + } + } + .with_timeout_panic(Duration::from_secs(5)) + .await; + + // Wait up to 60 seconds to observe the recovery task finish. 
+ // + // The recovery task sets the system state as healthy, and THEN exits, + // so there exists a window of time where the system has passed the + // saturation check above, but the recovery task MAY still be running. + // + // By waiting an excessive duration of time, we ensure the task does + // indeed finish. + async { + loop { + if h.is_finished() { + return; + } + tokio::time::sleep(POLL_INTERVAL).await; + } + } + .with_timeout_panic(Duration::from_secs(60)) + .await; + + // No task panic occurred. + assert!(h.with_timeout_panic(Duration::from_secs(5)).await.is_ok()); + assert!(!s.is_saturated()); + } +} diff --git a/ingester2/src/persist/handle.rs b/ingester2/src/persist/handle.rs index eb993e0dac..bc49f0caf2 100644 --- a/ingester2/src/persist/handle.rs +++ b/ingester2/src/persist/handle.rs @@ -2,22 +2,21 @@ use std::sync::Arc; use iox_catalog::interface::Catalog; use iox_query::exec::Executor; -use observability_deps::tracing::{debug, info}; +use observability_deps::tracing::*; use parking_lot::Mutex; use parquet_file::storage::ParquetStorage; use sharder::JumpHash; -use thiserror::Error; -use tokio::sync::{mpsc, oneshot}; +use tokio::sync::{ + mpsc::{self, error::TrySendError}, + oneshot, +}; use crate::buffer_tree::partition::{persisting::PersistingData, PartitionData}; -use super::context::{Context, PersistRequest}; - -#[derive(Debug, Error)] -pub(crate) enum PersistError { - #[error("persist queue is full")] - QueueFull, -} +use super::{ + backpressure::PersistState, + context::{Context, PersistRequest}, +}; /// A persistence task submission handle. /// @@ -90,6 +89,28 @@ pub(crate) enum PersistError { /// always placed in the same worker queue, ensuring they execute sequentially. /// /// [`SortKey`]: schema::sort::SortKey +/// +/// # Overload & Back-pressure +/// +/// The persist queue is bounded, but the caller must prevent new persist jobs +/// from being generated and blocked whilst waiting to add the persist job to +/// the bounded queue, otherwise the system is effectively unbounded. If an +/// unbounded number of threads block on [`PersistHandle::queue_persist()`] +/// waiting to successfully enqueue the job, then there is no bound on +/// outstanding persist jobs at all. +/// +/// To prevent this, the persistence system exposes an indicator of saturation +/// (readable via the [`PersistState`]) that the caller MUST use to prevent the +/// generation of new persist tasks (for example, by blocking any further +/// ingest) on a best-effort basis. +/// +/// When the persist queue is saturated, the [`PersistState::is_saturated()`] +/// returns true. Once the backlog of persist jobs is reduced, the +/// [`PersistState`] is switched back to a healthy state and new persist jobs +/// may be generated as normal. +/// +/// For details of the exact saturation detection & recovery logic, see +/// [`PersistState`]. #[derive(Debug, Clone)] pub(crate) struct PersistHandle { /// THe state/dependencies shared across all worker tasks. @@ -105,6 +126,9 @@ pub(crate) struct PersistHandle { /// Task handles for the worker tasks, aborted on drop of all /// [`PersistHandle`] instances. tasks: Arc>>, + + /// Records the saturation state of the persist system. 
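+    ///
+    /// A shared handle to this state is returned alongside the
+    /// [`PersistHandle`] from [`PersistHandle::new()`], allowing the ingest
+    /// path (the RPC write handler) to reject writes while the queues drain.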
+ persist_state: Arc, } impl PersistHandle { @@ -115,7 +139,7 @@ impl PersistHandle { exec: Arc, store: ParquetStorage, catalog: Arc, - ) -> Self { + ) -> (Self, Arc) { assert_ne!(n_workers, 0, "must run at least 1 persist worker"); assert_ne!(worker_queue_depth, 0, "worker queue depth must be non-zero"); @@ -143,11 +167,18 @@ impl PersistHandle { assert!(!tasks.is_empty()); - Self { - inner, - persist_queues: Arc::new(JumpHash::new(tx_handles)), - tasks: Arc::new(tasks), - } + // Initialise the saturation state as "not saturated". + let persist_state = Default::default(); + + ( + Self { + inner, + persist_queues: Arc::new(JumpHash::new(tx_handles)), + tasks: Arc::new(tasks), + persist_state: Arc::clone(&persist_state), + }, + persist_state, + ) } /// Place `data` from `partition` into the persistence queue. @@ -159,12 +190,12 @@ impl PersistHandle { /// key will be updated, and [`PartitionData::mark_persisted()`] is called /// with `data`. /// - /// Once all persistence related tasks are complete, the returned channel - /// publishes a notification. + /// Once all persistence related tasks for `data` are complete, the returned + /// channel publishes a notification. /// /// # Panics /// - /// Panics if one or more worker threads have stopped. + /// Panics if the assigned persist worker task has stopped. /// /// Panics (asynchronously) if the [`PartitionData`]'s sort key is updated /// between persistence starting and ending. @@ -180,14 +211,40 @@ impl PersistHandle { "enqueuing persistence task" ); - // Build the persist task request + // Build the persist task request. let (r, notify) = PersistRequest::new(partition, data); - self.persist_queues - .hash(r.partition_id()) - .send(r) - .await - .expect("persist worker has stopped"); + // Select a worker to dispatch this request to. + let queue = self.persist_queues.hash(r.partition_id()); + + // Try and enqueue the persist task immediately. + match queue.try_send(r) { + Ok(()) => {} // Success! + Err(TrySendError::Closed(_)) => panic!("persist worker has stopped"), + Err(TrySendError::Full(r)) => { + // The worker's queue is full. Mark the persist system as being + // saturated, requiring some time to clear outstanding persist + // operations. + // + // The returned guard MUST be held during the send() await + // below. + let _guard = PersistState::set_saturated( + Arc::clone(&self.persist_state), + self.persist_queues.shards().to_owned(), + ); + + // TODO(test): the guard is held over the await point below + + // Park this task waiting to enqueue the persist whilst holding + // the guard above. + // + // If this send() is aborted, the guard is dropped and the + // number of waiters is decremented. If the send() is + // successful, the guard is dropped immediately when leaving + // this scope. 
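+                // Either way, the saturated state itself is not cleared here;
+                // the monitor task spawned by set_saturated() transitions it
+                // back to CurrentState::Ok once the queues have drained.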
+ queue.send(r).await.expect("persist worker stopped"); + } + }; notify } diff --git a/ingester2/src/persist/mod.rs b/ingester2/src/persist/mod.rs index 1ae9092259..a5b0c433ae 100644 --- a/ingester2/src/persist/mod.rs +++ b/ingester2/src/persist/mod.rs @@ -1,3 +1,4 @@ +pub(crate) mod backpressure; pub(super) mod compact; mod context; pub(crate) mod handle; diff --git a/ingester2/src/server/grpc.rs b/ingester2/src/server/grpc.rs index 0f80fbfd99..96b3dc9dc0 100644 --- a/ingester2/src/server/grpc.rs +++ b/ingester2/src/server/grpc.rs @@ -16,6 +16,7 @@ use service_grpc_catalog::CatalogService; use crate::{ dml_sink::DmlSink, init::IngesterRpcInterface, + persist::backpressure::PersistState, query::{response::QueryResponse, QueryExec}, timestamp_oracle::TimestampOracle, }; @@ -32,6 +33,7 @@ pub(crate) struct GrpcDelegate { dml_sink: Arc, query_exec: Arc, timestamp: Arc, + persist_state: Arc, catalog: Arc, metrics: Arc, } @@ -46,6 +48,7 @@ where dml_sink: Arc, query_exec: Arc, timestamp: Arc, + persist_state: Arc, catalog: Arc, metrics: Arc, ) -> Self { @@ -53,6 +56,7 @@ where dml_sink, query_exec, timestamp, + persist_state, catalog, metrics, } @@ -84,6 +88,7 @@ where WriteServiceServer::new(RpcWrite::new( Arc::clone(&self.dml_sink), Arc::clone(&self.timestamp), + Arc::clone(&self.persist_state), )) } diff --git a/ingester2/src/server/grpc/rpc_write.rs b/ingester2/src/server/grpc/rpc_write.rs index fa8531dc8b..cb63ede31d 100644 --- a/ingester2/src/server/grpc/rpc_write.rs +++ b/ingester2/src/server/grpc/rpc_write.rs @@ -9,10 +9,11 @@ use mutable_batch::writer; use mutable_batch_pb::decode::decode_database_batch; use observability_deps::tracing::*; use thiserror::Error; -use tonic::{Request, Response}; +use tonic::{Code, Request, Response}; use crate::{ dml_sink::{DmlError, DmlSink}, + persist::backpressure::PersistState, timestamp_oracle::TimestampOracle, TRANSITION_SHARD_INDEX, }; @@ -36,15 +37,25 @@ enum RpcError { /// The serialised write payload could not be read. #[error(transparent)] Decode(mutable_batch_pb::decode::Error), + + /// The ingester's [`PersistState`] is marked as + /// [`CurrentState::Saturated`]. See [`PersistHandle`] for documentation. + /// + /// [`PersistHandle`]: crate::persist::handle::PersistHandle + /// [`CurrentState::Saturated`]: + /// crate::persist::backpressure::CurrentState::Saturated + #[error("ingester overloaded")] + PersistSaturated, } impl From for tonic::Status { fn from(e: RpcError) -> Self { - match e { - RpcError::Decode(_) | RpcError::NoPayload | RpcError::NoTables => { - Self::invalid_argument(e.to_string()) - } - } + let code = match e { + RpcError::Decode(_) | RpcError::NoPayload | RpcError::NoTables => Code::InvalidArgument, + RpcError::PersistSaturated => Code::ResourceExhausted, + }; + + Self::new(code, e.to_string()) } } @@ -94,14 +105,22 @@ fn map_write_error(e: mutable_batch::Error) -> tonic::Status { pub(crate) struct RpcWrite { sink: T, timestamp: Arc, + persist_state: Arc, } impl RpcWrite { /// Instantiate a new [`RpcWrite`] that pushes [`DmlOperation`] instances /// into `sink`. - #[allow(dead_code)] - pub(crate) fn new(sink: T, timestamp: Arc) -> Self { - Self { sink, timestamp } + pub(crate) fn new( + sink: T, + timestamp: Arc, + persist_state: Arc, + ) -> Self { + Self { + sink, + timestamp, + persist_state, + } } } @@ -115,6 +134,27 @@ where &self, request: Request, ) -> Result, tonic::Status> { + // Drop writes if the persistence is saturated. 
+ // + // This gives the ingester a chance to reduce the backlog of persistence + // tasks, which in turn reduces the memory usage of the ingester. If + // ingest was to continue unabated, an OOM would be inevitable. + // + // If you're seeing these error responses in RPC requests, you need to + // either: + // + // * Increase the persist queue depth if there is a decent headroom of + // unused RAM allocated to the ingester. + // * Increase the RAM allocation, and increase the persist queue + // depth proportionally. + // * Deploy more ingesters to reduce the request load on any single + // ingester. + // + if self.persist_state.is_saturated() { + return Err(RpcError::PersistSaturated)?; + } + + // Extract the remote address for debugging. let remote_addr = request .remote_addr() .map(|v| v.to_string()) @@ -188,7 +228,7 @@ mod tests { }; use super::*; - use crate::dml_sink::mock_sink::MockDmlSink; + use crate::{dml_sink::mock_sink::MockDmlSink, persist::backpressure::CurrentState}; const NAMESPACE_ID: NamespaceId = NamespaceId::new(42); const PARTITION_KEY: &str = "bananas"; @@ -208,7 +248,7 @@ mod tests { MockDmlSink::default().with_apply_return(vec![$sink_ret]), ); let timestamp = Arc::new(TimestampOracle::new(0)); - let handler = RpcWrite::new(Arc::clone(&mock), timestamp); + let handler = RpcWrite::new(Arc::clone(&mock), timestamp, Default::default()); let ret = handler .write(Request::new($request)) @@ -319,7 +359,7 @@ mod tests { async fn test_rpc_write_ordered_timestamps() { let mock = Arc::new(MockDmlSink::default().with_apply_return(vec![Ok(()), Ok(())])); let timestamp = Arc::new(TimestampOracle::new(0)); - let handler = RpcWrite::new(Arc::clone(&mock), timestamp); + let handler = RpcWrite::new(Arc::clone(&mock), timestamp, Default::default()); let req = proto::WriteRequest { payload: Some(DatabaseBatch { @@ -366,4 +406,58 @@ mod tests { } ); } + + /// Validate that the persist system being marked as saturated prevents the + /// ingester from accepting new writes. + #[tokio::test] + async fn test_rpc_write_persist_saturation() { + let mock = Arc::new(MockDmlSink::default().with_apply_return(vec![Ok(()), Ok(())])); + let timestamp = Arc::new(TimestampOracle::new(0)); + let persist_state = Default::default(); + let handler = RpcWrite::new(Arc::clone(&mock), timestamp, Arc::clone(&persist_state)); + + let req = proto::WriteRequest { + payload: Some(DatabaseBatch { + database_id: NAMESPACE_ID.get(), + partition_key: PARTITION_KEY.to_string(), + table_batches: vec![TableBatch { + table_id: 42, + columns: vec![Column { + column_name: "time".to_string(), + semantic_type: SemanticType::Time.into(), + values: Some(Values { + i64_values: vec![4242], + f64_values: vec![], + u64_values: vec![], + string_values: vec![], + bool_values: vec![], + bytes_values: vec![], + packed_string_values: None, + interned_string_values: None, + }), + null_mask: vec![0], + }], + row_count: 1, + }], + }), + }; + + handler + .write(Request::new(req.clone())) + .await + .expect("write should succeed"); + + persist_state.test_set_state(CurrentState::Saturated); + + let err = handler + .write(Request::new(req)) + .await + .expect_err("write should fail"); + + // Validate the error code returned to the user. + assert_eq!(err.code(), Code::ResourceExhausted); + + // One write should have been passed through to the DML sinks. 
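+        // The second (rejected) write never reached the sink, so exactly one
+        // DmlOperation::Write call is recorded.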
+ assert_matches!(*mock.get_calls(), [DmlOperation::Write(_)]); + } } diff --git a/schema/src/sort.rs b/schema/src/sort.rs index 4aa12b130e..3eb1c50a27 100644 --- a/schema/src/sort.rs +++ b/schema/src/sort.rs @@ -386,7 +386,7 @@ pub fn compute_sort_key<'a>( builder = builder.with_col(TIME_COLUMN_NAME); let sort_key = builder.build(); - debug!(?primary_key, ?sort_key, "Computed sort key"); + debug!(?primary_key, ?sort_key, "computed sort key"); sort_key } @@ -519,7 +519,7 @@ pub fn adjust_sort_key_columns( input_catalog_sort_key=?catalog_sort_key, output_chunk_sort_key=?metadata_sort_key, output_catalog_sort_key=?catalog_update, - "Adjusted sort key"); + "adjusted sort key"); (metadata_sort_key, catalog_update) }
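Taken together, these changes define a simple caller-side contract for the new
back-pressure mechanism. The sketch below is illustrative only and not part of
the diff: `persist_state` stands for any clone of the `Arc<PersistState>`
returned by `PersistHandle::new()`, and the surrounding request-handling code
is elided.

    // Illustrative sketch (not part of the diff): shed load while the persist
    // system reports saturation, mirroring the check added to rpc_write.rs.
    if persist_state.is_saturated() {
        // Reject the request so the persist queues get a chance to drain.
        return Err(tonic::Status::resource_exhausted("ingester overloaded"));
    }
    // Otherwise buffer the write and generate persist jobs as normal.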