test(ingester): graceful shutdown
Adds an ingester e2e test asserting the graceful shutdown behaviour; specifically that:
* Buffered data is uploaded to object storage
* WAL files are cleaned up and empty (no replay on startup)
parent a385bb2ab2
commit eb72fbf3d3
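In outline, the test verifies this by stopping the ingester and then re-opening its WAL directory. A condensed, illustrative sketch of that check follows, using the `TestContext` helpers and `wal` crate calls added in the diff below; `ctx` and `wal_dir` refer to the test's context and WAL directory, and this is not the full test:

// Trigger graceful shutdown and block until the ingester has stopped.
ctx.shutdown().await;

// Re-open the WAL: a single closed segment should remain, containing no
// operations, so a restart would have nothing to replay.
let wal = wal::Wal::new(wal_dir.path()).await.expect("failed to reopen WAL");
let segments = wal.closed_segments();
assert_eq!(segments.len(), 1);
let mut reader = wal
    .reader_for_segment(segments[0].id())
    .expect("failed to open segment");
assert!(reader
    .next_batch()
    .expect("failed to read segment")
    .is_none());
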
@@ -23,8 +23,10 @@ use observability_deps::tracing::*;
use parquet_file::storage::ParquetStorage;
use std::{collections::HashMap, path::PathBuf, sync::Arc, time::Duration};
use tempfile::TempDir;
use test_helpers::timeout::FutureTimeout;
use tokio::sync::oneshot;
use tokio_util::sync::CancellationToken;
use tonic::Request;

pub const TEST_TOPIC_NAME: &str = "banana-topics";

@@ -365,6 +367,32 @@ where
        Ok(record_batches)
    }

    /// Request `namespace` be persisted and block for its completion.
    pub async fn persist(&self, namespace: impl Into<String>) {
        use generated_types::influxdata::iox::ingester::v1::{
            self as proto, persist_service_server::PersistService,
        };

        let namespace = namespace.into();
        self.ingester
            .rpc()
            .persist_service()
            .persist(Request::new(proto::PersistRequest { namespace }))
            .await
            .expect("failed to invoke persist");
    }

    /// Gracefully stop the ingester, blocking until completion.
    pub async fn shutdown(self) {
        self.shutdown_tx
            .send(CancellationToken::new())
            .expect("shutdown channel dead");
        self.ingester
            .join()
            .with_timeout_panic(Duration::from_secs(10))
            .await;
    }

    /// Retrieve the specified metric value.
    pub fn get_metric<U, A>(&self, name: &'static str, attrs: A) -> U::Recorder
    where

@@ -1,6 +1,7 @@
mod common;

use arrow_util::assert_batches_sorted_eq;
use assert_matches::assert_matches;
use common::*;
use data_types::PartitionKey;
use influxdb_iox_client::flight::generated_types::IngesterQueryRequest;

@@ -181,3 +182,110 @@ async fn wal_replay() {
    actual_table_ids.sort();
    assert_eq!(actual_table_ids, expected_table_ids);
}

// Ensure that data applied to an ingester is persisted at shutdown, and the WAL
// files are cleared.
#[tokio::test]
async fn graceful_shutdown() {
    let wal_dir = Arc::new(test_helpers::tmp_dir().unwrap());
    let metrics: Arc<metric::Registry> = Default::default();
    let catalog: Arc<dyn Catalog> =
        Arc::new(iox_catalog::mem::MemCatalog::new(Arc::clone(&metrics)));
    let namespace_name = "wal_replay_test_namespace";

    let mut ctx = test_context()
        .wal_dir(Arc::clone(&wal_dir))
        .catalog(Arc::clone(&catalog))
        .build()
        .await;

    let ns = ctx.ensure_namespace(namespace_name, None).await;
    let namespace_id = ns.id;

    // Initial write
    let partition_key = PartitionKey::from("1970-01-01");
    ctx.write_lp(
        namespace_name,
        "bananas greatness=\"unbounded\" 10",
        partition_key.clone(),
        0,
    )
    .await;

    // Persist the data
    ctx.persist(namespace_name).await;

    // A subsequent write with a non-contiguous sequence number to a different table.
    ctx.write_lp(
        namespace_name,
        "cpu bar=2 20\ncpu bar=3 30",
        partition_key.clone(),
        7,
    )
    .await;

    // And a third write that appends more data to the table in the initial
    // write.
    ctx.write_lp(
        namespace_name,
        "bananas count=42 200",
        partition_key.clone(),
        42,
    )
    .await;

    // Perform a query to validate the actual data buffered.
    let data: Vec<_> = ctx
        .query(IngesterQueryRequest {
            namespace_id: ns.id.get(),
            table_id: ctx.table_id(namespace_name, "bananas").await.get(),
            columns: vec![],
            predicate: None,
        })
        .await
        .expect("query request failed");

    let expected = vec![
        "+-------+--------------------------------+",
        "| count | time                           |",
        "+-------+--------------------------------+",
        "| 42.0  | 1970-01-01T00:00:00.000000200Z |",
        "+-------+--------------------------------+",
    ];
    assert_batches_sorted_eq!(&expected, &data);

    // Gracefully stop the ingester.
    ctx.shutdown().await;

    // Inspect the WAL files.
    //
    // There should be one WAL file, containing no operations.
    let wal = wal::Wal::new(wal_dir.path())
        .await
        .expect("failed to reinitialise WAL");

    let wal_files = wal.closed_segments();
    assert_eq!(wal_files.len(), 1);

    let mut reader = wal
        .reader_for_segment(wal_files[0].id())
        .expect("failed to open wal segment");

    // Assert the file contains no operations
    assert_matches!(
        reader
            .next_batch()
            .expect("failed to read wal segment contents"),
        None
    );

    // Validate the parquet files were added to the catalog during shutdown.
    let parquet_files = catalog
        .repositories()
        .await
        .parquet_files()
        .list_by_namespace_not_to_delete(namespace_id)
        .await
        .unwrap();
    assert_eq!(parquet_files.len(), 3);
}