test(ingester): Integration test dropping of persisted WAL segments
This adds an integration test that writes some data to the ingester, waits for the WAL to be rotated and then ensures that the segment file has been dropped.
pull/24376/head
parent 15b22728cc
commit bea9dbf7ab
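The test added below cannot wait out the 15-second rotation interval in real time, so it pauses and advances tokio's mock clock and then polls the WAL directory under a bounded timeout. The following is a minimal, self-contained sketch of that pattern only; the test name, ticker task, counter, and durations are illustrative stand-ins rather than ingester code, and it assumes tokio's "rt", "macros", "time" and "test-util" features are enabled.

// Minimal sketch of the advance-the-clock-then-poll pattern used by the new test.
use std::sync::{
    atomic::{AtomicUsize, Ordering},
    Arc,
};
use std::time::Duration;

#[tokio::test] // current-thread runtime by default, which tokio::time::pause() requires
async fn observes_a_time_driven_rotation() {
    let rotations = Arc::new(AtomicUsize::new(0));
    let task_rotations = Arc::clone(&rotations);

    // Stand-in for a periodic, time-driven WAL rotation task.
    tokio::spawn(async move {
        let mut ticker = tokio::time::interval(Duration::from_secs(15));
        ticker.tick().await; // the first tick completes immediately
        loop {
            ticker.tick().await;
            task_rotations.fetch_add(1, Ordering::SeqCst);
        }
    });
    // Give the spawned task a moment to start and register its timer.
    tokio::time::sleep(Duration::from_millis(10)).await;

    // Jump the mock clock a little past the rotation period instead of sleeping for real.
    tokio::time::pause();
    tokio::time::advance(Duration::from_secs(16)).await;
    tokio::time::resume();

    // Poll for the side effect under a bounded timeout rather than asserting immediately.
    tokio::time::timeout(Duration::from_secs(5), async {
        while rotations.load(Ordering::SeqCst) == 0 {
            tokio::task::yield_now().await;
        }
    })
    .await
    .expect("rotation was not observed within the timeout");
}

The test in this commit applies the same idea: it advances the clock by WAL_ROTATION_PERIOD and then loops on get_file_names_in_dir(), guarded by with_timeout_panic(), until the initial segment name disappears.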
@@ -2616,6 +2616,7 @@ dependencies = [
 "iox_catalog",
 "iox_query",
 "iox_time",
 "itertools 0.11.0",
 "lazy_static",
 "metric",
 "mutable_batch",
@@ -54,6 +54,7 @@ criterion = { version = "0.5", default-features = false, features = ["async_toki
datafusion_util = { path = "../datafusion_util" }
influxdb_iox_client = { path = "../influxdb_iox_client" }
ingester_test_ctx = { path = "../ingester_test_ctx" }
itertools = "0.11"
lazy_static = "1.4.0"
mutable_batch_lp = { path = "../mutable_batch_lp" }
object_store = { workspace = true }
@@ -100,9 +100,6 @@ pub(super) async fn graceful_shutdown_handler<F, T, P>(
    // Therefore there are no ops that need replaying to rebuild the (now empty)
    // buffer state, therefore all WAL segments can be deleted to prevent
    // spurious replay and re-uploading of the same data.
    //
    // TODO(savage): This deletion should be redundant due to the persist-driven
    // WAL dropping.
    wal.rotate().expect("failed to rotate wal");
    for file in wal.closed_segments() {
        if let Err(error) = wal.delete(file.id()).await {
@@ -208,6 +208,8 @@ use criterion as _;
use influxdb_iox_client as _;
#[cfg(test)]
use ingester_test_ctx as _;
#[cfg(test)]
use itertools as _;
use workspace_hack as _;

/// Ingester initialisation methods & types.
@@ -4,11 +4,13 @@ use data_types::{PartitionKey, TableId, Timestamp};
use ingester_query_grpc::influxdata::iox::ingester::v1::IngesterQueryRequest;
use ingester_test_ctx::{TestContextBuilder, DEFAULT_MAX_PERSIST_QUEUE_DEPTH};
use iox_catalog::interface::Catalog;
use itertools::Itertools;
use metric::{
    assert_counter, assert_histogram, DurationHistogram, U64Counter, U64Gauge, U64Histogram,
};
use parquet_file::ParquetFilePath;
use std::{sync::Arc, time::Duration};
use std::{ffi::OsString, fs::read_dir, path::Path, sync::Arc, time::Duration};
use test_helpers::timeout::FutureTimeout;

// Write data to an ingester through the RPC interface and query the data, validating the contents.
#[tokio::test]
@@ -440,3 +442,119 @@ async fn graceful_shutdown() {
        .unwrap();
    assert_eq!(parquet_files.len(), 3);
}

#[tokio::test]
async fn wal_reference_dropping() {
    let wal_dir = Arc::new(test_helpers::tmp_dir().unwrap());
    let metrics = Arc::new(metric::Registry::default());
    let catalog: Arc<dyn Catalog> =
        Arc::new(iox_catalog::mem::MemCatalog::new(Arc::clone(&metrics)));

    // Test-local namespace name
    const TEST_NAMESPACE_NAME: &str = "wal_reference_dropping_test_namespace";
    // Create an ingester using a fairly low write-ahead log rotation interval
    const WAL_ROTATION_PERIOD: Duration = Duration::from_secs(15);

    // Create an ingester with a low WAL rotation period.
    let mut ctx = TestContextBuilder::default()
        .with_wal_dir(Arc::clone(&wal_dir))
        .with_catalog(Arc::clone(&catalog))
        .with_wal_rotation_period(WAL_ROTATION_PERIOD)
        .build()
        .await;

    let ns = ctx.ensure_namespace(TEST_NAMESPACE_NAME, None).await;

    // Initial write
    let partition_key = PartitionKey::from("1970-01-01");
    ctx.write_lp(
        TEST_NAMESPACE_NAME,
        "bananas greatness=\"unbounded\" 10",
        partition_key.clone(),
        0,
    )
    .await;

    // A subsequent write with a non-contiguous sequence number to a different table.
    ctx.write_lp(
        TEST_NAMESPACE_NAME,
        "cpu bar=2 20\ncpu bar=3 30",
        partition_key.clone(),
        7,
    )
    .await;

    // And a third write that appends more data to the table in the initial
    // write.
    ctx.write_lp(
        TEST_NAMESPACE_NAME,
        "bananas count=42 200",
        partition_key.clone(),
        42,
    )
    .await;

    // Perform a query to validate the actual data buffered.
    let data: Vec<_> = ctx
        .query(IngesterQueryRequest {
            namespace_id: ns.id.get(),
            table_id: ctx.table_id(TEST_NAMESPACE_NAME, "bananas").await.get(),
            columns: vec![],
            predicate: None,
        })
        .await
        .expect("query request failed");

    let expected = vec![
        "+-------+-----------+--------------------------------+",
        "| count | greatness | time                           |",
        "+-------+-----------+--------------------------------+",
        "|       | unbounded | 1970-01-01T00:00:00.000000010Z |",
        "| 42.0  |           | 1970-01-01T00:00:00.000000200Z |",
        "+-------+-----------+--------------------------------+",
    ];
    assert_batches_sorted_eq!(&expected, &data);

    let initial_segment_names =
        get_file_names_in_dir(wal_dir.path()).expect("should be able to get file names");
    assert_eq!(initial_segment_names.len(), 1); // Ensure a single (open) segment is present
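
    // Advance the (paused) mock clock past the rotation period so the time-based
    // WAL rotation fires without waiting in real time.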
    tokio::time::pause();
    tokio::time::advance(WAL_ROTATION_PERIOD).await;
    tokio::time::resume();

    // Wait for the rotation to result in the initial segment no longer being present in
    // the write-ahead log directory
    async {
        loop {
            let segments =
                get_file_names_in_dir(wal_dir.path()).expect("should be able to get file names");
            if !segments
                .iter()
                .any(|name| initial_segment_names.contains(name))
            {
                break;
            }
            tokio::task::yield_now().await;
        }
    }
    .with_timeout_panic(Duration::from_secs(5))
    .await;

    let final_segment_names =
        get_file_names_in_dir(wal_dir.path()).expect("should be able to get file names");
    assert_eq!(final_segment_names.len(), 1); // Ensure a single (open) segment is present after the old one has been dropped
}
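
// Collect the names of all regular files within `dir`, skipping subdirectories
// and entries whose type cannot be determined.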
fn get_file_names_in_dir(dir: &Path) -> Result<Vec<OsString>, std::io::Error> {
    read_dir(dir)?
        .filter_map_ok(|f| {
            if let Ok(file_type) = f.file_type() {
                if file_type.is_file() {
                    return Some(f.file_name());
                }
            }
            None
        })
        .collect::<Result<Vec<_>, std::io::Error>>()
}
@@ -56,7 +56,11 @@ pub const DEFAULT_MAX_PERSIST_QUEUE_DEPTH: usize = 5;
/// The default partition hot persist cost - configurable with
/// [`TestContextBuilder::with_persist_hot_partition_cost()`].
pub const DEFAULT_PERSIST_HOT_PARTITION_COST: usize = 20_000_000;

/// The default write-ahead log rotation period - configurable with
/// [`TestContextBuilder::with_wal_rotation_period()`].
/// This value is high to effectively stop the test ingester from
/// performing WAL rotations and the associated time-based persistence.
pub const DEFAULT_WAL_ROTATION_PERIOD: Duration = Duration::from_secs(1_000_000);
/// Construct a new [`TestContextBuilder`] to make a [`TestContext`] for an [`ingester`] instance.
pub fn test_context() -> TestContextBuilder {
    TestContextBuilder::default()
@@ -70,6 +74,7 @@ pub struct TestContextBuilder {

    max_persist_queue_depth: usize,
    persist_hot_partition_cost: usize,
    wal_rotation_period: Duration,
}

impl Default for TestContextBuilder {
@@ -79,6 +84,7 @@ impl Default for TestContextBuilder {
            catalog: None,
            max_persist_queue_depth: DEFAULT_MAX_PERSIST_QUEUE_DEPTH,
            persist_hot_partition_cost: DEFAULT_PERSIST_HOT_PARTITION_COST,
            wal_rotation_period: DEFAULT_WAL_ROTATION_PERIOD,
        }
    }
}
@@ -113,6 +119,14 @@ impl TestContextBuilder {
        self
    }

    /// Configure the ingester to rotate the write-ahead log at the regular
    /// interval specified by [`Duration`]. Defaults to
    /// [`DEFAULT_WAL_ROTATION_PERIOD`].
    pub fn with_wal_rotation_period(mut self, period: Duration) -> Self {
        self.wal_rotation_period = period;
        self
    }

    /// Initialise the [`ingester`] instance and return a [`TestContext`] for it.
    pub async fn build(self) -> TestContext<impl IngesterRpcInterface> {
        let Self {
@@ -120,6 +134,7 @@ impl TestContextBuilder {
            catalog,
            max_persist_queue_depth,
            persist_hot_partition_cost,
            wal_rotation_period,
        } = self;

        test_helpers::maybe_start_logging();
@@ -135,9 +150,6 @@ impl TestContextBuilder {
        let storage =
            ParquetStorage::new(object_store, parquet_file::storage::StorageId::from("iox"));

        // Settings so that the ingester will effectively never persist by itself, only on demand
        let wal_rotation_period = Duration::from_secs(1_000_000);

        let persist_background_fetch_time = Duration::from_secs(10);
        let persist_executor = Arc::new(iox_query::exec::Executor::new_testing());
        let persist_workers = 5;