test: assert all upstreams are (re)-tried
Adds a test ensuring requests are sent to all upstream ingesters until a request succeeds.pull/24376/head
parent
c0a53ae5a9
commit
f28ba2381d
|
@ -267,8 +267,10 @@ mod tests {
|
|||
assert_eq!(got_tables, want_tables);
|
||||
}
|
||||
|
||||
/// Ensure all candidates returned by the balancer are tried, aborting after
|
||||
/// the first successful request.
|
||||
#[tokio::test]
|
||||
async fn test_write_retries() {
|
||||
async fn test_write_tries_all_candidates() {
|
||||
let batches = lp_to_writes("bananas,tag1=A,tag2=B val=42i 1");
|
||||
|
||||
// Wrap the table batches in a partition key
|
||||
|
@ -280,10 +282,12 @@ mod tests {
|
|||
.with_ret([Err(RpcWriteError::Upstream(tonic::Status::internal("")))]),
|
||||
);
|
||||
let client2 = Arc::new(MockWriteClient::default());
|
||||
let client3 = Arc::new(MockWriteClient::default());
|
||||
let handler = RpcWrite::new(
|
||||
[
|
||||
(Arc::clone(&client1), "client1"),
|
||||
(Arc::clone(&client2), "client2"),
|
||||
(Arc::clone(&client3), "client3"),
|
||||
],
|
||||
&metric::Registry::default(),
|
||||
);
|
||||
|
@ -306,6 +310,78 @@ mod tests {
|
|||
calls.pop().unwrap()
|
||||
};
|
||||
|
||||
// Ensure client 3 was not called.
|
||||
assert!(client3.calls().is_empty());
|
||||
|
||||
let payload = assert_matches!(call.payload, Some(p) => p);
|
||||
assert_eq!(payload.database_id, NAMESPACE_ID.get());
|
||||
assert_eq!(payload.partition_key, "2022-01-01");
|
||||
assert_eq!(payload.table_batches.len(), 1);
|
||||
|
||||
let got_tables = payload
|
||||
.table_batches
|
||||
.into_iter()
|
||||
.map(|t| t.table_id)
|
||||
.collect::<HashSet<_>>();
|
||||
|
||||
let want_tables = batches
|
||||
.into_iter()
|
||||
.map(|(id, (_name, _data))| id.get())
|
||||
.collect::<HashSet<_>>();
|
||||
|
||||
assert_eq!(got_tables, want_tables);
|
||||
}
|
||||
|
||||
/// Ensure all candidates are tried more than once should they first fail.
|
||||
#[tokio::test]
|
||||
async fn test_write_retries() {
|
||||
let batches = lp_to_writes("bananas,tag1=A,tag2=B val=42i 1");
|
||||
|
||||
// Wrap the table batches in a partition key
|
||||
let input = Partitioned::new(PartitionKey::from("2022-01-01"), batches.clone());
|
||||
|
||||
// The first client in line fails the first request, but will succeed
|
||||
// the second try.
|
||||
let client1 = Arc::new(MockWriteClient::default().with_ret([
|
||||
Err(RpcWriteError::Upstream(tonic::Status::internal(""))),
|
||||
Ok(()),
|
||||
]));
|
||||
// This client always errors.
|
||||
let client2 = Arc::new(MockWriteClient::default().with_ret([
|
||||
Err(RpcWriteError::Upstream(tonic::Status::internal(""))),
|
||||
Err(RpcWriteError::Upstream(tonic::Status::internal(""))),
|
||||
Err(RpcWriteError::Upstream(tonic::Status::internal(""))),
|
||||
Err(RpcWriteError::Upstream(tonic::Status::internal(""))),
|
||||
Err(RpcWriteError::Upstream(tonic::Status::internal(""))),
|
||||
Err(RpcWriteError::Upstream(tonic::Status::internal(""))),
|
||||
]));
|
||||
|
||||
let handler = RpcWrite::new(
|
||||
[
|
||||
(Arc::clone(&client1), "client1"),
|
||||
(Arc::clone(&client2), "client2"),
|
||||
],
|
||||
&metric::Registry::default(),
|
||||
);
|
||||
|
||||
// Drive the RPC writer
|
||||
let got = handler
|
||||
.write(
|
||||
&NamespaceName::new(NAMESPACE_NAME).unwrap(),
|
||||
NAMESPACE_ID,
|
||||
input,
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
assert_matches!(got, Ok(_));
|
||||
|
||||
// Ensure client 1 observed both calls.
|
||||
let call = {
|
||||
let mut calls = client1.calls();
|
||||
assert_eq!(calls.len(), 2);
|
||||
calls.pop().unwrap()
|
||||
};
|
||||
|
||||
let payload = assert_matches!(call.payload, Some(p) => p);
|
||||
assert_eq!(payload.database_id, NAMESPACE_ID.get());
|
||||
assert_eq!(payload.partition_key, "2022-01-01");
|
||||
|
|
Loading…
Reference in New Issue