Merge pull request #6568 from influxdata/dom/shutdown-persist

feat(ingester2): persist on shutdown
pull/24376/head
Dom 2023-01-12 10:22:28 +00:00 committed by GitHub
commit 7e3bb25815
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 465 additions and 14 deletions

View File

@ -267,6 +267,14 @@ mod kafkaless_rpc_write {
// Restart the ingester and ensure it gets a new UUID
cluster.restart_ingester().await;
// Populate the ingester with some data so it returns a successful
// response containing the UUID.
let lp = format!("{},tag1=A,tag2=B val=42i 123456", table_name);
let response = cluster.write_to_router(lp).await;
assert_eq!(response.status(), StatusCode::NO_CONTENT);
// Query for the new UUID and assert it has changed.
let mut performed_query = querier_flight.do_get(query).await.unwrap().into_inner();
let (msg, app_metadata) = next_message(&mut performed_query).await.unwrap();
assert!(matches!(msg, DecodedPayload::None), "{:?}", msg);

View File

@ -2,11 +2,13 @@ crate::maybe_pub!(
pub use super::wal_replay::*;
);
mod graceful_shutdown;
mod wal_replay;
use std::{path::PathBuf, sync::Arc, time::Duration};
use backoff::BackoffConfig;
use futures::{future::Shared, Future, FutureExt};
use generated_types::influxdata::iox::{
catalog::v1::catalog_service_server::{CatalogService, CatalogServiceServer},
ingester::v1::write_service_server::{WriteService, WriteServiceServer},
@ -14,8 +16,10 @@ use generated_types::influxdata::iox::{
use iox_arrow_flight::flight_service_server::{FlightService, FlightServiceServer};
use iox_catalog::interface::Catalog;
use iox_query::exec::Executor;
use observability_deps::tracing::*;
use parquet_file::storage::ParquetStorage;
use thiserror::Error;
use tokio::sync::oneshot;
use wal::Wal;
use crate::{
@ -33,6 +37,8 @@ use crate::{
TRANSITION_SHARD_INDEX,
};
use self::graceful_shutdown::graceful_shutdown_handler;
/// Acquire opaque handles to the Ingester RPC service implementations.
///
/// This trait serves as the public crate API boundary - callers external to the
@ -77,18 +83,34 @@ pub struct IngesterGuard<T> {
///
/// Aborted on drop.
rotation_task: tokio::task::JoinHandle<()>,
/// The task handle executing the graceful shutdown once triggered.
graceful_shutdown_handler: tokio::task::JoinHandle<()>,
shutdown_complete: Shared<oneshot::Receiver<()>>,
}
impl<T> IngesterGuard<T> {
impl<T> IngesterGuard<T>
where
T: Send + Sync,
{
/// Obtain a handle to the gRPC handlers.
pub fn rpc(&self) -> &T {
&self.rpc
}
/// Block and wait until the ingester has gracefully stopped.
pub async fn join(&self) {
self.shutdown_complete
.clone()
.await
.expect("graceful shutdown task panicked")
}
}
impl<T> Drop for IngesterGuard<T> {
fn drop(&mut self) {
self.rotation_task.abort();
self.graceful_shutdown_handler.abort();
}
}
@ -148,8 +170,21 @@ pub enum InitError {
/// value should be tuned to be slightly less than the interval between persist
/// operations, but not so long that it causes catalog load spikes at persist
/// time (which can be observed by the catalog instrumentation metrics).
///
/// ## Graceful Shutdown
///
/// When `shutdown` completes, the ingester blocks ingest (returning an error to
/// all new write requests) while still executing query requests. The ingester
/// then persists all data currently buffered.
///
/// Callers can wait for this buffer persist to complete by awaiting
/// [`IngesterGuard::join()`], which will resolve once all data has been flushed
/// to object storage.
///
/// The ingester will continue answering queries until the gRPC server is
/// stopped by the caller (managed outside of this crate).
#[allow(clippy::too_many_arguments)]
pub async fn new(
pub async fn new<F>(
catalog: Arc<dyn Catalog>,
metrics: Arc<metric::Registry>,
persist_background_fetch_time: Duration,
@ -160,7 +195,11 @@ pub async fn new(
persist_queue_depth: usize,
persist_hot_partition_cost: usize,
object_store: ParquetStorage,
) -> Result<IngesterGuard<impl IngesterRpcInterface>, InitError> {
shutdown: F,
) -> Result<IngesterGuard<impl IngesterRpcInterface>, InitError>
where
F: Future<Output = ()> + Send + 'static,
{
// Create the transition shard.
let mut txn = catalog
.start_transaction()
@ -269,11 +308,11 @@ pub async fn new(
let write_path = WalSink::new(Arc::clone(&buffer), Arc::clone(&wal));
// Spawn a background thread to periodically rotate the WAL segment file.
let handle = tokio::spawn(periodic_rotation(
wal,
let rotation_task = tokio::spawn(periodic_rotation(
Arc::clone(&wal),
wal_rotation_period,
Arc::clone(&buffer),
persist_handle,
Arc::clone(&persist_handle),
));
// Restore the highest sequence number from the WAL files, and default to 0
@ -288,6 +327,16 @@ pub async fn new(
.unwrap_or(0),
));
let (shutdown_tx, shutdown_rx) = oneshot::channel();
let shutdown_task = tokio::spawn(graceful_shutdown_handler(
shutdown,
shutdown_tx,
Arc::clone(&ingest_state),
Arc::clone(&buffer),
persist_handle,
wal,
));
Ok(IngesterGuard {
rpc: GrpcDelegate::new(
Arc::new(write_path),
@ -297,6 +346,8 @@ pub async fn new(
catalog,
metrics,
),
rotation_task: handle,
rotation_task,
graceful_shutdown_handler: shutdown_task,
shutdown_complete: shutdown_rx.shared(),
})
}

View File

@ -0,0 +1,372 @@
use std::{sync::Arc, time::Duration};
use futures::Future;
use observability_deps::tracing::*;
use tokio::sync::oneshot;
use crate::{
ingest_state::{IngestState, IngestStateError},
partition_iter::PartitionIter,
persist::{drain_buffer::persist_partitions, queue::PersistQueue},
};
/// Defines how often the shutdown task polls the partition buffers for
/// emptiness.
///
/// Polls faster in tests to avoid unnecessary delay.
#[cfg(test)]
const SHUTDOWN_POLL_INTERVAL: Duration = Duration::from_millis(50);
#[cfg(not(test))]
const SHUTDOWN_POLL_INTERVAL: Duration = Duration::from_secs(1);
/// Awaits `fut`, before blocking ingest and persisting all data.
///
/// Returns once all outstanding persist jobs have completed (regardless of what
/// started them) and all buffered data has been flushed to object store.
///
/// Correctly accounts for persist jobs that have been started (by a call to
/// [`PartitionData::mark_persisting()`] but not yet enqueued).
///
/// Ingest is blocked by setting [`IngestStateError::GracefulStop`] in the
/// [`IngestState`].
///
/// [`PartitionData::mark_persisting()`]:
/// crate::buffer_tree::partition::PartitionData::mark_persisting()
pub(super) async fn graceful_shutdown_handler<F, T, P>(
fut: F,
complete: oneshot::Sender<()>,
ingest_state: Arc<IngestState>,
buffer: T,
persist: P,
wal: Arc<wal::Wal>,
) where
F: Future<Output = ()> + Send,
T: PartitionIter + Sync,
P: PersistQueue + Clone,
{
fut.await;
info!("gracefully stopping ingester");
// Reject RPC writes.
//
// There MAY be writes ongoing that started before this state was set.
ingest_state.set(IngestStateError::GracefulStop);
info!("persisting all data before shutdown");
// Drain the buffer tree, persisting all data.
//
// Returns once the persist jobs it starts have complete.
persist_partitions(buffer.partition_iter(), &persist).await;
// There may have been concurrent persist jobs started previously by hot
// partition persistence or WAL rotation (or some other, arbitrary persist
// source) that have not yet completed (this is unlikely). There may also be
// late arriving writes that started before ingest was blocked, but did not
// buffer until after the persist was completed above (also unlikely).
//
// Wait until there is no data in the buffer at all before proceeding,
// therefore ensuring those concurrent persist operations have completed and
// no late arriving data remains buffered.
//
// NOTE: There is a small race in which a late arriving write starts before
// ingest is blocked, is then stalled the entire time partitions are
// persisted, remains stalled while this "empty" check occurs, and then
// springs to life and buffers in the buffer tree after this check has
// completed - I think this is extreme enough to accept as a theoretical
// possibility that doesn't need covering off in practice.
while buffer
.partition_iter()
.any(|p| p.lock().get_query_data().is_some())
{
if persist_partitions(buffer.partition_iter(), &persist).await != 0 {
// Late arriving writes needed persisting.
debug!("re-persisting late arriving data");
} else {
// At least one partition is returning data, and there is no data to
// start persisting, therefore there is an outstanding persist
// operation that hasn't yet been marked as complete.
debug!("waiting for concurrent persist to complete");
}
tokio::time::sleep(SHUTDOWN_POLL_INTERVAL).await;
}
// There is now no data buffered in the ingester - all data has been
// persisted to object storage.
//
// Therefore there are no ops that need replaying to rebuild the (now empty)
// buffer state, therefore all WAL segments can be deleted to prevent
// spurious replay and re-uploading of the same data.
//
// This should be made redundant by persist-driven WAL dropping:
//
// https://github.com/influxdata/influxdb_iox/issues/6566
//
wal.rotate().expect("failed to rotate wal");
for file in wal.closed_segments() {
if let Err(error) = wal.delete(file.id()).await {
// This MAY occur due to concurrent segment deletion driven by the
// WAL rotation task.
//
// If this is a legitimate failure to delete (not a "not found")
// then this causes the data to be re-uploaded - an acceptable
// outcome, and preferable to panicking here and not dropping the
// rest of the deletable files.
warn!(%error, "failed to drop WAL segment");
}
}
info!("persisted all data - stopping ingester");
let _ = complete.send(());
}
#[cfg(test)]
mod tests {
use std::{future::ready, sync::Arc, task::Poll};
use assert_matches::assert_matches;
use data_types::{NamespaceId, PartitionId, PartitionKey, SequenceNumber, ShardId, TableId};
use futures::FutureExt;
use lazy_static::lazy_static;
use mutable_batch_lp::test_helpers::lp_to_mutable_batch;
use parking_lot::Mutex;
use test_helpers::timeout::FutureTimeout;
use crate::{
buffer_tree::{
namespace::NamespaceName, partition::PartitionData, partition::SortKeyState,
table::TableName,
},
deferred_load::DeferredLoad,
persist::queue::mock::MockPersistQueue,
};
use super::*;
const PARTITION_ID: PartitionId = PartitionId::new(1);
const TRANSITION_SHARD_ID: ShardId = ShardId::new(84);
lazy_static! {
static ref PARTITION_KEY: PartitionKey = PartitionKey::from("platanos");
static ref TABLE_NAME: TableName = TableName::from("bananas");
static ref NAMESPACE_NAME: NamespaceName = NamespaceName::from("namespace-bananas");
}
// Initialise a partition containing buffered data.
fn new_partition() -> Arc<Mutex<PartitionData>> {
let mut partition = PartitionData::new(
PARTITION_ID,
PARTITION_KEY.clone(),
NamespaceId::new(3),
Arc::new(DeferredLoad::new(Duration::from_secs(1), async {
NAMESPACE_NAME.clone()
})),
TableId::new(4),
Arc::new(DeferredLoad::new(Duration::from_secs(1), async {
TABLE_NAME.clone()
})),
SortKeyState::Provided(None),
TRANSITION_SHARD_ID,
);
let mb = lp_to_mutable_batch(r#"bananas,city=London people=2,pigeons="millions" 10"#).1;
partition
.buffer_write(mb, SequenceNumber::new(1))
.expect("failed to write dummy data");
Arc::new(Mutex::new(partition))
}
// Initialise a WAL with > 1 segment.
async fn new_wal() -> (tempfile::TempDir, Arc<wal::Wal>) {
let dir = tempfile::tempdir().expect("failed to get temporary WAL directory");
let wal = wal::Wal::new(dir.path())
.await
.expect("failed to initialise WAL to write");
wal.rotate().expect("failed to rotate WAL");
(dir, wal)
}
#[tokio::test]
async fn test_graceful_shutdown() {
let persist = Arc::new(MockPersistQueue::default());
let ingest_state = Arc::new(IngestState::default());
let (_tempdir, wal) = new_wal().await;
let partition = new_partition();
let (tx, rx) = oneshot::channel();
graceful_shutdown_handler(
ready(()),
tx,
ingest_state,
vec![Arc::clone(&partition)],
Arc::clone(&persist),
Arc::clone(&wal),
)
.await;
// Wait for the shutdown to complete.
rx.with_timeout_panic(Duration::from_secs(5))
.await
.expect("shutdown task panicked");
// Assert the data was persisted
let persist_calls = persist.calls();
assert_matches!(&*persist_calls, [p] => {
assert!(Arc::ptr_eq(p, &partition));
});
// Assert there are now no WAL segment files that will be replayed
assert!(wal.closed_segments().is_empty());
}
#[tokio::test]
async fn test_graceful_shutdown_concurrent_persist() {
let persist = Arc::new(MockPersistQueue::default());
let ingest_state = Arc::new(IngestState::default());
let (_tempdir, wal) = new_wal().await;
let partition = new_partition();
// Mark the partition as persisting
let persist_job = partition
.lock()
.mark_persisting()
.expect("non-empty partition should begin persisting");
// Start the graceful shutdown job in another thread, as it SHOULD block
// until the persist job is marked as complete.
let (tx, rx) = oneshot::channel();
let handle = tokio::spawn(graceful_shutdown_handler(
ready(()),
tx,
ingest_state,
vec![Arc::clone(&partition)],
Arc::clone(&persist),
Arc::clone(&wal),
));
// Wait a small duration of time for the first buffer emptiness check to
// fire.
tokio::time::sleep(Duration::from_millis(200)).await;
// Assert the shutdown hasn't completed.
//
// This is racy, but will fail false negative and will not flake in CI.
// If this fails in CI, it is a legitimate bug (shutdown task should not
// have stopped).
let rx = rx.shared();
assert_matches!(futures::poll!(rx.clone()), Poll::Pending);
// Mark the persist job as having completed, unblocking the shutdown
// task.
partition.lock().mark_persisted(persist_job);
// Wait for the shutdown to complete.
rx.with_timeout_panic(Duration::from_secs(5))
.await
.expect("shutdown task panicked");
assert!(handle
.with_timeout_panic(Duration::from_secs(1))
.await
.is_ok());
// Assert the data was not passed to the persist task (it couldn't have
// been, as this caller held the PersistData)
assert!(persist.calls().is_empty());
// Assert there are now no WAL segment files that will be replayed
assert!(wal.closed_segments().is_empty());
}
/// An implementation of [`PartitionIter`] that yields an extra new,
/// non-empty partition each time [`PartitionIter::partition_iter()`] is
/// called.
#[derive(Debug)]
struct SneakyPartitionBuffer {
max: usize,
partitions: Mutex<Vec<Arc<Mutex<PartitionData>>>>,
}
impl SneakyPartitionBuffer {
fn new(max: usize) -> Self {
Self {
max,
partitions: Default::default(),
}
}
fn partitions(&self) -> Vec<Arc<Mutex<PartitionData>>> {
self.partitions.lock().clone()
}
}
impl PartitionIter for SneakyPartitionBuffer {
fn partition_iter(&self) -> Box<dyn Iterator<Item = Arc<Mutex<PartitionData>>> + Send> {
let mut partitions = self.partitions.lock();
// If this hasn't reached the maximum number of times to be sneaky,
// add another partition.
if partitions.len() != self.max {
partitions.push(new_partition());
}
Box::new(partitions.clone().into_iter())
}
}
#[tokio::test]
async fn test_graceful_shutdown_concurrent_new_writes() {
let persist = Arc::new(MockPersistQueue::default());
let ingest_state = Arc::new(IngestState::default());
let (_tempdir, wal) = new_wal().await;
// Initialise a buffer that keeps yielding more and more newly wrote
// data, up until the maximum.
const MAX_NEW_PARTITIONS: usize = 3;
let buffer = Arc::new(SneakyPartitionBuffer::new(MAX_NEW_PARTITIONS));
// Start the graceful shutdown job in another thread, as it SHOULD block
// until the persist job is marked as complete.
let (tx, rx) = oneshot::channel();
let handle = tokio::spawn(graceful_shutdown_handler(
ready(()),
tx,
ingest_state,
Arc::clone(&buffer),
Arc::clone(&persist),
Arc::clone(&wal),
));
// Wait for the shutdown to complete.
rx.with_timeout_panic(Duration::from_secs(5))
.await
.expect("shutdown task panicked");
assert!(handle
.with_timeout_panic(Duration::from_secs(1))
.await
.is_ok());
// Assert all the data yielded by the sneaky buffer was passed to the
// persist task.
let persist_calls = persist.calls();
let must_have_persisted = |p: &Arc<Mutex<PartitionData>>| {
for call in &persist_calls {
if Arc::ptr_eq(call, p) {
return true;
}
}
false
};
if !buffer.partitions().iter().all(must_have_persisted) {
panic!("at least one sneaky buffer was not passed to the persist system");
}
// Assert there are now no WAL segment files that will be replayed
assert!(wal.closed_segments().is_empty());
}
}

View File

@ -130,7 +130,7 @@ where
);
// Persist all the data that was replayed from the WAL segment.
persist_partitions(sink.partition_iter(), persist.clone()).await;
persist_partitions(sink.partition_iter(), &persist).await;
// Drop the newly persisted data - it should not be replayed.
wal.delete(file.id())

View File

@ -18,3 +18,9 @@ where
(**self).partition_iter()
}
}
impl PartitionIter for Vec<Arc<Mutex<PartitionData>>> {
fn partition_iter(&self) -> Box<dyn Iterator<Item = Arc<Mutex<PartitionData>>> + Send> {
Box::new(self.clone().into_iter())
}
}

View File

@ -14,12 +14,12 @@ use super::queue::PersistQueue;
const PERSIST_ENQUEUE_CONCURRENCY: usize = 5;
// Persist a set of [`PartitionData`], blocking for completion of all enqueued
// persist jobs.
// persist jobs and returning the number of partitions that were persisted.
//
// This call is not atomic, partitions are marked for persistence incrementally.
// Writes that landed into the partition buffer after this call, but before the
// partition data is read will be included in the persisted data.
pub(crate) async fn persist_partitions<T, P>(iter: T, persist: P)
pub(crate) async fn persist_partitions<T, P>(iter: T, persist: &P) -> usize
where
T: Iterator<Item = Arc<Mutex<PartitionData>>> + Send,
P: PersistQueue + Clone,
@ -70,8 +70,12 @@ where
"queued all non-empty partitions for persist"
);
let count = notifications.len();
// Wait for all the persist completion notifications.
for n in notifications {
n.await.expect("persist worker task panic");
}
count
}

View File

@ -119,7 +119,7 @@ use crate::{
/// crate::ingest_state::IngestStateError::PersistSaturated
#[derive(Debug)]
pub(crate) struct PersistHandle {
/// THe state/dependencies shared across all worker tasks.
/// The state/dependencies shared across all worker tasks.
worker_state: Arc<SharedWorkerState>,
/// Task handles for the worker tasks, aborted on drop of all

View File

@ -68,6 +68,8 @@ pub(crate) async fn periodic_rotation<T, P>(
// special code path between "hot partition persist" and "wal rotation
// persist" - it all works the same way!
//
// https://github.com/influxdata/influxdb_iox/issues/6566
//
// TODO: this properly as described above.
tokio::time::sleep(Duration::from_secs(5)).await;
@ -86,7 +88,7 @@ pub(crate) async fn periodic_rotation<T, P>(
// - a small price to pay for not having to block ingest while the WAL
// is rotated, all outstanding writes + queries complete, and all then
// partitions are marked as persisting.
persist_partitions(buffer.partition_iter(), persist.clone()).await;
persist_partitions(buffer.partition_iter(), &persist).await;
debug!(
closed_id = %stats.id(),

View File

@ -45,10 +45,11 @@ impl<I: IngesterRpcInterface> IngesterServerType<I> {
metrics: Arc<Registry>,
common_state: &CommonServerState,
max_simultaneous_queries: usize,
shutdown: CancellationToken,
) -> Self {
Self {
server,
shutdown: CancellationToken::new(),
shutdown,
metrics,
trace_collector: common_state.trace_collector(),
max_simultaneous_queries,
@ -101,7 +102,7 @@ impl<I: IngesterRpcInterface + Sync + Send + Debug + 'static> ServerType for Ing
}
async fn join(self: Arc<Self>) {
self.shutdown.cancelled().await;
self.server.join().await;
}
fn shutdown(&self) {
@ -148,6 +149,8 @@ pub async fn create_ingester_server_type(
exec: Arc<Executor>,
object_store: ParquetStorage,
) -> Result<Arc<dyn ServerType>> {
let shutdown = CancellationToken::new();
let grpc = ingester2::new(
catalog,
Arc::clone(&metrics),
@ -159,6 +162,10 @@ pub async fn create_ingester_server_type(
ingester_config.persist_queue_depth,
ingester_config.persist_hot_partition_cost,
object_store,
{
let shutdown = shutdown.clone();
async move { shutdown.cancelled().await }
},
)
.await?;
@ -167,5 +174,6 @@ pub async fn create_ingester_server_type(
metrics,
common_state,
ingester_config.concurrent_query_limit,
shutdown,
)))
}