fix: Make recording num parquet files an explicit test step
To support a case where someone calls WriteLineProtocol twice in a row to simulate two write requests. The test should be able to record this state before the two write requests and not twice.pull/24376/head
parent
6677ae5c61
commit
7e921e6a23
|
@ -901,6 +901,7 @@ mod kafkaless_rpc_write {
|
|||
StepTest::new(
|
||||
&mut cluster,
|
||||
vec![
|
||||
Step::RecordNumParquetFiles,
|
||||
Step::WriteLineProtocol(format!("{},tag1=A,tag2=B val=42i 123456", table_name)),
|
||||
// Wait for data to be persisted to parquet
|
||||
Step::WaitForPersisted2,
|
||||
|
@ -945,6 +946,7 @@ mod kafkaless_rpc_write {
|
|||
StepTest::new(
|
||||
&mut cluster,
|
||||
vec![
|
||||
Step::RecordNumParquetFiles,
|
||||
Step::WriteLineProtocol(format!("{},tag1=A,tag2=B val=42i 123456", table_name)),
|
||||
Step::WaitForPersisted2,
|
||||
Step::Query {
|
||||
|
@ -978,6 +980,7 @@ mod kafkaless_rpc_write {
|
|||
let mut cluster = MiniCluster::create_shared2(database_url).await;
|
||||
|
||||
let steps = vec![
|
||||
Step::RecordNumParquetFiles,
|
||||
Step::WriteLineProtocol(format!("{},tag1=A,tag2=B val=42i 123456", table_name)),
|
||||
// Wait for data to be persisted to parquet
|
||||
Step::WaitForPersisted2,
|
||||
|
@ -1002,6 +1005,7 @@ mod kafkaless_rpc_write {
|
|||
"+------+------+--------------------------------+-----+",
|
||||
],
|
||||
},
|
||||
Step::RecordNumParquetFiles,
|
||||
// write another parquet file that has non duplicated data
|
||||
Step::WriteLineProtocol(format!("{},tag1=B,tag2=A val=43i 789101112", table_name)),
|
||||
// Wait for data to be persisted to parquet
|
||||
|
|
|
@ -143,6 +143,12 @@ pub enum Step {
|
|||
/// Wait for all previously written data to be persisted
|
||||
WaitForPersisted,
|
||||
|
||||
/// Ask the catalog service how many Parquet files it has for this cluster's namespace. Do this
|
||||
/// before a write where you're interested in when the write has been persisted to Parquet;
|
||||
/// then after the write use `WaitForPersisted2` to observe the change in the number of Parquet
|
||||
/// files from the value this step recorded.
|
||||
RecordNumParquetFiles,
|
||||
|
||||
/// Wait for all previously written data to be persisted by observing an increase in the number
|
||||
/// of Parquet files in the catalog for this cluster's namespace. Needed for
|
||||
/// router2/ingester2/querier2.
|
||||
|
@ -235,9 +241,6 @@ impl<'a> StepTest<'a> {
|
|||
"====Begin writing line protocol to v2 HTTP API:\n{}",
|
||||
line_protocol
|
||||
);
|
||||
// Get the current number of Parquet files in the cluster's namespace before
|
||||
// starting a new write so we can observe a change when waiting for persistence.
|
||||
state.record_num_parquet_files().await;
|
||||
let response = state.cluster.write_to_router(line_protocol).await;
|
||||
assert_eq!(response.status(), StatusCode::NO_CONTENT);
|
||||
let write_token = get_write_token(&response);
|
||||
|
@ -262,6 +265,11 @@ impl<'a> StepTest<'a> {
|
|||
}
|
||||
info!("====Done waiting for all write tokens to be persisted");
|
||||
}
|
||||
// Get the current number of Parquet files in the cluster's namespace before
|
||||
// starting a new write so we can observe a change when waiting for persistence.
|
||||
Step::RecordNumParquetFiles => {
|
||||
state.record_num_parquet_files().await;
|
||||
}
|
||||
Step::WaitForPersisted2 => {
|
||||
info!("====Begin waiting for a change in the number of Parquet files");
|
||||
state.wait_for_num_parquet_file_change().await;
|
||||
|
|
Loading…
Reference in New Issue