diff --git a/influxdb_iox/tests/end_to_end_cases/querier.rs b/influxdb_iox/tests/end_to_end_cases/querier.rs index 9dcb496f6c..6b337a1fd5 100644 --- a/influxdb_iox/tests/end_to_end_cases/querier.rs +++ b/influxdb_iox/tests/end_to_end_cases/querier.rs @@ -904,7 +904,9 @@ mod kafkaless_rpc_write { Step::RecordNumParquetFiles, Step::WriteLineProtocol(format!("{},tag1=A,tag2=B val=42i 123456", table_name)), // Wait for data to be persisted to parquet - Step::WaitForPersisted2, + Step::WaitForPersisted2 { + expected_increase: 1, + }, Step::Query { sql: format!("select * from {}", table_name), expected: vec![ @@ -948,7 +950,9 @@ mod kafkaless_rpc_write { vec![ Step::RecordNumParquetFiles, Step::WriteLineProtocol(format!("{},tag1=A,tag2=B val=42i 123456", table_name)), - Step::WaitForPersisted2, + Step::WaitForPersisted2 { + expected_increase: 1, + }, Step::Query { sql: format!("select * from {}", table_name), expected: vec![ @@ -983,7 +987,9 @@ mod kafkaless_rpc_write { Step::RecordNumParquetFiles, Step::WriteLineProtocol(format!("{},tag1=A,tag2=B val=42i 123456", table_name)), // Wait for data to be persisted to parquet - Step::WaitForPersisted2, + Step::WaitForPersisted2 { + expected_increase: 1, + }, Step::Query { sql: format!("select * from {}", table_name), expected: vec![ @@ -1009,7 +1015,9 @@ mod kafkaless_rpc_write { // write another parquet file that has non duplicated data Step::WriteLineProtocol(format!("{},tag1=B,tag2=A val=43i 789101112", table_name)), // Wait for data to be persisted to parquet - Step::WaitForPersisted2, + Step::WaitForPersisted2 { + expected_increase: 1, + }, // query should correctly see the data in the second parquet file Step::Query { sql: format!("select * from {}", table_name), diff --git a/test_helpers_end_to_end/src/steps.rs b/test_helpers_end_to_end/src/steps.rs index 2dd0ba6b52..c567dc3916 100644 --- a/test_helpers_end_to_end/src/steps.rs +++ b/test_helpers_end_to_end/src/steps.rs @@ -68,19 +68,31 @@ impl<'a> StepTestState<'a> { /// Wait for a change (up to a timeout) in the number of Parquet files the catalog has for the /// mini cluster's namespacee since the last time the number of Parquet files was recorded, /// which indicates persistence has taken place. - pub async fn wait_for_num_parquet_file_change(&mut self) { + pub async fn wait_for_num_parquet_file_change(&mut self, expected_increase: usize) { let retry_duration = Duration::from_secs(MAX_QUERY_RETRY_TIME_SEC); + let num_parquet_files = self.num_parquet_files.expect( + "No previous number of Parquet files recorded! \ + Use `Step::RecordNumParquetFiles` before `Step::WaitForPersisted2`.", + ); + let expected_count = num_parquet_files + expected_increase; tokio::time::timeout(retry_duration, async move { let mut interval = tokio::time::interval(Duration::from_millis(1000)); loop { let current_count = self.get_num_parquet_files().await; - if Some(current_count) > self.num_parquet_files { - info!("Success; Parquet file count is now {current_count}"); - self.num_parquet_files = Some(current_count); + if current_count >= expected_count { + info!( + "Success; Parquet file count is now {current_count} \ + which is at least {expected_count}" + ); + // Reset the saved value to require recording before waiting again + self.num_parquet_files = None; return; } - info!("Retrying; Parquet file count is still {current_count}"); + info!( + "Retrying; Parquet file count is still {current_count} \ + which is less than {expected_count}" + ); interval.tick().await; } @@ -150,9 +162,9 @@ pub enum Step { RecordNumParquetFiles, /// Wait for all previously written data to be persisted by observing an increase in the number - /// of Parquet files in the catalog for this cluster's namespace. Needed for + /// of Parquet files in the catalog as specified for this cluster's namespace. Needed for /// router2/ingester2/querier2. - WaitForPersisted2, + WaitForPersisted2 { expected_increase: usize }, /// Ask the ingester if it has persisted the data. For use in tests where the querier doesn't /// know about the ingester, so the test needs to ask the ingester directly. @@ -270,9 +282,11 @@ impl<'a> StepTest<'a> { Step::RecordNumParquetFiles => { state.record_num_parquet_files().await; } - Step::WaitForPersisted2 => { + Step::WaitForPersisted2 { expected_increase } => { info!("====Begin waiting for a change in the number of Parquet files"); - state.wait_for_num_parquet_file_change().await; + state + .wait_for_num_parquet_file_change(expected_increase) + .await; info!("====Done waiting for a change in the number of Parquet files"); } // Specifically for cases when the querier doesn't know about the ingester so the