From cdaf99268c4c911c73d32ffe372ff147789d1d2d Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Tue, 9 May 2023 15:21:10 +0200 Subject: [PATCH] refactor: owned client in UpstreamSnapshot Changes then UpstreamSnapshot to return owned clients, instead of references to those clients. This will allow the snapshot to have a 'static lifetime, suitable for use across tasks. --- router/src/dml_handlers/rpc_write.rs | 4 ++-- router/src/dml_handlers/rpc_write/balancer.rs | 14 +++++------ router/src/dml_handlers/rpc_write/client.rs | 12 ++++++++++ .../rpc_write/upstream_snapshot.rs | 23 +++++++++++-------- 4 files changed, 34 insertions(+), 19 deletions(-) diff --git a/router/src/dml_handlers/rpc_write.rs b/router/src/dml_handlers/rpc_write.rs index a65d366303..329c0d0760 100644 --- a/router/src/dml_handlers/rpc_write.rs +++ b/router/src/dml_handlers/rpc_write.rs @@ -270,11 +270,11 @@ where /// This function panics if `endpoints.next()` returns [`None`] (the number of /// upstreams should be validated before starting the write loop). async fn write_loop( - endpoints: &mut UpstreamSnapshot<'_, T>, + endpoints: &mut UpstreamSnapshot, req: &WriteRequest, ) -> Result<(), RpcWriteError> where - T: WriteClient, + T: WriteClient + Clone, { // The last error returned from an upstream write request attempt. let mut last_err = None; diff --git a/router/src/dml_handlers/rpc_write/balancer.rs b/router/src/dml_handlers/rpc_write/balancer.rs index fecf664722..6ab55c5d63 100644 --- a/router/src/dml_handlers/rpc_write/balancer.rs +++ b/router/src/dml_handlers/rpc_write/balancer.rs @@ -36,7 +36,7 @@ const METRIC_EVAL_INTERVAL: Duration = Duration::from_secs(3); /// threads) an approximately uniform distribution is achieved. #[derive(Debug)] pub(super) struct Balancer { - endpoints: Arc<[CircuitBreakingClient]>, + endpoints: Arc<[Arc>]>, /// An optional metric exporter task that evaluates the state of this /// [`Balancer`] every [`METRIC_EVAL_INTERVAL`]. @@ -54,7 +54,7 @@ where endpoints: impl IntoIterator>, metrics: Option<&metric::Registry>, ) -> Self { - let endpoints = endpoints.into_iter().collect(); + let endpoints = endpoints.into_iter().map(Arc::new).collect(); Self { metric_task: metrics.map(|m| tokio::spawn(metric_task(m, Arc::clone(&endpoints)))), endpoints, @@ -73,7 +73,7 @@ where /// evaluated at this point and the result is returned to the caller as an /// infinite / cycling iterator. A node that becomes unavailable after the /// snapshot was taken will continue to be returned by the iterator. - pub(super) fn endpoints(&self) -> Option>> { + pub(super) fn endpoints(&self) -> Option>>> { // Grab and increment the current counter. let counter = COUNTER.with(|cell| { let mut cell = cell.borrow_mut(); @@ -96,7 +96,7 @@ where let mut healthy = Vec::with_capacity(self.endpoints.len()); for e in &*self.endpoints { if e.is_healthy() { - healthy.push(e); + healthy.push(Arc::clone(e)); continue; } @@ -104,7 +104,7 @@ where // probe request - therefore it is added to the front of the // iter/request queue. if probe.is_none() && e.should_probe() { - probe = Some(e); + probe = Some(Arc::clone(e)); } } @@ -128,7 +128,7 @@ where /// health evaluation future that updates it. fn metric_task( metrics: &metric::Registry, - endpoints: Arc<[CircuitBreakingClient]>, + endpoints: Arc<[Arc>]>, ) -> impl Future + Send where T: Send + Sync + 'static, @@ -144,7 +144,7 @@ where async fn metric_loop( metric: metric::Metric, - endpoints: Arc<[CircuitBreakingClient]>, + endpoints: Arc<[Arc>]>, ) where T: Send + Sync + 'static, C: CircuitBreakerState + 'static, diff --git a/router/src/dml_handlers/rpc_write/client.rs b/router/src/dml_handlers/rpc_write/client.rs index b39bb3ab1d..21c6b7b1ca 100644 --- a/router/src/dml_handlers/rpc_write/client.rs +++ b/router/src/dml_handlers/rpc_write/client.rs @@ -1,5 +1,7 @@ //! Abstraction over RPC client +use std::sync::Arc; + use async_trait::async_trait; use generated_types::influxdata::iox::ingester::v1::{ write_service_client::WriteServiceClient, WriteRequest, @@ -26,6 +28,16 @@ pub(super) trait WriteClient: Send + Sync + std::fmt::Debug { async fn write(&self, op: WriteRequest) -> Result<(), RpcWriteClientError>; } +#[async_trait] +impl WriteClient for Arc +where + T: WriteClient, +{ + async fn write(&self, op: WriteRequest) -> Result<(), RpcWriteClientError> { + (**self).write(op).await + } +} + /// An implementation of [`WriteClient`] for the tonic gRPC client. #[async_trait] impl WriteClient for WriteServiceClient { diff --git a/router/src/dml_handlers/rpc_write/upstream_snapshot.rs b/router/src/dml_handlers/rpc_write/upstream_snapshot.rs index b33c5289c6..f545bbbbfd 100644 --- a/router/src/dml_handlers/rpc_write/upstream_snapshot.rs +++ b/router/src/dml_handlers/rpc_write/upstream_snapshot.rs @@ -6,12 +6,12 @@ use smallvec::SmallVec; /// The last yielded element can be removed from the iterator by calling /// [`UpstreamSnapshot::remove_last_unstable()`]. #[derive(Debug)] -pub(super) struct UpstreamSnapshot<'a, C> { - clients: SmallVec<[&'a C; 3]>, +pub(super) struct UpstreamSnapshot { + clients: SmallVec<[C; 3]>, idx: usize, } -impl<'a, C> UpstreamSnapshot<'a, C> { +impl UpstreamSnapshot { /// Initialise a new snapshot, yielding the 0-indexed `i`-th element of /// `clients` next (or wrapping around if `i` is out-of-bounds). /// @@ -19,8 +19,8 @@ impl<'a, C> UpstreamSnapshot<'a, C> { /// allocation during construction. /// /// If `clients` is empty, this method returns [`None`]. - pub(super) fn new(clients: impl Iterator, i: usize) -> Option { - let clients: SmallVec<[&'a C; 3]> = clients.collect(); + pub(super) fn new(clients: impl Iterator, i: usize) -> Option { + let clients: SmallVec<[C; 3]> = clients.collect(); if clients.is_empty() { return None; } @@ -63,15 +63,18 @@ impl<'a, C> UpstreamSnapshot<'a, C> { } } -impl<'a, C> Iterator for UpstreamSnapshot<'a, C> { - type Item = &'a C; +impl Iterator for UpstreamSnapshot +where + C: Clone, +{ + type Item = C; fn next(&mut self) -> Option { if self.clients.is_empty() { return None; } self.idx = self.idx.wrapping_add(1); - Some(self.clients[self.idx()]) + Some(self.clients[self.idx()].clone()) } fn size_hint(&self) -> (usize, Option) { @@ -233,8 +236,8 @@ mod tests { #[test] fn test_empty_snap() { - assert!(UpstreamSnapshot::::new([].iter(), 0).is_none()); - assert!(UpstreamSnapshot::::new([].iter(), 1).is_none()); + assert!(UpstreamSnapshot::::new(std::iter::empty(), 0).is_none()); + assert!(UpstreamSnapshot::::new(std::iter::empty(), 1).is_none()); } proptest! {