From eb72fbf3d379cebaca79e643521360f4a7401aa4 Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Tue, 21 Mar 2023 14:21:56 +0100 Subject: [PATCH] test(ingester): graceful shutdown Adds a ingester e2e test asserting the graceful shutdown behaviour; specifically that: * Buffered data is uploaded to object storage * WAL files are cleaned up and empty (no replay on startup) --- ingester2/tests/common/mod.rs | 28 +++++++++ ingester2/tests/write.rs | 108 ++++++++++++++++++++++++++++++++++ 2 files changed, 136 insertions(+) diff --git a/ingester2/tests/common/mod.rs b/ingester2/tests/common/mod.rs index 4515c241a9..e1017fb309 100644 --- a/ingester2/tests/common/mod.rs +++ b/ingester2/tests/common/mod.rs @@ -23,8 +23,10 @@ use observability_deps::tracing::*; use parquet_file::storage::ParquetStorage; use std::{collections::HashMap, path::PathBuf, sync::Arc, time::Duration}; use tempfile::TempDir; +use test_helpers::timeout::FutureTimeout; use tokio::sync::oneshot; use tokio_util::sync::CancellationToken; +use tonic::Request; pub const TEST_TOPIC_NAME: &str = "banana-topics"; @@ -365,6 +367,32 @@ where Ok(record_batches) } + /// Request `namespace` be persisted and block for its completion. + pub async fn persist(&self, namespace: impl Into) { + use generated_types::influxdata::iox::ingester::v1::{ + self as proto, persist_service_server::PersistService, + }; + + let namespace = namespace.into(); + self.ingester + .rpc() + .persist_service() + .persist(Request::new(proto::PersistRequest { namespace })) + .await + .expect("failed to invoke persist"); + } + + /// Gracefully stop the ingester, blocking until completion. + pub async fn shutdown(self) { + self.shutdown_tx + .send(CancellationToken::new()) + .expect("shutdown channel dead"); + self.ingester + .join() + .with_timeout_panic(Duration::from_secs(10)) + .await; + } + /// Retrieve the specified metric value. pub fn get_metric(&self, name: &'static str, attrs: A) -> U::Recorder where diff --git a/ingester2/tests/write.rs b/ingester2/tests/write.rs index be8c2937bd..c87723d4ab 100644 --- a/ingester2/tests/write.rs +++ b/ingester2/tests/write.rs @@ -1,6 +1,7 @@ mod common; use arrow_util::assert_batches_sorted_eq; +use assert_matches::assert_matches; use common::*; use data_types::PartitionKey; use influxdb_iox_client::flight::generated_types::IngesterQueryRequest; @@ -181,3 +182,110 @@ async fn wal_replay() { actual_table_ids.sort(); assert_eq!(actual_table_ids, expected_table_ids); } + +// Ensure that data applied to an ingester is persisted at shutdown, and the WAL +// files are cleared. +#[tokio::test] +async fn graceful_shutdown() { + let wal_dir = Arc::new(test_helpers::tmp_dir().unwrap()); + let metrics: Arc = Default::default(); + let catalog: Arc = + Arc::new(iox_catalog::mem::MemCatalog::new(Arc::clone(&metrics))); + let namespace_name = "wal_replay_test_namespace"; + + let mut ctx = test_context() + .wal_dir(Arc::clone(&wal_dir)) + .catalog(Arc::clone(&catalog)) + .build() + .await; + + let ns = ctx.ensure_namespace(namespace_name, None).await; + let namespace_id = ns.id; + + // Initial write + let partition_key = PartitionKey::from("1970-01-01"); + ctx.write_lp( + namespace_name, + "bananas greatness=\"unbounded\" 10", + partition_key.clone(), + 0, + ) + .await; + + // Persist the data + ctx.persist(namespace_name).await; + + // A subsequent write with a non-contiguous sequence number to a different table. + ctx.write_lp( + namespace_name, + "cpu bar=2 20\ncpu bar=3 30", + partition_key.clone(), + 7, + ) + .await; + + // And a third write that appends more data to the table in the initial + // write. + ctx.write_lp( + namespace_name, + "bananas count=42 200", + partition_key.clone(), + 42, + ) + .await; + + // Perform a query to validate the actual data buffered. + let data: Vec<_> = ctx + .query(IngesterQueryRequest { + namespace_id: ns.id.get(), + table_id: ctx.table_id(namespace_name, "bananas").await.get(), + columns: vec![], + predicate: None, + }) + .await + .expect("query request failed"); + + let expected = vec![ + "+-------+--------------------------------+", + "| count | time |", + "+-------+--------------------------------+", + "| 42.0 | 1970-01-01T00:00:00.000000200Z |", + "+-------+--------------------------------+", + ]; + assert_batches_sorted_eq!(&expected, &data); + + // Gracefully stop the ingester. + ctx.shutdown().await; + + // Inspect the WAL files. + // + // There should be one WAL file, containing no operations. + let wal = wal::Wal::new(wal_dir.path()) + .await + .expect("failed to reinitialise WAL"); + + let wal_files = wal.closed_segments(); + assert_eq!(wal_files.len(), 1); + + let mut reader = wal + .reader_for_segment(wal_files[0].id()) + .expect("failed to open wal segment"); + + // Assert the file contains no operations + assert_matches!( + reader + .next_batch() + .expect("failed to read wal segment contents"), + None + ); + + // Validate the parquet files were added to the catalog during shutdown. + let parquet_files = catalog + .repositories() + .await + .parquet_files() + .list_by_namespace_not_to_delete(namespace_id) + .await + .unwrap(); + assert_eq!(parquet_files.len(), 3); +}