fix: Wait for a particular number of Parquet files, not just any change

pull/24376/head
Carol (Nichols || Goulding) 2023-01-11 12:05:12 -05:00
parent 7e921e6a23
commit be7c312033
No known key found for this signature in database
GPG Key ID: E907EE5A736F87D4
2 changed files with 35 additions and 13 deletions

View File

@ -904,7 +904,9 @@ mod kafkaless_rpc_write {
Step::RecordNumParquetFiles,
Step::WriteLineProtocol(format!("{},tag1=A,tag2=B val=42i 123456", table_name)),
// Wait for data to be persisted to parquet
Step::WaitForPersisted2,
Step::WaitForPersisted2 {
expected_increase: 1,
},
Step::Query {
sql: format!("select * from {}", table_name),
expected: vec![
@ -948,7 +950,9 @@ mod kafkaless_rpc_write {
vec![
Step::RecordNumParquetFiles,
Step::WriteLineProtocol(format!("{},tag1=A,tag2=B val=42i 123456", table_name)),
Step::WaitForPersisted2,
Step::WaitForPersisted2 {
expected_increase: 1,
},
Step::Query {
sql: format!("select * from {}", table_name),
expected: vec![
@ -983,7 +987,9 @@ mod kafkaless_rpc_write {
Step::RecordNumParquetFiles,
Step::WriteLineProtocol(format!("{},tag1=A,tag2=B val=42i 123456", table_name)),
// Wait for data to be persisted to parquet
Step::WaitForPersisted2,
Step::WaitForPersisted2 {
expected_increase: 1,
},
Step::Query {
sql: format!("select * from {}", table_name),
expected: vec![
@ -1009,7 +1015,9 @@ mod kafkaless_rpc_write {
// write another parquet file that has non duplicated data
Step::WriteLineProtocol(format!("{},tag1=B,tag2=A val=43i 789101112", table_name)),
// Wait for data to be persisted to parquet
Step::WaitForPersisted2,
Step::WaitForPersisted2 {
expected_increase: 1,
},
// query should correctly see the data in the second parquet file
Step::Query {
sql: format!("select * from {}", table_name),

View File

@ -68,19 +68,31 @@ impl<'a> StepTestState<'a> {
/// Wait for a change (up to a timeout) in the number of Parquet files the catalog has for the
/// mini cluster's namespacee since the last time the number of Parquet files was recorded,
/// which indicates persistence has taken place.
pub async fn wait_for_num_parquet_file_change(&mut self) {
pub async fn wait_for_num_parquet_file_change(&mut self, expected_increase: usize) {
let retry_duration = Duration::from_secs(MAX_QUERY_RETRY_TIME_SEC);
let num_parquet_files = self.num_parquet_files.expect(
"No previous number of Parquet files recorded! \
Use `Step::RecordNumParquetFiles` before `Step::WaitForPersisted2`.",
);
let expected_count = num_parquet_files + expected_increase;
tokio::time::timeout(retry_duration, async move {
let mut interval = tokio::time::interval(Duration::from_millis(1000));
loop {
let current_count = self.get_num_parquet_files().await;
if Some(current_count) > self.num_parquet_files {
info!("Success; Parquet file count is now {current_count}");
self.num_parquet_files = Some(current_count);
if current_count >= expected_count {
info!(
"Success; Parquet file count is now {current_count} \
which is at least {expected_count}"
);
// Reset the saved value to require recording before waiting again
self.num_parquet_files = None;
return;
}
info!("Retrying; Parquet file count is still {current_count}");
info!(
"Retrying; Parquet file count is still {current_count} \
which is less than {expected_count}"
);
interval.tick().await;
}
@ -150,9 +162,9 @@ pub enum Step {
RecordNumParquetFiles,
/// Wait for all previously written data to be persisted by observing an increase in the number
/// of Parquet files in the catalog for this cluster's namespace. Needed for
/// of Parquet files in the catalog as specified for this cluster's namespace. Needed for
/// router2/ingester2/querier2.
WaitForPersisted2,
WaitForPersisted2 { expected_increase: usize },
/// Ask the ingester if it has persisted the data. For use in tests where the querier doesn't
/// know about the ingester, so the test needs to ask the ingester directly.
@ -270,9 +282,11 @@ impl<'a> StepTest<'a> {
Step::RecordNumParquetFiles => {
state.record_num_parquet_files().await;
}
Step::WaitForPersisted2 => {
Step::WaitForPersisted2 { expected_increase } => {
info!("====Begin waiting for a change in the number of Parquet files");
state.wait_for_num_parquet_file_change().await;
state
.wait_for_num_parquet_file_change(expected_increase)
.await;
info!("====Done waiting for a change in the number of Parquet files");
}
// Specifically for cases when the querier doesn't know about the ingester so the