test(router): replication prop/invariant fuzzing

Adds a property-based test of the RPC write handler's replication logic,
ensuring:

    1. If the number of healthy upstreams is 0, NoHealthyUpstreams is
       returned and no requests are attempted.

    2. Given N healthy upstreams (> 0) and a replication factor of R:
       if N < R, "not enough replicas" is returned and no requests are
       attempted.

    3. Upstreams that return an error are retried until the entire
       write succeeds or times out.

    4. Writes are replicated to R distinct upstreams successfully, or
       an error is returned.

    5. Once an upstream write is ack'd as successful, it is never
       requested again.

    6. An upstream reporting as unhealthy at the start of the write is
       never requested (excluding probe requests).

These properties describe a mixture of invariants (don't replicate your
two copies of a write to the same ingester) and expected behaviour of
the replication logic (optimisations like "don't try writes when you
already know they'll fail").

This passes for the single-threaded replication logic used at the time
of this commit, and will be used to validate correctness of a concurrent
replication implementation - a concurrent approach should uphold these
properties the same way a single-threaded implementation does.
pull/24376/head
Dom Dwyer 2023-05-08 12:09:58 +02:00
parent cf622c1b91
commit 465158e08e
No known key found for this signature in database
GPG Key ID: E4C40DBD9157879A
1 changed files with 147 additions and 0 deletions

View File

@ -325,7 +325,9 @@ mod tests {
use assert_matches::assert_matches;
use data_types::{NamespaceId, PartitionKey};
use proptest::{prelude::*, prop_compose, proptest};
use rand::seq::SliceRandom;
use tokio::runtime;
use crate::dml_handlers::rpc_write::circuit_breaking_client::mock::MockCircuitBreaker;
@ -871,4 +873,149 @@ mod tests {
.chain(client_3.calls().iter())
.all(|v| *v == calls_1[0]));
}
prop_compose! {
    /// Generate an arbitrary write outcome: either [`Ok`], or one of a small
    /// set of cheaply constructed [`RpcWriteClientError`] variants.
    fn arbitrary_write_result()(selector in 0..3) -> Result<(), RpcWriteClientError> {
        // Pick the failure (if any) first; `None` denotes a successful write.
        let failure = match selector {
            0 => None,
            1 => Some(RpcWriteClientError::Upstream(tonic::Status::internal("bananas"))),
            2 => Some(RpcWriteClientError::UpstreamNotConnected(String::from("bananas"))),
            // The strategy range above only ever yields 0, 1 or 2.
            _ => unreachable!(),
        };

        failure.map_or(Ok(()), Err)
    }
}
prop_compose! {
    /// Generate a mock upstream as a `(circuit breaker, write client)` pair.
    ///
    /// The circuit breaker is arbitrarily marked healthy or unhealthy, and
    /// the client is configured with a bounded, arbitrary sequence of write
    /// results (fewer than 5) followed by an endless run of successes.
    fn arbitrary_mock_upstream()(
        healthy in any::<bool>(),
        responses in proptest::collection::vec(arbitrary_write_result(), 0..5)
    ) -> (Arc<MockCircuitBreaker>, Arc<MockWriteClient>) {
        // Set the arbitrarily chosen health state on a fresh circuit breaker.
        let circuit = Arc::new(MockCircuitBreaker::default());
        circuit.set_healthy(healthy);

        // Play back the generated results first, then fall back to an
        // infinite supply of `Ok(())` so the scripted responses never run
        // dry.
        let scripted = responses
            .into_iter()
            .chain(iter::repeat_with(|| Ok(())));
        let client = Arc::new(MockWriteClient::default().with_ret(scripted));

        (circuit, client)
    }
}
proptest! {
    /// Property test of the RPC write replication logic.
    ///
    /// The invariants asserted are:
    ///
    ///  1. If the number of healthy upstreams is 0, NoHealthyUpstreams is
    ///     returned and no requests are attempted.
    ///
    ///  2. Given N healthy upstreams (> 0) and a replication factor of R:
    ///     if N < R, "not enough replicas" is returned and no requests are
    ///     attempted.
    ///
    ///  3. Upstreams that return an error are retried until the entire
    ///     write succeeds or times out.
    ///
    ///  4. Writes are replicated to R distinct upstreams successfully, or
    ///     an error is returned.
    ///
    ///  5. Once an upstream write is ack'd as successful, it is never
    ///     requested again.
    ///
    ///  6. An upstream reporting as unhealthy at the start of the write is
    ///     never requested (excluding probe requests).
    ///
    #[test]
    fn prop_distinct_upstreams(
        upstreams in proptest::collection::vec(arbitrary_mock_upstream(), 1_usize..5),
        n_copies in 1_usize..5,
    ) {
        // Discard configurations that ask for more copies than there are
        // upstreams.
        prop_assume!(n_copies <= upstreams.len());

        // Execute the write on a throwaway single-threaded runtime. The
        // request consumes clones of the mock handles, leaving the originals
        // available for inspection once the write has completed.
        let outcome = {
            let rt = runtime::Builder::new_current_thread()
                .enable_all()
                .build()
                .unwrap();
            let handles = upstreams.clone();

            rt.block_on(async move {
                let endpoints = handles.into_iter().map(|(circuit, client)| {
                    CircuitBreakingClient::new(client, "bananas").with_circuit_breaker(circuit)
                });

                make_request(endpoints, n_copies).await
            })
        };

        // Count the upstreams whose circuit breaker reports healthy.
        let healthy_count = upstreams
            .iter()
            .filter(|(circuit, _client)| circuit.is_healthy())
            .count();

        // Invariants 1 & 2: too few healthy upstreams must fail with the
        // matching error, and, because the unhappy case is computable before
        // any network I/O, without a single request being sent.
        //
        // `n_copies` is always at least 1, so a healthy count of 0 always
        // takes this branch.
        if healthy_count < n_copies {
            if healthy_count == 0 {
                // Invariant 1: no healthy upstreams yields the appropriate
                // error.
                assert_matches!(outcome, Err(RpcWriteError::NoHealthyUpstreams));
            } else {
                // Invariant 2: N < R yields a "not enough replicas" error.
                assert_matches!(outcome, Err(RpcWriteError::NotEnoughReplicas));
            }

            // Assert no upstream requests are made.
            for (_circuit, client) in &upstreams {
                assert!(client.calls().is_empty());
            }
        }

        // Invariant 3: when the write times out (or is only partially
        // written), at least one upstream must have been tried more than
        // once.
        //
        // This holds because the number of distinct upstreams is small
        // enough that the timeout fires only after each one has been
        // attempted at least once.
        let gave_up = matches!(
            outcome,
            Err(RpcWriteError::Timeout(_) | RpcWriteError::PartialWrite { .. })
        );
        if gave_up {
            assert!(upstreams.iter().any(|(_circuit, client)| client.calls().len() > 1));
        }

        // Invariant 4: the overall write succeeds if and only if at least R
        // upstreams each acknowledged the write exactly once.
        let acked = upstreams
            .iter()
            .map(|(_circuit, client)| client.success_count())
            .filter(|&successes| successes == 1)
            .count();
        assert_eq!(outcome.is_ok(), acked >= n_copies);

        // Invariant 5: no mock ever returned more than one Ok response,
        // irrespective of whether the overall write succeeded or failed.
        for (_circuit, client) in &upstreams {
            assert!(client.success_count() <= 1);
        }

        // Invariant 6: a client behind an unhealthy circuit never sees a
        // write request.
        assert!(
            upstreams
                .iter()
                .all(|(circuit, client)| circuit.is_healthy() || client.calls().is_empty())
        );
    }
}
}