fix: fix flaky partition tests with a retry (#8523)

pull/24376/head
Andrew Lamb 2023-08-18 14:12:49 -04:00 committed by GitHub
parent 5d17a99dbb
commit 38c3229e35
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 31 additions and 7 deletions

View File

@ -105,6 +105,35 @@ impl<'a> StepTestState<'a> {
.map(|parquet_files| parquet_files.len())
.unwrap_or_default()
}
/// waits for `MAX_QUERY_RETRY_TIME_SEC` for the database to
/// report exactly `expected` for its partition keys
async fn wait_for_partition_keys(
&self,
table_name: &str,
namespace_name: &Option<String>,
expected: &[&str],
) {
let retry_duration = Duration::from_secs(MAX_QUERY_RETRY_TIME_SEC);
let partition_keys = tokio::time::timeout(retry_duration, async {
loop {
let mut partition_keys = self
.cluster()
.partition_keys(table_name, namespace_name.clone())
.await;
partition_keys.sort();
info!("====Read partition keys: {partition_keys:?}");
if partition_keys == *expected {
return partition_keys;
}
}
})
.await
.expect("did not get expected partition keys before timeout");
assert_eq!(partition_keys, *expected);
}
}
/// Function used for custom [`Step`]s.
@ -613,14 +642,9 @@ where
.await;
info!("====Begin reading partition keys for table: {}", table_name);
let partition_keys = state
.cluster()
.partition_keys(table_name, namespace_name.clone())
state
.wait_for_partition_keys(table_name, namespace_name, expected)
.await;
// order the partition keys so that we can compare them
let mut partition_keys = partition_keys;
partition_keys.sort();
assert_eq!(partition_keys, *expected);
info!("====Done reading partition keys");
}
Step::GracefulStopIngesters => {