From 7e921e6a234feaa999b71a5f283a0a5a6bd3925a Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Wed, 11 Jan 2023 11:38:01 -0500 Subject: [PATCH] fix: Make recording num parquet files an explicit test step To support a case where someone calls WriteLineProtocol twice in a row to simulate two write requests. The test should be able to record this state before the two write requests and not twice. --- influxdb_iox/tests/end_to_end_cases/querier.rs | 4 ++++ test_helpers_end_to_end/src/steps.rs | 14 +++++++++++--- 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/influxdb_iox/tests/end_to_end_cases/querier.rs b/influxdb_iox/tests/end_to_end_cases/querier.rs index 4bdab22727..9dcb496f6c 100644 --- a/influxdb_iox/tests/end_to_end_cases/querier.rs +++ b/influxdb_iox/tests/end_to_end_cases/querier.rs @@ -901,6 +901,7 @@ mod kafkaless_rpc_write { StepTest::new( &mut cluster, vec![ + Step::RecordNumParquetFiles, Step::WriteLineProtocol(format!("{},tag1=A,tag2=B val=42i 123456", table_name)), // Wait for data to be persisted to parquet Step::WaitForPersisted2, @@ -945,6 +946,7 @@ mod kafkaless_rpc_write { StepTest::new( &mut cluster, vec![ + Step::RecordNumParquetFiles, Step::WriteLineProtocol(format!("{},tag1=A,tag2=B val=42i 123456", table_name)), Step::WaitForPersisted2, Step::Query { @@ -978,6 +980,7 @@ mod kafkaless_rpc_write { let mut cluster = MiniCluster::create_shared2(database_url).await; let steps = vec![ + Step::RecordNumParquetFiles, Step::WriteLineProtocol(format!("{},tag1=A,tag2=B val=42i 123456", table_name)), // Wait for data to be persisted to parquet Step::WaitForPersisted2, @@ -1002,6 +1005,7 @@ mod kafkaless_rpc_write { "+------+------+--------------------------------+-----+", ], }, + Step::RecordNumParquetFiles, // write another parquet file that has non duplicated data Step::WriteLineProtocol(format!("{},tag1=B,tag2=A val=43i 789101112", table_name)), // Wait for data to be persisted to parquet diff --git a/test_helpers_end_to_end/src/steps.rs b/test_helpers_end_to_end/src/steps.rs index 756ce693e8..2dd0ba6b52 100644 --- a/test_helpers_end_to_end/src/steps.rs +++ b/test_helpers_end_to_end/src/steps.rs @@ -143,6 +143,12 @@ pub enum Step { /// Wait for all previously written data to be persisted WaitForPersisted, + /// Ask the catalog service how many Parquet files it has for this cluster's namespace. Do this + /// before a write where you're interested in when the write has been persisted to Parquet; + /// then after the write use `WaitForPersisted2` to observe the change in the number of Parquet + /// files from the value this step recorded. + RecordNumParquetFiles, + /// Wait for all previously written data to be persisted by observing an increase in the number /// of Parquet files in the catalog for this cluster's namespace. Needed for /// router2/ingester2/querier2. @@ -235,9 +241,6 @@ impl<'a> StepTest<'a> { "====Begin writing line protocol to v2 HTTP API:\n{}", line_protocol ); - // Get the current number of Parquet files in the cluster's namespace before - // starting a new write so we can observe a change when waiting for persistence. - state.record_num_parquet_files().await; let response = state.cluster.write_to_router(line_protocol).await; assert_eq!(response.status(), StatusCode::NO_CONTENT); let write_token = get_write_token(&response); @@ -262,6 +265,11 @@ impl<'a> StepTest<'a> { } info!("====Done waiting for all write tokens to be persisted"); } + // Get the current number of Parquet files in the cluster's namespace before + // starting a new write so we can observe a change when waiting for persistence. + Step::RecordNumParquetFiles => { + state.record_num_parquet_files().await; + } Step::WaitForPersisted2 => { info!("====Begin waiting for a change in the number of Parquet files"); state.wait_for_num_parquet_file_change().await;