Merge pull request #7316 from influxdata/dom/replicate

feat: write replication
pull/24376/head
Dom 2023-03-27 10:13:11 +01:00 committed by GitHub
commit e067cfd3d7
8 changed files with 440 additions and 69 deletions

View File

@ -1,7 +1,10 @@
//! CLI config for the router using the RPC write path
use crate::ingester_address::IngesterAddress;
use std::{num::ParseIntError, time::Duration};
use std::{
num::{NonZeroUsize, ParseIntError},
time::Duration,
};
/// CLI config for the router using the RPC write path
#[derive(Debug, Clone, clap::Parser)]
@ -105,6 +108,17 @@ pub struct Router2Config {
value_parser = parse_duration
)]
pub rpc_write_timeout_seconds: Duration,
/// Specify the optional replication factor for each RPC write.
///
/// The total number of copies of data after replication will be this value,
/// plus 1.
///
/// If the desired replication level is not achieved, a partial write error
/// will be returned to the user. The write MAY be queryable after a partial
/// write failure.
#[clap(long = "rpc-write-replicas", env = "INFLUXDB_IOX_RPC_WRITE_REPLICAS")]
pub rpc_write_replicas: Option<NonZeroUsize>,
}
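As a quick, illustrative aside (not part of this diff): because the field is an `Option<NonZeroUsize>`, a literal `0` is rejected when the flag value is parsed, and "no replication" is expressed by omitting the flag entirely. A minimal standalone sketch of that parsing behaviour:

use std::num::NonZeroUsize;

fn main() {
    // "2" parses into a NonZeroUsize, mirroring how clap should parse
    // --rpc-write-replicas / INFLUXDB_IOX_RPC_WRITE_REPLICAS via FromStr.
    let replicas: NonZeroUsize = "2".parse().expect("non-zero integer");
    assert_eq!(replicas.get(), 2);

    // "0" fails to parse, so replication cannot be set to an explicit zero;
    // omit the flag instead (the field is an Option).
    assert!("0".parse::<NonZeroUsize>().is_err());
}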
/// Map a string containing an integer number of seconds into a [`Duration`].

View File

@ -472,6 +472,7 @@ impl Config {
partition_key_pattern: "%Y-%m-%d".to_string(),
topic: QUERY_POOL_NAME.to_string(),
rpc_write_timeout_seconds: Duration::new(3, 0),
rpc_write_replicas: None,
};
// create a CompactorConfig for the all in one server based on

View File

@ -295,7 +295,7 @@ pub async fn create_router2_server_type(
});
// Initialise the DML handler that sends writes to the ingester using the RPC write path.
let rpc_writer = RpcWrite::new(ingester_connections, &metrics);
let rpc_writer = RpcWrite::new(ingester_connections, None, &metrics);
let rpc_writer = InstrumentationDecorator::new("rpc_writer", &metrics, rpc_writer);
// 1. END

View File

@ -23,7 +23,7 @@ use hashbrown::HashMap;
use mutable_batch::MutableBatch;
use mutable_batch_pb::encode::encode_write;
use observability_deps::tracing::*;
use std::{fmt::Debug, sync::Arc, time::Duration};
use std::{fmt::Debug, num::NonZeroUsize, sync::Arc, time::Duration};
use thiserror::Error;
use trace::ctx::SpanContext;
@ -54,6 +54,26 @@ pub enum RpcWriteError {
/// A delete request was rejected (not supported).
#[error("deletes are not supported")]
DeletesUnsupported,
/// The write request was not attempted because there are not enough
/// healthy upstream ingesters to satisfy the configured replication
/// factor.
#[error("not enough upstreams to satisfy write replication")]
NotEnoughReplicas,
/// A replicated write was attempted but not enough upstream ingesters
/// acknowledged the write to satisfy the desired replication factor.
#[error(
"not enough upstreams accepted replicated write; \
want {want_n_copies}, but only received {acks} acks"
)]
PartialWrite {
/// The total number of copies of data needed for this write to be
/// considered successful.
want_n_copies: usize,
/// The number of successful upstream ingester writes.
acks: usize,
},
}
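As a hedged, illustrative sketch (not part of this change), a caller can distinguish the two new failure modes: `NotEnoughReplicas` means no write was attempted at all, while `PartialWrite` reports how many copies were acknowledged before the attempt gave up:

fn describe(e: &RpcWriteError) -> String {
    match e {
        // Nothing was sent; too few healthy upstreams were available.
        RpcWriteError::NotEnoughReplicas => {
            "write not attempted: too few healthy ingesters".to_string()
        }
        // Some copies were acknowledged, but fewer than required.
        RpcWriteError::PartialWrite { want_n_copies, acks } => {
            format!("partial write: {acks} of {want_n_copies} copies acknowledged")
        }
        // All other variants already carry a Display message via thiserror.
        other => format!("write error: {other}"),
    }
}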
/// An [`RpcWrite`] handler submits a write directly to an Ingester via the
@ -63,6 +83,16 @@ pub enum RpcWriteError {
/// distributed approximately uniformly across all downstream Ingesters. There
/// is no effort made to enforce or attempt data locality.
///
/// # Replication
///
/// If replication is configured, the total number of upstream ingesters
/// that acknowledge the write must be `replica_copies + 1` for the write to
/// be considered successful.
///
/// A write that is accepted by some upstream ingesters, but not enough to
/// satisfy the desired replication factor, is considered a partial failure,
/// and a [`RpcWriteError::PartialWrite`] error is returned.
///
/// # Deletes
///
/// This handler drops delete requests, logging the attempt and returning an
@ -72,23 +102,64 @@ pub enum RpcWriteError {
#[derive(Debug)]
pub struct RpcWrite<T, C = CircuitBreaker> {
endpoints: Balancer<T, C>,
/// The total number of distinct upstream ingesters to write a single
/// request to.
///
/// Invariant: after an upstream has ACKed a write, it MUST NOT be sent the
/// same write again. This minimises duplication within the same upstream,
/// for which no additional durability is realised.
///
/// NOTE: it is NOT possible to eliminate duplication of write requests
/// being sent to the same upstream, as an upstream (or a middle-man proxy)
/// may NACK a write, having already buffered the data. When this request is
/// retried, the data will be duplicated.
n_copies: usize,
}
impl<T> RpcWrite<T> {
/// Initialise a new [`RpcWrite`] that sends requests to an arbitrary
/// downstream Ingester, using a round-robin strategy.
pub fn new<N>(endpoints: impl IntoIterator<Item = (T, N)>, metrics: &metric::Registry) -> Self
///
/// If [`Some`], `replica_copies` specifies the number of additional
/// upstream ingesters that must receive and acknowledge the write for it to
/// be considered successful.
///
/// # Panics
///
/// It's invalid to configure `replica_copies` such that more ACKs are
/// needed than the number of `endpoints`; doing so will cause a panic.
pub fn new<N>(
endpoints: impl IntoIterator<Item = (T, N)>,
replica_copies: Option<NonZeroUsize>,
metrics: &metric::Registry,
) -> Self
where
T: Send + Sync + Debug + 'static,
N: Into<Arc<str>>,
{
let endpoints = Balancer::new(
endpoints
.into_iter()
.map(|(client, name)| CircuitBreakingClient::new(client, name.into())),
Some(metrics),
);
// Map the "replication factor" into the total number of distinct data
// copies necessary to consider a write a success.
let n_copies = replica_copies.map(NonZeroUsize::get).unwrap_or(1);
// Assert this configuration is not impossible to satisfy.
assert!(
n_copies <= endpoints.len(),
"cannot configure more write copies ({n_copies}) than ingester \
endpoints ({count})",
count = endpoints.len(),
);
Self {
endpoints: Balancer::new(
endpoints
.into_iter()
.map(|(client, name)| CircuitBreakingClient::new(client, name.into())),
Some(metrics),
),
endpoints,
n_copies,
}
}
}
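A minimal construction sketch for reviewers (illustrative only: the plain string "clients" stand in for real gRPC write clients, and the endpoint names are invented):

use std::num::NonZeroUsize;

fn example_construction(metrics: &metric::Registry) {
    // Two endpoints, two required copies: accepted by the assert above.
    let _handler = RpcWrite::new(
        [("conn-a", "ingester-a"), ("conn-b", "ingester-b")],
        NonZeroUsize::new(2),
        metrics,
    );
    // NonZeroUsize::new(3) here would trip the assert and panic, as three
    // distinct copies can never be satisfied by only two endpoints.
}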
@ -134,16 +205,52 @@ where
payload: Some(encode_write(namespace_id.get(), &op)),
};
// Perform the gRPC write to an ingester.
//
// This call is bounded to at most RPC_TIMEOUT duration of time.
write_loop(
self.endpoints
.endpoints()
.ok_or(RpcWriteError::NoUpstreams)?,
req,
)
.await?;
// Obtain a snapshot of currently-healthy upstreams (and potentially
// some that need probing)
let mut snap = self
.endpoints
.endpoints()
.ok_or(RpcWriteError::NoUpstreams)?;
// Validate the required number of writes is possible given the current
// number of healthy endpoints.
if snap.len() < self.n_copies {
return Err(RpcWriteError::NotEnoughReplicas);
}
// Write the desired number of copies of `req`.
for i in 0..self.n_copies {
// Perform the gRPC write to an ingester.
//
// This call is bounded to at most RPC_TIMEOUT duration of time.
write_loop(&mut snap, &req).await.map_err(|e| {
// In all cases, if at least one write succeeded, then this
// becomes a partial write error.
if i > 0 {
return RpcWriteError::PartialWrite {
want_n_copies: self.n_copies,
acks: i,
};
}
// This error was for the first request - there have been no
// ACKs received.
match e {
// This error is an internal implementation detail - the
// meaningful error for the user is "there's no healthy
// upstreams".
RpcWriteError::UpstreamNotConnected(_) => RpcWriteError::NoUpstreams,
// The number of upstreams no longer satisfies the desired
// replication factor.
RpcWriteError::NoUpstreams => RpcWriteError::NotEnoughReplicas,
// All other errors pass through.
v => v,
}
})?;
// Remove the upstream that was successfully written to from the
// candidate set.
snap.remove_last_unstable();
}
debug!(
%partition_key,
@ -184,8 +291,8 @@ where
/// If at least one upstream request has failed (returning an error), the most
/// recent error is returned.
async fn write_loop<T>(
mut endpoints: UpstreamSnapshot<'_, T>,
req: WriteRequest,
endpoints: &mut UpstreamSnapshot<'_, T>,
req: &WriteRequest,
) -> Result<(), RpcWriteError>
where
T: WriteClient,
@ -216,9 +323,6 @@ where
})
.await
.map_err(|e| match last_err {
// This error is an internal implementation detail - the meaningful
// error for the user is "there's no healthy upstreams".
Some(RpcWriteError::UpstreamNotConnected(_)) => RpcWriteError::NoUpstreams,
// Any other error is returned as-is.
Some(v) => v,
// If the entire write attempt fails during the first RPC write
@ -244,6 +348,7 @@ mod tests {
use assert_matches::assert_matches;
use data_types::PartitionKey;
use rand::seq::SliceRandom;
use crate::dml_handlers::rpc_write::circuit_breaking_client::mock::MockCircuitBreaker;
@ -264,6 +369,49 @@ mod tests {
const NAMESPACE_NAME: &str = "bananas";
const NAMESPACE_ID: NamespaceId = NamespaceId::new(42);
/// A helper function to perform an arbitrary write against `endpoints`,
/// with the given number of desired distinct data copies.
async fn make_request<T, C>(
endpoints: impl IntoIterator<Item = CircuitBreakingClient<T, C>> + Send,
n_copies: usize,
) -> Result<Vec<DmlMeta>, RpcWriteError>
where
T: WriteClient + 'static,
C: CircuitBreakerState + 'static,
{
let handler = RpcWrite {
endpoints: Balancer::new(endpoints, None),
n_copies,
};
assert!(
n_copies <= handler.endpoints.len(),
"cannot configure more write copies ({n_copies}) than ingester \
endpoints ({count})",
count = handler.endpoints.len(),
);
// Generate some write input
let input = Partitioned::new(
PartitionKey::from("2022-01-01"),
lp_to_writes("bananas,tag1=A,tag2=B val=42i 1"),
);
// Use tokio's "auto-advance" time feature to avoid waiting for the
// actual timeout duration.
tokio::time::pause();
// Drive the RPC writer
handler
.write(
&NamespaceName::new(NAMESPACE_NAME).unwrap(),
NAMESPACE_ID,
input,
None,
)
.await
}
#[tokio::test]
async fn test_write() {
let batches = lp_to_writes(
@ -283,6 +431,7 @@ mod tests {
let client = Arc::new(MockWriteClient::default());
let handler = RpcWrite::new(
[(Arc::clone(&client), "mock client")],
None,
&metric::Registry::default(),
);
@ -344,6 +493,7 @@ mod tests {
(Arc::clone(&client2), "client2"),
(Arc::clone(&client3), "client3"),
],
None,
&metric::Registry::default(),
);
@ -418,6 +568,7 @@ mod tests {
(Arc::clone(&client1), "client1"),
(Arc::clone(&client2), "client2"),
],
None,
&metric::Registry::default(),
);
@ -458,38 +609,6 @@ mod tests {
assert_eq!(got_tables, want_tables);
}
async fn make_request<T, C>(
endpoints: impl IntoIterator<Item = CircuitBreakingClient<T, C>> + Send,
) -> Result<Vec<DmlMeta>, RpcWriteError>
where
T: WriteClient + 'static,
C: CircuitBreakerState + 'static,
{
let handler = RpcWrite {
endpoints: Balancer::new(endpoints, None),
};
// Generate some write input
let input = Partitioned::new(
PartitionKey::from("2022-01-01"),
lp_to_writes("bananas,tag1=A,tag2=B val=42i 1"),
);
// Use tokio's "auto-advance" time feature to avoid waiting for the
// actual timeout duration.
tokio::time::pause();
// Drive the RPC writer
handler
.write(
&NamespaceName::new(NAMESPACE_NAME).unwrap(),
NAMESPACE_ID,
input,
None,
)
.await
}
/// Assert the error response for a write request when there are no healthy
/// upstreams.
#[tokio::test]
@ -500,9 +619,10 @@ mod tests {
// Mark the client circuit breaker as unhealthy
circuit_1.set_healthy(false);
let got = make_request([
CircuitBreakingClient::new(client_1, "client_1").with_circuit_breaker(circuit_1)
])
let got = make_request(
[CircuitBreakingClient::new(client_1, "client_1").with_circuit_breaker(circuit_1)],
1,
)
.await;
assert_matches!(got, Err(RpcWriteError::NoUpstreams));
@ -520,9 +640,10 @@ mod tests {
let circuit_1 = Arc::new(MockCircuitBreaker::default());
circuit_1.set_healthy(true);
let got = make_request([
CircuitBreakingClient::new(client_1, "client_1").with_circuit_breaker(circuit_1)
])
let got = make_request(
[CircuitBreakingClient::new(client_1, "client_1").with_circuit_breaker(circuit_1)],
1,
)
.await;
assert_matches!(got, Err(RpcWriteError::Upstream(s)) => {
@ -543,11 +664,236 @@ mod tests {
let circuit_1 = Arc::new(MockCircuitBreaker::default());
circuit_1.set_healthy(true);
let got = make_request([
CircuitBreakingClient::new(client_1, "client_1").with_circuit_breaker(circuit_1)
])
let got = make_request(
[CircuitBreakingClient::new(client_1, "client_1").with_circuit_breaker(circuit_1)],
1,
)
.await;
assert_matches!(got, Err(RpcWriteError::NoUpstreams));
}
/// Assert that an error is returned without any RPC request being made when
/// the number of healthy upstreams is less than the desired replication
/// factor.
#[tokio::test]
async fn test_write_not_enough_upstreams_for_replication() {
// Initialise two upstreams, 1 healthy, 1 not.
let client_1 = Arc::new(
MockWriteClient::default().with_ret(Box::new(iter::repeat_with(|| {
Err(RpcWriteError::UpstreamNotConnected("bananas".to_string()))
}))),
);
let circuit_1 = Arc::new(MockCircuitBreaker::default());
circuit_1.set_healthy(true);
let client_2 = Arc::new(
MockWriteClient::default().with_ret(Box::new(iter::repeat_with(|| {
Err(RpcWriteError::UpstreamNotConnected("bananas".to_string()))
}))),
);
let circuit_2 = Arc::new(MockCircuitBreaker::default());
circuit_2.set_healthy(false);
let got = make_request(
[
CircuitBreakingClient::new(client_1, "client_1").with_circuit_breaker(circuit_1),
CircuitBreakingClient::new(client_2, "client_2").with_circuit_breaker(circuit_2),
],
2, // 2 copies required
)
.await;
assert_matches!(got, Err(RpcWriteError::NotEnoughReplicas));
}
/// Assert that distinct upstreams are written to.
#[tokio::test]
async fn test_write_replication_distinct_hosts() {
// Initialise two upstreams.
let client_1 = Arc::new(MockWriteClient::default().with_ret(Box::new(iter::once(Ok(())))));
let circuit_1 = Arc::new(MockCircuitBreaker::default());
circuit_1.set_healthy(true);
let client_2 = Arc::new(MockWriteClient::default().with_ret(Box::new(iter::once(Ok(())))));
let circuit_2 = Arc::new(MockCircuitBreaker::default());
circuit_2.set_healthy(true);
let got = make_request(
[
CircuitBreakingClient::new(Arc::clone(&client_1), "client_1")
.with_circuit_breaker(circuit_1),
CircuitBreakingClient::new(Arc::clone(&client_2), "client_2")
.with_circuit_breaker(circuit_2),
],
2, // 2 copies required
)
.await;
assert_matches!(got, Ok(_));
// Assert each client received one (matching) write.
let calls_1 = client_1.calls();
assert_eq!(calls_1.len(), 1);
assert_eq!(calls_1, client_2.calls());
}
/// Assert that once an upstream has ACKed a request, that request is never
/// sent to the same upstream again.
#[tokio::test]
async fn test_write_replication_distinct_hosts_partial_write() {
// Initialise two upstreams, 1 willing to ACK a write, and the other
// always throwing an error.
let client_1 = Arc::new(MockWriteClient::default().with_ret(Box::new(iter::once(Ok(())))));
let circuit_1 = Arc::new(MockCircuitBreaker::default());
circuit_1.set_healthy(true);
let client_2 = Arc::new(
MockWriteClient::default().with_ret(Box::new(iter::repeat_with(|| {
Err(RpcWriteError::Upstream(tonic::Status::internal("bananas")))
}))),
);
let circuit_2 = Arc::new(MockCircuitBreaker::default());
circuit_2.set_healthy(true);
let mut clients = vec![
CircuitBreakingClient::new(Arc::clone(&client_1), "client_1")
.with_circuit_breaker(circuit_1),
CircuitBreakingClient::new(Arc::clone(&client_2), "client_2")
.with_circuit_breaker(circuit_2),
];
// The order should never affect the outcome.
clients.shuffle(&mut rand::thread_rng());
let got = make_request(
clients, 2, // 2 copies required
)
.await;
assert_matches!(
got,
Err(RpcWriteError::PartialWrite {
want_n_copies: 2,
acks: 1
})
);
// Assert the healthy upstream was only ever called once.
assert_eq!(client_1.calls().len(), 1);
}
/// Replication writes must tolerate a transient error from an upstream.
#[tokio::test]
async fn test_write_replication_tolerates_temporary_error() {
// Initialise two upstreams, 1 willing to ACK a write immediately, and the
// other failing once before accepting the retried write.
let client_1 = Arc::new(MockWriteClient::default().with_ret(Box::new(iter::once(Ok(())))));
let circuit_1 = Arc::new(MockCircuitBreaker::default());
circuit_1.set_healthy(true);
let client_2 = Arc::new(
MockWriteClient::default().with_ret(Box::new(
[
Err(RpcWriteError::Upstream(tonic::Status::internal("bananas"))),
Ok(()),
]
.into_iter(),
)),
);
let circuit_2 = Arc::new(MockCircuitBreaker::default());
circuit_2.set_healthy(true);
let got = make_request(
[
CircuitBreakingClient::new(Arc::clone(&client_1), "client_1")
.with_circuit_breaker(circuit_1),
CircuitBreakingClient::new(Arc::clone(&client_2), "client_2")
.with_circuit_breaker(circuit_2),
],
2, // 2 copies required
)
.await;
assert_matches!(got, Ok(_));
// Assert the happy upstream was only ever called once.
let calls_1 = client_1.calls();
assert_eq!(calls_1.len(), 1);
// The unhappy upstream was retried
let calls_2 = client_2.calls();
assert_eq!(calls_2.len(), 2);
// The two upstreams got the same request.
assert_eq!(calls_1[0], calls_2[0]);
// And the second request to the unhappy upstream matched the first.
assert_eq!(calls_2[0], calls_2[1]);
}
/// Replication writes find the desired number of replicas by trying all
/// upstreams.
#[tokio::test]
async fn test_write_replication_tolerates_bad_upstream() {
// Initialise three upstreams, 1 willing to ACK a write immediately, the
// second will error twice, and the third always errors.
let client_1 = Arc::new(MockWriteClient::default().with_ret(Box::new(iter::once(Ok(())))));
let circuit_1 = Arc::new(MockCircuitBreaker::default());
circuit_1.set_healthy(true);
// This client errors twice before accepting the write
let client_2 = Arc::new(
MockWriteClient::default().with_ret(Box::new(
[
Err(RpcWriteError::Upstream(tonic::Status::internal("bananas"))),
Err(RpcWriteError::Upstream(tonic::Status::internal("bananas"))),
Ok(()),
]
.into_iter(),
)),
);
let circuit_2 = Arc::new(MockCircuitBreaker::default());
circuit_2.set_healthy(true);
// This client always errors
let client_3 = Arc::new(
MockWriteClient::default().with_ret(Box::new(iter::repeat_with(|| {
Err(RpcWriteError::UpstreamNotConnected("bananas".to_string()))
}))),
);
let circuit_3 = Arc::new(MockCircuitBreaker::default());
circuit_3.set_healthy(true);
let mut clients = vec![
CircuitBreakingClient::new(Arc::clone(&client_1), "client_1")
.with_circuit_breaker(circuit_1),
CircuitBreakingClient::new(Arc::clone(&client_2), "client_2")
.with_circuit_breaker(circuit_2),
CircuitBreakingClient::new(Arc::clone(&client_3), "client_3")
.with_circuit_breaker(circuit_3),
];
// The order should never affect the outcome.
clients.shuffle(&mut rand::thread_rng());
let got = make_request(
clients, 2, // 2 copies required
)
.await;
assert_matches!(got, Ok(_));
// Assert the happy upstream was only ever called once.
let calls_1 = client_1.calls();
assert_eq!(calls_1.len(), 1);
// The sometimes-erroring upstream was retried until it succeeded.
let calls_2 = client_2.calls();
assert_eq!(calls_2.len(), 3);
// All requests were equal.
assert!(calls_1
.iter()
.chain(calls_2.iter())
.chain(client_3.calls().iter())
.all(|v| *v == calls_1[0]));
}
}

View File

@ -61,6 +61,11 @@ where
}
}
/// Returns the number of configured upstream endpoints.
pub(super) fn len(&self) -> usize {
self.endpoints.len()
}
/// Return an (infinite) iterator of healthy [`CircuitBreakingClient`], and
/// at most one client needing a health probe.
///

View File

@ -128,7 +128,9 @@ fn is_envoy_unavailable_error(e: &RpcWriteError) -> bool {
| RpcWriteError::Timeout(_)
| RpcWriteError::NoUpstreams
| RpcWriteError::UpstreamNotConnected(_)
| RpcWriteError::DeletesUnsupported => false,
| RpcWriteError::DeletesUnsupported
| RpcWriteError::PartialWrite { .. }
| RpcWriteError::NotEnoughReplicas => false,
}
}

View File

@ -180,7 +180,10 @@ impl From<&DmlError> for StatusCode {
DmlError::RpcWrite(RpcWriteError::DeletesUnsupported) => StatusCode::NOT_IMPLEMENTED,
DmlError::RpcWrite(RpcWriteError::Timeout(_)) => StatusCode::GATEWAY_TIMEOUT,
DmlError::RpcWrite(
RpcWriteError::NoUpstreams | RpcWriteError::UpstreamNotConnected(_),
RpcWriteError::NoUpstreams
| RpcWriteError::NotEnoughReplicas
| RpcWriteError::PartialWrite { .. }
| RpcWriteError::UpstreamNotConnected(_),
) => StatusCode::SERVICE_UNAVAILABLE,
}
}
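A hedged, test-style sketch of what this mapping means for HTTP clients (imports assumed to already be in scope in this module): both replication failure modes surface as 503 Service Unavailable, like the existing no-upstreams case:

#[test]
fn replication_errors_map_to_service_unavailable() {
    // Write never attempted: too few healthy upstreams.
    assert_eq!(
        StatusCode::from(&DmlError::RpcWrite(RpcWriteError::NotEnoughReplicas)),
        StatusCode::SERVICE_UNAVAILABLE
    );
    // Write partially replicated: some, but not all, copies were ACKed.
    assert_eq!(
        StatusCode::from(&DmlError::RpcWrite(RpcWriteError::PartialWrite {
            want_n_copies: 2,
            acks: 1,
        })),
        StatusCode::SERVICE_UNAVAILABLE
    );
}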

View File

@ -122,7 +122,7 @@ impl TestContext {
metrics: Arc<metric::Registry>,
) -> Self {
let client = Arc::new(MockWriteClient::default());
let rpc_writer = RpcWrite::new([(Arc::clone(&client), "mock client")], &metrics);
let rpc_writer = RpcWrite::new([(Arc::clone(&client), "mock client")], None, &metrics);
let ns_cache = Arc::new(ShardedCache::new(
iter::repeat_with(|| Arc::new(MemoryNamespaceCache::default())).take(10),