refactor: owned client in UpstreamSnapshot

Changes then UpstreamSnapshot to return owned clients, instead of
references to those clients.

This will allow the snapshot to have a 'static lifetime, suitable for
use across tasks.
pull/24376/head
Dom Dwyer 2023-05-09 15:21:10 +02:00
parent dc27ae5fbf
commit cdaf99268c
No known key found for this signature in database
GPG Key ID: E4C40DBD9157879A
4 changed files with 34 additions and 19 deletions

View File

@ -270,11 +270,11 @@ where
/// This function panics if `endpoints.next()` returns [`None`] (the number of /// This function panics if `endpoints.next()` returns [`None`] (the number of
/// upstreams should be validated before starting the write loop). /// upstreams should be validated before starting the write loop).
async fn write_loop<T>( async fn write_loop<T>(
endpoints: &mut UpstreamSnapshot<'_, T>, endpoints: &mut UpstreamSnapshot<T>,
req: &WriteRequest, req: &WriteRequest,
) -> Result<(), RpcWriteError> ) -> Result<(), RpcWriteError>
where where
T: WriteClient, T: WriteClient + Clone,
{ {
// The last error returned from an upstream write request attempt. // The last error returned from an upstream write request attempt.
let mut last_err = None; let mut last_err = None;

View File

@ -36,7 +36,7 @@ const METRIC_EVAL_INTERVAL: Duration = Duration::from_secs(3);
/// threads) an approximately uniform distribution is achieved. /// threads) an approximately uniform distribution is achieved.
#[derive(Debug)] #[derive(Debug)]
pub(super) struct Balancer<T, C = CircuitBreaker> { pub(super) struct Balancer<T, C = CircuitBreaker> {
endpoints: Arc<[CircuitBreakingClient<T, C>]>, endpoints: Arc<[Arc<CircuitBreakingClient<T, C>>]>,
/// An optional metric exporter task that evaluates the state of this /// An optional metric exporter task that evaluates the state of this
/// [`Balancer`] every [`METRIC_EVAL_INTERVAL`]. /// [`Balancer`] every [`METRIC_EVAL_INTERVAL`].
@ -54,7 +54,7 @@ where
endpoints: impl IntoIterator<Item = CircuitBreakingClient<T, C>>, endpoints: impl IntoIterator<Item = CircuitBreakingClient<T, C>>,
metrics: Option<&metric::Registry>, metrics: Option<&metric::Registry>,
) -> Self { ) -> Self {
let endpoints = endpoints.into_iter().collect(); let endpoints = endpoints.into_iter().map(Arc::new).collect();
Self { Self {
metric_task: metrics.map(|m| tokio::spawn(metric_task(m, Arc::clone(&endpoints)))), metric_task: metrics.map(|m| tokio::spawn(metric_task(m, Arc::clone(&endpoints)))),
endpoints, endpoints,
@ -73,7 +73,7 @@ where
/// evaluated at this point and the result is returned to the caller as an /// evaluated at this point and the result is returned to the caller as an
/// infinite / cycling iterator. A node that becomes unavailable after the /// infinite / cycling iterator. A node that becomes unavailable after the
/// snapshot was taken will continue to be returned by the iterator. /// snapshot was taken will continue to be returned by the iterator.
pub(super) fn endpoints(&self) -> Option<UpstreamSnapshot<'_, CircuitBreakingClient<T, C>>> { pub(super) fn endpoints(&self) -> Option<UpstreamSnapshot<Arc<CircuitBreakingClient<T, C>>>> {
// Grab and increment the current counter. // Grab and increment the current counter.
let counter = COUNTER.with(|cell| { let counter = COUNTER.with(|cell| {
let mut cell = cell.borrow_mut(); let mut cell = cell.borrow_mut();
@ -96,7 +96,7 @@ where
let mut healthy = Vec::with_capacity(self.endpoints.len()); let mut healthy = Vec::with_capacity(self.endpoints.len());
for e in &*self.endpoints { for e in &*self.endpoints {
if e.is_healthy() { if e.is_healthy() {
healthy.push(e); healthy.push(Arc::clone(e));
continue; continue;
} }
@ -104,7 +104,7 @@ where
// probe request - therefore it is added to the front of the // probe request - therefore it is added to the front of the
// iter/request queue. // iter/request queue.
if probe.is_none() && e.should_probe() { if probe.is_none() && e.should_probe() {
probe = Some(e); probe = Some(Arc::clone(e));
} }
} }
@ -128,7 +128,7 @@ where
/// health evaluation future that updates it. /// health evaluation future that updates it.
fn metric_task<T, C>( fn metric_task<T, C>(
metrics: &metric::Registry, metrics: &metric::Registry,
endpoints: Arc<[CircuitBreakingClient<T, C>]>, endpoints: Arc<[Arc<CircuitBreakingClient<T, C>>]>,
) -> impl Future<Output = ()> + Send ) -> impl Future<Output = ()> + Send
where where
T: Send + Sync + 'static, T: Send + Sync + 'static,
@ -144,7 +144,7 @@ where
async fn metric_loop<T, C>( async fn metric_loop<T, C>(
metric: metric::Metric<U64Gauge>, metric: metric::Metric<U64Gauge>,
endpoints: Arc<[CircuitBreakingClient<T, C>]>, endpoints: Arc<[Arc<CircuitBreakingClient<T, C>>]>,
) where ) where
T: Send + Sync + 'static, T: Send + Sync + 'static,
C: CircuitBreakerState + 'static, C: CircuitBreakerState + 'static,

View File

@ -1,5 +1,7 @@
//! Abstraction over RPC client //! Abstraction over RPC client
use std::sync::Arc;
use async_trait::async_trait; use async_trait::async_trait;
use generated_types::influxdata::iox::ingester::v1::{ use generated_types::influxdata::iox::ingester::v1::{
write_service_client::WriteServiceClient, WriteRequest, write_service_client::WriteServiceClient, WriteRequest,
@ -26,6 +28,16 @@ pub(super) trait WriteClient: Send + Sync + std::fmt::Debug {
async fn write(&self, op: WriteRequest) -> Result<(), RpcWriteClientError>; async fn write(&self, op: WriteRequest) -> Result<(), RpcWriteClientError>;
} }
#[async_trait]
impl<T> WriteClient for Arc<T>
where
T: WriteClient,
{
async fn write(&self, op: WriteRequest) -> Result<(), RpcWriteClientError> {
(**self).write(op).await
}
}
/// An implementation of [`WriteClient`] for the tonic gRPC client. /// An implementation of [`WriteClient`] for the tonic gRPC client.
#[async_trait] #[async_trait]
impl WriteClient for WriteServiceClient<tonic::transport::Channel> { impl WriteClient for WriteServiceClient<tonic::transport::Channel> {

View File

@ -6,12 +6,12 @@ use smallvec::SmallVec;
/// The last yielded element can be removed from the iterator by calling /// The last yielded element can be removed from the iterator by calling
/// [`UpstreamSnapshot::remove_last_unstable()`]. /// [`UpstreamSnapshot::remove_last_unstable()`].
#[derive(Debug)] #[derive(Debug)]
pub(super) struct UpstreamSnapshot<'a, C> { pub(super) struct UpstreamSnapshot<C> {
clients: SmallVec<[&'a C; 3]>, clients: SmallVec<[C; 3]>,
idx: usize, idx: usize,
} }
impl<'a, C> UpstreamSnapshot<'a, C> { impl<C> UpstreamSnapshot<C> {
/// Initialise a new snapshot, yielding the 0-indexed `i`-th element of /// Initialise a new snapshot, yielding the 0-indexed `i`-th element of
/// `clients` next (or wrapping around if `i` is out-of-bounds). /// `clients` next (or wrapping around if `i` is out-of-bounds).
/// ///
@ -19,8 +19,8 @@ impl<'a, C> UpstreamSnapshot<'a, C> {
/// allocation during construction. /// allocation during construction.
/// ///
/// If `clients` is empty, this method returns [`None`]. /// If `clients` is empty, this method returns [`None`].
pub(super) fn new(clients: impl Iterator<Item = &'a C>, i: usize) -> Option<Self> { pub(super) fn new(clients: impl Iterator<Item = C>, i: usize) -> Option<Self> {
let clients: SmallVec<[&'a C; 3]> = clients.collect(); let clients: SmallVec<[C; 3]> = clients.collect();
if clients.is_empty() { if clients.is_empty() {
return None; return None;
} }
@ -63,15 +63,18 @@ impl<'a, C> UpstreamSnapshot<'a, C> {
} }
} }
impl<'a, C> Iterator for UpstreamSnapshot<'a, C> { impl<C> Iterator for UpstreamSnapshot<C>
type Item = &'a C; where
C: Clone,
{
type Item = C;
fn next(&mut self) -> Option<Self::Item> { fn next(&mut self) -> Option<Self::Item> {
if self.clients.is_empty() { if self.clients.is_empty() {
return None; return None;
} }
self.idx = self.idx.wrapping_add(1); self.idx = self.idx.wrapping_add(1);
Some(self.clients[self.idx()]) Some(self.clients[self.idx()].clone())
} }
fn size_hint(&self) -> (usize, Option<usize>) { fn size_hint(&self) -> (usize, Option<usize>) {
@ -233,8 +236,8 @@ mod tests {
#[test] #[test]
fn test_empty_snap() { fn test_empty_snap() {
assert!(UpstreamSnapshot::<usize>::new([].iter(), 0).is_none()); assert!(UpstreamSnapshot::<usize>::new(std::iter::empty(), 0).is_none());
assert!(UpstreamSnapshot::<usize>::new([].iter(), 1).is_none()); assert!(UpstreamSnapshot::<usize>::new(std::iter::empty(), 1).is_none());
} }
proptest! { proptest! {