From 125fef388c1d1394cf571da395f986d42aee7c1e Mon Sep 17 00:00:00 2001
From: Dom Dwyer
Date: Thu, 23 Mar 2023 17:33:55 +0100
Subject: [PATCH 1/3] feat: MVP replication support

This commit implements replication for the router's RpcWrite handler.

The desired number of replica copies is specified at startup time, and
each user write will be fanned out with the specified replication
factor (replicas + 1).

A write that fails with no ACKs at all returns the underlying write
error, but a failure to obtain enough ACKs (enough successful writes)
after at least 1 ACK returns a "partial write" error - this
differentiation is important, as the user's write will be readable
after a partial write error has occurred.

This currently writes to upstreams serially; this is clearly an
opportunity for improvement! A follow-on PR will parallelise writes
across the desired number of replicas while maintaining the "at most
one ack'd write to one host" invariant.

Note that replication is currently hard-coded as disabled.
---
 ioxd_router/src/lib.rs                        |   2 +-
 router/src/dml_handlers/rpc_write.rs          | 474 +++++++++++++++---
 router/src/dml_handlers/rpc_write/balancer.rs |   5 +
 .../dml_handlers/rpc_write/lazy_connector.rs  |   4 +-
 router/src/server/http.rs                     |   5 +-
 router/tests/common/mod.rs                    |   2 +-
 6 files changed, 424 insertions(+), 68 deletions(-)

diff --git a/ioxd_router/src/lib.rs b/ioxd_router/src/lib.rs
index 022c260d2b..736cecf86f 100644
--- a/ioxd_router/src/lib.rs
+++ b/ioxd_router/src/lib.rs
@@ -295,7 +295,7 @@ pub async fn create_router2_server_type(
     });
 
     // Initialise the DML handler that sends writes to the ingester using the RPC write path.
-    let rpc_writer = RpcWrite::new(ingester_connections, &metrics);
+    let rpc_writer = RpcWrite::new(ingester_connections, None, &metrics);
     let rpc_writer = InstrumentationDecorator::new("rpc_writer", &metrics, rpc_writer);
 
     // 1. END
diff --git a/router/src/dml_handlers/rpc_write.rs b/router/src/dml_handlers/rpc_write.rs
index d1add64be8..5c5b672dad 100644
--- a/router/src/dml_handlers/rpc_write.rs
+++ b/router/src/dml_handlers/rpc_write.rs
@@ -23,7 +23,7 @@ use hashbrown::HashMap;
 use mutable_batch::MutableBatch;
 use mutable_batch_pb::encode::encode_write;
 use observability_deps::tracing::*;
-use std::{fmt::Debug, sync::Arc, time::Duration};
+use std::{fmt::Debug, num::NonZeroUsize, sync::Arc, time::Duration};
 use thiserror::Error;
 use trace::ctx::SpanContext;
 
@@ -54,6 +54,26 @@ pub enum RpcWriteError {
     /// A delete request was rejected (not supported).
     #[error("deletes are not supported")]
     DeletesUnsupported,
+
+    /// The write request was not attempted, because not enough upstream
+    /// ingesters needed to satisfy the configured replication factor are
+    /// healthy.
+    #[error("not enough upstreams to satisfy write replication")]
+    NotEnoughReplicas,
+
+    /// A replicated write was attempted but not enough upstream ingesters
+    /// acknowledged the write to satisfy the desired replication factor.
+    #[error(
+        "not enough upstreams accepted replicated write; \
+         want {want_n_copies}, but only received {acks} acks"
+    )]
+    PartialWrite {
+        /// The total number of copies of data needed for this write to be
+        /// considered successful.
+        want_n_copies: usize,
+        /// The number of successful upstream ingester writes.
+        acks: usize,
+    },
 }
 
 /// An [`RpcWrite`] handler submits a write directly to an Ingester via the
 /// [gRPC write service].
@@ -63,6 +83,16 @@
 /// distributed approximately uniformly across all downstream Ingesters. There
 /// is no effort made to enforce or attempt data locality.
 ///
+/// # Replication
+///
+/// If replication is configured, the total number of upstream ingesters
+/// that acknowledge the write must be `replica_copies + 1` for the write to
+/// be considered successful.
+///
+/// A write that is accepted by some upstream ingesters, but not by enough of
+/// them to satisfy the desired replication factor, is considered a partial
+/// failure, and a [`RpcWriteError::PartialWrite`] error is returned.
+///
 /// # Deletes
 ///
 /// This handler drops delete requests, logging the attempt and returning an
@@ -72,23 +102,64 @@
 #[derive(Debug)]
 pub struct RpcWrite<T, C = CircuitBreaker> {
     endpoints: Balancer<T, C>,
+
+    /// The total number of distinct upstream ingesters to write a single
+    /// request to.
+    ///
+    /// Invariant: after an upstream has ACKed a write, it MUST NOT be sent the
+    /// same write again. This minimises duplication within the same upstream,
+    /// for which no additional durability is realised.
+    ///
+    /// NOTE: it is NOT possible to eliminate duplication of write requests
+    /// being sent to the same upstream, as an upstream (or a middle-man proxy)
+    /// may NACK a write, having already buffered the data. When this request is
+    /// retried, the data will be duplicated.
+    n_copies: usize,
 }
 
 impl<T> RpcWrite<T> {
     /// Initialise a new [`RpcWrite`] that sends requests to an arbitrary
     /// downstream Ingester, using a round-robin strategy.
-    pub fn new<N>(endpoints: impl IntoIterator<Item = (T, N)>, metrics: &metric::Registry) -> Self
+    ///
+    /// If [`Some`], `replica_copies` specifies the number of additional
+    /// upstream ingesters that must receive and acknowledge the write for it to
+    /// be considered successful.
+    ///
+    /// # Panics
+    ///
+    /// It's invalid to configure `replica_copies` such that more ACKs are
+    /// needed than the number of `endpoints`; doing so will cause a panic.
+    pub fn new<N>(
+        endpoints: impl IntoIterator<Item = (T, N)>,
+        replica_copies: Option<NonZeroUsize>,
+        metrics: &metric::Registry,
+    ) -> Self
     where
         T: Send + Sync + Debug + 'static,
         N: Into<Arc<str>>,
     {
+        let endpoints = Balancer::new(
+            endpoints
+                .into_iter()
+                .map(|(client, name)| CircuitBreakingClient::new(client, name.into())),
+            Some(metrics),
+        );
+
+        // Map the "replication factor" into the total number of distinct data
+        // copies necessary to consider a write a success.
+        let n_copies = replica_copies.map(NonZeroUsize::get).unwrap_or(1);
+
+        // Assert this configuration is not impossible to satisfy.
+        assert!(
+            n_copies <= endpoints.len(),
+            "cannot configure more write copies ({n_copies}) than ingester \
+             endpoints ({count})",
+            count = endpoints.len(),
+        );
+
         Self {
-            endpoints: Balancer::new(
-                endpoints
-                    .into_iter()
-                    .map(|(client, name)| CircuitBreakingClient::new(client, name.into())),
-                Some(metrics),
-            ),
+            endpoints,
+            n_copies: replica_copies.map(NonZeroUsize::get).unwrap_or(1),
         }
     }
 }
@@ -134,16 +205,52 @@ where
             payload: Some(encode_write(namespace_id.get(), &op)),
         };
 
-        // Perform the gRPC write to an ingester.
-        //
-        // This call is bounded to at most RPC_TIMEOUT duration of time.
-        write_loop(
-            self.endpoints
-                .endpoints()
-                .ok_or(RpcWriteError::NoUpstreams)?,
-            req,
-        )
-        .await?;
+        // Obtain a snapshot of currently-healthy upstreams (and potentially
+        // some that need probing).
+        let mut snap = self
+            .endpoints
+            .endpoints()
+            .ok_or(RpcWriteError::NoUpstreams)?;
+
+        // Validate the required number of writes is possible given the current
+        // number of healthy endpoints.
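+        //
+        // NOTE: this is a point-in-time check against the snapshot taken
+        // above; an upstream may still become unavailable between this check
+        // and the write attempts below, in which case the loop surfaces a
+        // write or partial-write error as appropriate.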
+        if snap.len() < self.n_copies {
+            return Err(RpcWriteError::NotEnoughReplicas);
+        }
+
+        // Write the desired number of copies of `req`.
+        for i in 0..self.n_copies {
+            // Perform the gRPC write to an ingester.
+            //
+            // This call is bounded to at most RPC_TIMEOUT duration of time.
+            write_loop(&mut snap, &req).await.map_err(|e| {
+                // In all cases, if at least one write succeeded, then this
+                // becomes a partial write error.
+                if i > 0 {
+                    return RpcWriteError::PartialWrite {
+                        want_n_copies: self.n_copies,
+                        acks: i,
+                    };
+                }
+
+                // This error was for the first request - there have been no
+                // ACKs received.
+                match e {
+                    // This error is an internal implementation detail - the
+                    // meaningful error for the user is "there's no healthy
+                    // upstreams".
+                    RpcWriteError::UpstreamNotConnected(_) => RpcWriteError::NoUpstreams,
+                    // The number of upstreams no longer satisfies the desired
+                    // replication factor.
+                    RpcWriteError::NoUpstreams => RpcWriteError::NotEnoughReplicas,
+                    // All other errors pass through.
+                    v => v,
+                }
+            })?;
+
+            // Remove the upstream that was successfully written to from the
+            // candidates.
+            snap.remove_last_unstable();
+        }
 
         debug!(
             %partition_key,
@@ -184,8 +291,8 @@ where
 /// If at least one upstream request has failed (returning an error), the most
 /// recent error is returned.
 async fn write_loop<T>(
-    mut endpoints: UpstreamSnapshot<'_, T>,
-    req: WriteRequest,
+    endpoints: &mut UpstreamSnapshot<'_, T>,
+    req: &WriteRequest,
 ) -> Result<(), RpcWriteError>
 where
     T: WriteClient,
@@ -216,9 +323,6 @@ where
         })
         .await
        .map_err(|e| match last_err {
-            // This error is an internal implementation detail - the meaningful
-            // error for the user is "there's no healthy upstreams".
-            Some(RpcWriteError::UpstreamNotConnected(_)) => RpcWriteError::NoUpstreams,
             // Any other error is returned as-is.
             Some(v) => v,
             // If the entire write attempt fails during the first RPC write
@@ -244,6 +348,7 @@ mod tests {
 
     use assert_matches::assert_matches;
     use data_types::PartitionKey;
+    use rand::seq::SliceRandom;
 
     use crate::dml_handlers::rpc_write::circuit_breaking_client::mock::MockCircuitBreaker;
 
@@ -264,6 +369,49 @@ mod tests {
     const NAMESPACE_NAME: &str = "bananas";
     const NAMESPACE_ID: NamespaceId = NamespaceId::new(42);
 
+    /// A helper function to perform an arbitrary write against `endpoints`,
+    /// with the given number of desired distinct data copies.
+    async fn make_request<T, C>(
+        endpoints: impl IntoIterator<Item = CircuitBreakingClient<T, C>> + Send,
+        n_copies: usize,
+    ) -> Result<Vec<DmlMeta>, RpcWriteError>
+    where
+        T: WriteClient + 'static,
+        C: CircuitBreakerState + 'static,
+    {
+        let handler = RpcWrite {
+            endpoints: Balancer::new(endpoints, None),
+            n_copies,
+        };
+
+        assert!(
+            n_copies <= handler.endpoints.len(),
+            "cannot configure more write copies ({n_copies}) than ingester \
+             endpoints ({count})",
+            count = handler.endpoints.len(),
+        );
+
+        // Generate some write input
+        let input = Partitioned::new(
+            PartitionKey::from("2022-01-01"),
+            lp_to_writes("bananas,tag1=A,tag2=B val=42i 1"),
+        );
+
+        // Use tokio's "auto-advance" time feature to avoid waiting for the
+        // actual timeout duration.
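+        //
+        // (While time is paused, tokio advances the clock to the next
+        // pending timer whenever the runtime is otherwise idle, so the
+        // RPC_TIMEOUT fires without any real-time waiting.)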
+        tokio::time::pause();
+
+        // Drive the RPC writer
+        handler
+            .write(
+                &NamespaceName::new(NAMESPACE_NAME).unwrap(),
+                NAMESPACE_ID,
+                input,
+                None,
+            )
+            .await
+    }
+
     #[tokio::test]
     async fn test_write() {
         let batches = lp_to_writes(
@@ -283,6 +431,7 @@ mod tests {
         let client = Arc::new(MockWriteClient::default());
         let handler = RpcWrite::new(
             [(Arc::clone(&client), "mock client")],
+            None,
             &metric::Registry::default(),
         );
 
@@ -344,6 +493,7 @@ mod tests {
                 (Arc::clone(&client2), "client2"),
                 (Arc::clone(&client3), "client3"),
             ],
+            None,
             &metric::Registry::default(),
         );
 
@@ -418,6 +568,7 @@ mod tests {
                 (Arc::clone(&client1), "client1"),
                 (Arc::clone(&client2), "client2"),
             ],
+            None,
             &metric::Registry::default(),
         );
 
@@ -458,38 +609,6 @@ mod tests {
         assert_eq!(got_tables, want_tables);
     }
 
-    async fn make_request<T, C>(
-        endpoints: impl IntoIterator<Item = CircuitBreakingClient<T, C>> + Send,
-    ) -> Result<Vec<DmlMeta>, RpcWriteError>
-    where
-        T: WriteClient + 'static,
-        C: CircuitBreakerState + 'static,
-    {
-        let handler = RpcWrite {
-            endpoints: Balancer::new(endpoints, None),
-        };
-
-        // Generate some write input
-        let input = Partitioned::new(
-            PartitionKey::from("2022-01-01"),
-            lp_to_writes("bananas,tag1=A,tag2=B val=42i 1"),
-        );
-
-        // Use tokio's "auto-advance" time feature to avoid waiting for the
-        // actual timeout duration.
-        tokio::time::pause();
-
-        // Drive the RPC writer
-        handler
-            .write(
-                &NamespaceName::new(NAMESPACE_NAME).unwrap(),
-                NAMESPACE_ID,
-                input,
-                None,
-            )
-            .await
-    }
-
     /// Assert the error response for a write request when there are no healthy
     /// upstreams.
     #[tokio::test]
@@ -500,9 +619,10 @@ mod tests {
 
         // Mark the client circuit breaker as unhealthy
         circuit_1.set_healthy(false);
-        let got = make_request([
-            CircuitBreakingClient::new(client_1, "client_1").with_circuit_breaker(circuit_1)
-        ])
+        let got = make_request(
+            [CircuitBreakingClient::new(client_1, "client_1").with_circuit_breaker(circuit_1)],
+            1,
+        )
         .await;
 
         assert_matches!(got, Err(RpcWriteError::NoUpstreams));
@@ -520,9 +640,10 @@ mod tests {
         let circuit_1 = Arc::new(MockCircuitBreaker::default());
         circuit_1.set_healthy(true);
 
-        let got = make_request([
-            CircuitBreakingClient::new(client_1, "client_1").with_circuit_breaker(circuit_1)
-        ])
+        let got = make_request(
+            [CircuitBreakingClient::new(client_1, "client_1").with_circuit_breaker(circuit_1)],
+            1,
+        )
         .await;
 
         assert_matches!(got, Err(RpcWriteError::Upstream(s)) => {
@@ -543,11 +664,236 @@ mod tests {
         let circuit_1 = Arc::new(MockCircuitBreaker::default());
         circuit_1.set_healthy(true);
 
-        let got = make_request([
-            CircuitBreakingClient::new(client_1, "client_1").with_circuit_breaker(circuit_1)
-        ])
+        let got = make_request(
+            [CircuitBreakingClient::new(client_1, "client_1").with_circuit_breaker(circuit_1)],
+            1,
+        )
         .await;
 
         assert_matches!(got, Err(RpcWriteError::NoUpstreams));
     }
+
+    /// Assert that an error is returned without any RPC request being made when
+    /// the number of healthy upstreams is less than the desired replication
+    /// factor.
+    #[tokio::test]
+    async fn test_write_not_enough_upstreams_for_replication() {
+        // Initialise two upstreams, 1 healthy, 1 not.
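+        //
+        // With a replication factor of 2 and only 1 healthy upstream, the
+        // handler must reject the write before any RPC is attempted.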
+        let client_1 = Arc::new(
+            MockWriteClient::default().with_ret(Box::new(iter::repeat_with(|| {
+                Err(RpcWriteError::UpstreamNotConnected("bananas".to_string()))
+            }))),
+        );
+        let circuit_1 = Arc::new(MockCircuitBreaker::default());
+        circuit_1.set_healthy(true);
+
+        let client_2 = Arc::new(
+            MockWriteClient::default().with_ret(Box::new(iter::repeat_with(|| {
+                Err(RpcWriteError::UpstreamNotConnected("bananas".to_string()))
+            }))),
+        );
+        let circuit_2 = Arc::new(MockCircuitBreaker::default());
+        circuit_2.set_healthy(false);
+
+        let got = make_request(
+            [
+                CircuitBreakingClient::new(client_1, "client_1").with_circuit_breaker(circuit_1),
+                CircuitBreakingClient::new(client_2, "client_2").with_circuit_breaker(circuit_2),
+            ],
+            2, // 2 copies required
+        )
+        .await;
+
+        assert_matches!(got, Err(RpcWriteError::NotEnoughReplicas));
+    }
+
+    /// Assert distinct upstreams are written to.
+    #[tokio::test]
+    async fn test_write_replication_distinct_hosts() {
+        // Initialise two upstreams.
+        let client_1 = Arc::new(MockWriteClient::default().with_ret(Box::new(iter::once(Ok(())))));
+        let circuit_1 = Arc::new(MockCircuitBreaker::default());
+        circuit_1.set_healthy(true);
+
+        let client_2 = Arc::new(MockWriteClient::default().with_ret(Box::new(iter::once(Ok(())))));
+        let circuit_2 = Arc::new(MockCircuitBreaker::default());
+        circuit_2.set_healthy(true);
+
+        let got = make_request(
+            [
+                CircuitBreakingClient::new(Arc::clone(&client_1), "client_1")
+                    .with_circuit_breaker(circuit_1),
+                CircuitBreakingClient::new(Arc::clone(&client_2), "client_2")
+                    .with_circuit_breaker(circuit_2),
+            ],
+            2, // 2 copies required
+        )
+        .await;
+
+        assert_matches!(got, Ok(_));
+
+        // Assert each client received one (matching) write.
+        let calls_1 = client_1.calls();
+        assert_eq!(calls_1.len(), 1);
+        assert_eq!(calls_1, client_2.calls());
+    }
+
+    /// Assert that once a request has been sent to an upstream, it is never
+    /// retried again.
+    #[tokio::test]
+    async fn test_write_replication_distinct_hosts_partial_write() {
+        // Initialise two upstreams, 1 willing to ACK a write, and the other
+        // always throwing an error.
+        let client_1 = Arc::new(MockWriteClient::default().with_ret(Box::new(iter::once(Ok(())))));
+        let circuit_1 = Arc::new(MockCircuitBreaker::default());
+        circuit_1.set_healthy(true);
+
+        let client_2 = Arc::new(
+            MockWriteClient::default().with_ret(Box::new(iter::repeat_with(|| {
+                Err(RpcWriteError::Upstream(tonic::Status::internal("bananas")))
+            }))),
+        );
+        let circuit_2 = Arc::new(MockCircuitBreaker::default());
+        circuit_2.set_healthy(true);
+
+        let mut clients = vec![
+            CircuitBreakingClient::new(Arc::clone(&client_1), "client_1")
+                .with_circuit_breaker(circuit_1),
+            CircuitBreakingClient::new(Arc::clone(&client_2), "client_2")
+                .with_circuit_breaker(circuit_2),
+        ];
+
+        // The order should never affect the outcome.
+        clients.shuffle(&mut rand::thread_rng());
+
+        let got = make_request(
+            clients, 2, // 2 copies required
+        )
+        .await;
+
+        assert_matches!(
+            got,
+            Err(RpcWriteError::PartialWrite {
+                want_n_copies: 2,
+                acks: 1
+            })
+        );
+
+        // Assert the healthy upstream was only ever called once.
+        assert_eq!(client_1.calls().len(), 1);
+    }
+
+    /// Replication writes must tolerate a transient error from an upstream.
+    #[tokio::test]
+    async fn test_write_replication_tolerates_temporary_error() {
+        // Initialise two upstreams: 1 willing to ACK a write, and the other
+        // throwing an error for the first write before ACKing the retry.
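+        //
+        // The write as a whole must succeed, and the upstream that has
+        // already ACKed must not be sent the request again.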
+        let client_1 = Arc::new(MockWriteClient::default().with_ret(Box::new(iter::once(Ok(())))));
+        let circuit_1 = Arc::new(MockCircuitBreaker::default());
+        circuit_1.set_healthy(true);
+
+        let client_2 = Arc::new(
+            MockWriteClient::default().with_ret(Box::new(
+                [
+                    Err(RpcWriteError::Upstream(tonic::Status::internal("bananas"))),
+                    Ok(()),
+                ]
+                .into_iter(),
+            )),
+        );
+        let circuit_2 = Arc::new(MockCircuitBreaker::default());
+        circuit_2.set_healthy(true);
+
+        let got = make_request(
+            [
+                CircuitBreakingClient::new(Arc::clone(&client_1), "client_1")
+                    .with_circuit_breaker(circuit_1),
+                CircuitBreakingClient::new(Arc::clone(&client_2), "client_2")
+                    .with_circuit_breaker(circuit_2),
+            ],
+            2, // 2 copies required
+        )
+        .await;
+
+        assert_matches!(got, Ok(_));
+
+        // Assert the happy upstream was only ever called once.
+        let calls_1 = client_1.calls();
+        assert_eq!(calls_1.len(), 1);
+        // The unhappy upstream was retried.
+        let calls_2 = client_2.calls();
+        assert_eq!(calls_2.len(), 2);
+        // The two upstreams got the same request.
+        assert_eq!(calls_1[0], calls_2[0]);
+        // And the second request to the unhappy upstream matched the first.
+        assert_eq!(calls_2[0], calls_2[1]);
+    }
+
+    /// Replication writes find the desired number of replicas by trying all
+    /// upstreams.
+    #[tokio::test]
+    async fn test_write_replication_tolerates_bad_upstream() {
+        // Initialise three upstreams: 1 willing to ACK a write immediately,
+        // the second erroring twice before ACKing, and the third always
+        // erroring.
+        let client_1 = Arc::new(MockWriteClient::default().with_ret(Box::new(iter::once(Ok(())))));
+        let circuit_1 = Arc::new(MockCircuitBreaker::default());
+        circuit_1.set_healthy(true);
+
+        // This client errors twice, then succeeds.
+        let client_2 = Arc::new(
+            MockWriteClient::default().with_ret(Box::new(
+                [
+                    Err(RpcWriteError::Upstream(tonic::Status::internal("bananas"))),
+                    Err(RpcWriteError::Upstream(tonic::Status::internal("bananas"))),
+                    Ok(()),
+                ]
+                .into_iter(),
+            )),
+        );
+        let circuit_2 = Arc::new(MockCircuitBreaker::default());
+        circuit_2.set_healthy(true);
+
+        // This client always errors.
+        let client_3 = Arc::new(
+            MockWriteClient::default().with_ret(Box::new(iter::repeat_with(|| {
+                Err(RpcWriteError::UpstreamNotConnected("bananas".to_string()))
+            }))),
+        );
+        let circuit_3 = Arc::new(MockCircuitBreaker::default());
+        circuit_3.set_healthy(true);
+
+        let mut clients = vec![
+            CircuitBreakingClient::new(Arc::clone(&client_1), "client_1")
+                .with_circuit_breaker(circuit_1),
+            CircuitBreakingClient::new(Arc::clone(&client_2), "client_2")
+                .with_circuit_breaker(circuit_2),
+            CircuitBreakingClient::new(Arc::clone(&client_3), "client_3")
+                .with_circuit_breaker(circuit_3),
+        ];
+
+        // The order should never affect the outcome.
+        clients.shuffle(&mut rand::thread_rng());
+
+        let got = make_request(
+            clients, 2, // 2 copies required
+        )
+        .await;
+
+        assert_matches!(got, Ok(_));
+
+        // Assert the happy upstream was only ever called once.
+        let calls_1 = client_1.calls();
+        assert_eq!(calls_1.len(), 1);
+
+        // The intermittently-failing upstream was retried until it succeeded.
+        let calls_2 = client_2.calls();
+        assert_eq!(calls_2.len(), 3);
+
+        // All requests were equal.
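+        // (Any attempts made against the always-erroring client_3 carried
+        // the same payload too.)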
+        assert!(calls_1
+            .iter()
+            .chain(calls_2.iter())
+            .chain(client_3.calls().iter())
+            .all(|v| *v == calls_1[0]));
+    }
 }
diff --git a/router/src/dml_handlers/rpc_write/balancer.rs b/router/src/dml_handlers/rpc_write/balancer.rs
index 906e982a9e..d297b14c1e 100644
--- a/router/src/dml_handlers/rpc_write/balancer.rs
+++ b/router/src/dml_handlers/rpc_write/balancer.rs
@@ -61,6 +61,11 @@ where
         }
     }
 
+    /// Returns the number of configured upstream endpoints.
+    pub(super) fn len(&self) -> usize {
+        self.endpoints.len()
+    }
+
     /// Return an (infinite) iterator of healthy [`CircuitBreakingClient`], and
     /// at most one client needing a health probe.
     ///
diff --git a/router/src/dml_handlers/rpc_write/lazy_connector.rs b/router/src/dml_handlers/rpc_write/lazy_connector.rs
index 6325c8e96f..3361cd95b5 100644
--- a/router/src/dml_handlers/rpc_write/lazy_connector.rs
+++ b/router/src/dml_handlers/rpc_write/lazy_connector.rs
@@ -128,7 +128,9 @@ fn is_envoy_unavailable_error(e: &RpcWriteError) -> bool {
                 | RpcWriteError::Timeout(_)
                 | RpcWriteError::NoUpstreams
                 | RpcWriteError::UpstreamNotConnected(_)
-                | RpcWriteError::DeletesUnsupported => false,
+                | RpcWriteError::DeletesUnsupported
+                | RpcWriteError::PartialWrite { .. }
+                | RpcWriteError::NotEnoughReplicas => false,
     }
 }
diff --git a/router/src/server/http.rs b/router/src/server/http.rs
index 9f06a553ee..9ffdfe9b83 100644
--- a/router/src/server/http.rs
+++ b/router/src/server/http.rs
@@ -180,7 +180,10 @@ impl From<&DmlError> for StatusCode {
             DmlError::RpcWrite(RpcWriteError::DeletesUnsupported) => StatusCode::NOT_IMPLEMENTED,
             DmlError::RpcWrite(RpcWriteError::Timeout(_)) => StatusCode::GATEWAY_TIMEOUT,
             DmlError::RpcWrite(
-                RpcWriteError::NoUpstreams | RpcWriteError::UpstreamNotConnected(_),
+                RpcWriteError::NoUpstreams
+                    | RpcWriteError::NotEnoughReplicas
+                    | RpcWriteError::PartialWrite { .. }
+                    | RpcWriteError::UpstreamNotConnected(_),
             ) => StatusCode::SERVICE_UNAVAILABLE,
         }
     }
diff --git a/router/tests/common/mod.rs b/router/tests/common/mod.rs
index ff0a3d3cf9..226a907996 100644
--- a/router/tests/common/mod.rs
+++ b/router/tests/common/mod.rs
@@ -122,7 +122,7 @@ impl TestContext {
         metrics: Arc<metric::Registry>,
     ) -> Self {
         let client = Arc::new(MockWriteClient::default());
-        let rpc_writer = RpcWrite::new([(Arc::clone(&client), "mock client")], &metrics);
+        let rpc_writer = RpcWrite::new([(Arc::clone(&client), "mock client")], None, &metrics);
 
         let ns_cache = Arc::new(ShardedCache::new(
             iter::repeat_with(|| Arc::new(MemoryNamespaceCache::default())).take(10),

From feae0ce345a11d92c6de329ed2a27d73a03470e0 Mon Sep 17 00:00:00 2001
From: Dom Dwyer
Date: Thu, 23 Mar 2023 17:46:31 +0100
Subject: [PATCH 2/3] feat(router): configurable replication factor

This change allows the replication factor to be specified at runtime,
defaulting to "replication disabled".
---
 clap_blocks/src/router2.rs                  | 16 +++++++++++++++-
 influxdb_iox/src/commands/run/all_in_one.rs |  1 +
 2 files changed, 16 insertions(+), 1 deletion(-)

diff --git a/clap_blocks/src/router2.rs b/clap_blocks/src/router2.rs
index 8da9c621a0..35c395dcd7 100644
--- a/clap_blocks/src/router2.rs
+++ b/clap_blocks/src/router2.rs
@@ -1,7 +1,10 @@
 //! CLI config for the router using the RPC write path
 
 use crate::ingester_address::IngesterAddress;
-use std::{num::ParseIntError, time::Duration};
+use std::{
+    num::{NonZeroUsize, ParseIntError},
+    time::Duration,
+};
 
 /// CLI config for the router using the RPC write path
 #[derive(Debug, Clone, clap::Parser)]
@@ -105,6 +108,17 @@ pub struct Router2Config {
         value_parser = parse_duration
     )]
     pub rpc_write_timeout_seconds: Duration,
+
+    /// Specify the optional replication factor for each RPC write.
+    ///
+    /// The total number of copies of data after replication will be this value,
+    /// plus 1.
+    ///
+    /// If the desired replication level is not achieved, a partial write error
+    /// will be returned to the user. The write MAY be queryable after a partial
+    /// write failure.
+    #[clap(long = "rpc-write-replicas", env = "INFLUXDB_IOX_RPC_WRITE_REPLICAS")]
+    pub rpc_write_replicas: Option<NonZeroUsize>,
 }
 
 /// Map a string containing an integer number of seconds into a [`Duration`].
diff --git a/influxdb_iox/src/commands/run/all_in_one.rs b/influxdb_iox/src/commands/run/all_in_one.rs
index e1326fad49..c2ca46d383 100644
--- a/influxdb_iox/src/commands/run/all_in_one.rs
+++ b/influxdb_iox/src/commands/run/all_in_one.rs
@@ -472,6 +472,7 @@ impl Config {
             partition_key_pattern: "%Y-%m-%d".to_string(),
             topic: QUERY_POOL_NAME.to_string(),
             rpc_write_timeout_seconds: Duration::new(3, 0),
+            rpc_write_replicas: None,
         };
 
         // create a CompactorConfig for the all in one server based on

From 6105fd5bb7aa84b779575f0b20ac17824a0da3fe Mon Sep 17 00:00:00 2001
From: Dom
Date: Mon, 27 Mar 2023 10:04:17 +0100
Subject: [PATCH 3/3] refactor: compute n_copies once

Co-authored-by: Carol (Nichols || Goulding) <193874+carols10cents@users.noreply.github.com>
---
 router/src/dml_handlers/rpc_write.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/router/src/dml_handlers/rpc_write.rs b/router/src/dml_handlers/rpc_write.rs
index 5c5b672dad..a0e73fc942 100644
--- a/router/src/dml_handlers/rpc_write.rs
+++ b/router/src/dml_handlers/rpc_write.rs
@@ -159,7 +159,7 @@ impl<T> RpcWrite<T> {
 
         Self {
             endpoints,
-            n_copies: replica_copies.map(NonZeroUsize::get).unwrap_or(1),
+            n_copies,
         }
     }
 }