diff --git a/Cargo.lock b/Cargo.lock index 10be9c3c5f..617cc2d172 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2616,6 +2616,7 @@ dependencies = [ "iox_catalog", "iox_query", "iox_time", + "itertools 0.11.0", "lazy_static", "metric", "mutable_batch", diff --git a/ingester/Cargo.toml b/ingester/Cargo.toml index 9e0b2a0b27..00c1c157be 100644 --- a/ingester/Cargo.toml +++ b/ingester/Cargo.toml @@ -54,6 +54,7 @@ criterion = { version = "0.5", default-features = false, features = ["async_toki datafusion_util = { path = "../datafusion_util" } influxdb_iox_client = { path = "../influxdb_iox_client" } ingester_test_ctx = { path = "../ingester_test_ctx" } +itertools = "0.11" lazy_static = "1.4.0" mutable_batch_lp = { path = "../mutable_batch_lp" } object_store = { workspace = true } diff --git a/ingester/src/init/graceful_shutdown.rs b/ingester/src/init/graceful_shutdown.rs index 8e4a57a418..ed08395cd0 100644 --- a/ingester/src/init/graceful_shutdown.rs +++ b/ingester/src/init/graceful_shutdown.rs @@ -100,9 +100,6 @@ pub(super) async fn graceful_shutdown_handler( // Therefore there are no ops that need replaying to rebuild the (now empty) // buffer state, therefore all WAL segments can be deleted to prevent // spurious replay and re-uploading of the same data. - // - // TODO(savage): This deletion should be redundant due to the persist-driven - // WAL dropping. wal.rotate().expect("failed to rotate wal"); for file in wal.closed_segments() { if let Err(error) = wal.delete(file.id()).await { diff --git a/ingester/src/lib.rs b/ingester/src/lib.rs index e9705a1d78..8ac2233e96 100644 --- a/ingester/src/lib.rs +++ b/ingester/src/lib.rs @@ -208,6 +208,8 @@ use criterion as _; use influxdb_iox_client as _; #[cfg(test)] use ingester_test_ctx as _; +#[cfg(test)] +use itertools as _; use workspace_hack as _; /// Ingester initialisation methods & types. diff --git a/ingester/tests/write.rs b/ingester/tests/write.rs index 05de79f0e5..3268261715 100644 --- a/ingester/tests/write.rs +++ b/ingester/tests/write.rs @@ -4,11 +4,13 @@ use data_types::{PartitionKey, TableId, Timestamp}; use ingester_query_grpc::influxdata::iox::ingester::v1::IngesterQueryRequest; use ingester_test_ctx::{TestContextBuilder, DEFAULT_MAX_PERSIST_QUEUE_DEPTH}; use iox_catalog::interface::Catalog; +use itertools::Itertools; use metric::{ assert_counter, assert_histogram, DurationHistogram, U64Counter, U64Gauge, U64Histogram, }; use parquet_file::ParquetFilePath; -use std::{sync::Arc, time::Duration}; +use std::{ffi::OsString, fs::read_dir, path::Path, sync::Arc, time::Duration}; +use test_helpers::timeout::FutureTimeout; // Write data to an ingester through the RPC interface and query the data, validating the contents. #[tokio::test] @@ -440,3 +442,119 @@ async fn graceful_shutdown() { .unwrap(); assert_eq!(parquet_files.len(), 3); } + +#[tokio::test] +async fn wal_reference_dropping() { + let wal_dir = Arc::new(test_helpers::tmp_dir().unwrap()); + let metrics = Arc::new(metric::Registry::default()); + let catalog: Arc = + Arc::new(iox_catalog::mem::MemCatalog::new(Arc::clone(&metrics))); + + // Test-local namespace name + const TEST_NAMESPACE_NAME: &str = "wal_reference_dropping_test_namespace"; + // Create an ingester using a fairly low write-ahead log rotation interval + const WAL_ROTATION_PERIOD: Duration = Duration::from_secs(15); + + // Create an ingester with a low + let mut ctx = TestContextBuilder::default() + .with_wal_dir(Arc::clone(&wal_dir)) + .with_catalog(Arc::clone(&catalog)) + .with_wal_rotation_period(WAL_ROTATION_PERIOD) + .build() + .await; + + let ns = ctx.ensure_namespace(TEST_NAMESPACE_NAME, None).await; + + // Initial write + let partition_key = PartitionKey::from("1970-01-01"); + ctx.write_lp( + TEST_NAMESPACE_NAME, + "bananas greatness=\"unbounded\" 10", + partition_key.clone(), + 0, + ) + .await; + + // A subsequent write with a non-contiguous sequence number to a different table. + ctx.write_lp( + TEST_NAMESPACE_NAME, + "cpu bar=2 20\ncpu bar=3 30", + partition_key.clone(), + 7, + ) + .await; + + // And a third write that appends more data to the table in the initial + // write. + ctx.write_lp( + TEST_NAMESPACE_NAME, + "bananas count=42 200", + partition_key.clone(), + 42, + ) + .await; + + // Perform a query to validate the actual data buffered. + let data: Vec<_> = ctx + .query(IngesterQueryRequest { + namespace_id: ns.id.get(), + table_id: ctx.table_id(TEST_NAMESPACE_NAME, "bananas").await.get(), + columns: vec![], + predicate: None, + }) + .await + .expect("query request failed"); + + let expected = vec![ + "+-------+-----------+--------------------------------+", + "| count | greatness | time |", + "+-------+-----------+--------------------------------+", + "| | unbounded | 1970-01-01T00:00:00.000000010Z |", + "| 42.0 | | 1970-01-01T00:00:00.000000200Z |", + "+-------+-----------+--------------------------------+", + ]; + assert_batches_sorted_eq!(&expected, &data); + + let initial_segment_names = + get_file_names_in_dir(wal_dir.path()).expect("should be able to get file names"); + assert_eq!(initial_segment_names.len(), 1); // Ensure a single (open) segment is present + + tokio::time::pause(); + tokio::time::advance(WAL_ROTATION_PERIOD).await; + tokio::time::resume(); + + // Wait for the rotation to result in the initial segment no longer being present in + // the write-ahead log directory + async { + loop { + let segments = + get_file_names_in_dir(wal_dir.path()).expect("should be able to get file names"); + if !segments + .iter() + .any(|name| initial_segment_names.contains(name)) + { + break; + } + tokio::task::yield_now().await; + } + } + .with_timeout_panic(Duration::from_secs(5)) + .await; + + let final_segment_names = + get_file_names_in_dir(wal_dir.path()).expect("should be able to get file names"); + assert_eq!(final_segment_names.len(), 1); // Ensure a single (open) segment is present after the old one has been dropped +} + +fn get_file_names_in_dir(dir: &Path) -> Result, std::io::Error> { + read_dir(dir)? + .filter_map_ok(|f| { + if let Ok(file_type) = f.file_type() { + if file_type.is_file() { + return Some(f.file_name()); + } + } + None + }) + .collect::, std::io::Error>>() +} diff --git a/ingester_test_ctx/src/lib.rs b/ingester_test_ctx/src/lib.rs index ceeca0df00..e62d011e2a 100644 --- a/ingester_test_ctx/src/lib.rs +++ b/ingester_test_ctx/src/lib.rs @@ -56,7 +56,11 @@ pub const DEFAULT_MAX_PERSIST_QUEUE_DEPTH: usize = 5; /// The default partition hot persist cost - configurable with /// [`TestContextBuilder::with_persist_hot_partition_cost()`]. pub const DEFAULT_PERSIST_HOT_PARTITION_COST: usize = 20_000_000; - +/// The default write-ahead log rotation period - configurable with +/// [`TestContextBuilder::with_wal_rotation_period()`]. +/// This value is high to effectively stop the test ingester from +/// performing WAL rotations and the associated time-based persistence. +pub const DEFAULT_WAL_ROTATION_PERIOD: Duration = Duration::from_secs(1_000_000); /// Construct a new [`TestContextBuilder`] to make a [`TestContext`] for an [`ingester`] instance. pub fn test_context() -> TestContextBuilder { TestContextBuilder::default() @@ -70,6 +74,7 @@ pub struct TestContextBuilder { max_persist_queue_depth: usize, persist_hot_partition_cost: usize, + wal_rotation_period: Duration, } impl Default for TestContextBuilder { @@ -79,6 +84,7 @@ impl Default for TestContextBuilder { catalog: None, max_persist_queue_depth: DEFAULT_MAX_PERSIST_QUEUE_DEPTH, persist_hot_partition_cost: DEFAULT_PERSIST_HOT_PARTITION_COST, + wal_rotation_period: DEFAULT_WAL_ROTATION_PERIOD, } } } @@ -113,6 +119,14 @@ impl TestContextBuilder { self } + /// Configure the ingester to rotate the write-ahead log at the regular + /// interval specified by [`Duration`]. Defaults to + /// [`DEFAULT_WAL_ROTATION_PERIOD`]. + pub fn with_wal_rotation_period(mut self, period: Duration) -> Self { + self.wal_rotation_period = period; + self + } + /// Initialise the [`ingester`] instance and return a [`TestContext`] for it. pub async fn build(self) -> TestContext { let Self { @@ -120,6 +134,7 @@ impl TestContextBuilder { catalog, max_persist_queue_depth, persist_hot_partition_cost, + wal_rotation_period, } = self; test_helpers::maybe_start_logging(); @@ -135,9 +150,6 @@ impl TestContextBuilder { let storage = ParquetStorage::new(object_store, parquet_file::storage::StorageId::from("iox")); - // Settings so that the ingester will effectively never persist by itself, only on demand - let wal_rotation_period = Duration::from_secs(1_000_000); - let persist_background_fetch_time = Duration::from_secs(10); let persist_executor = Arc::new(iox_query::exec::Executor::new_testing()); let persist_workers = 5;