Merge pull request #7603 from influxdata/dom/router-msg-size

feat: configurable internal RPC write message size
pull/24376/head
Dom 2023-04-19 15:19:40 +01:00 committed by GitHub
commit 842e49b75a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 54 additions and 4 deletions

View File

@ -11,6 +11,15 @@ pub struct Ingester2Config {
#[clap(long = "wal-directory", env = "INFLUXDB_IOX_WAL_DIRECTORY", action)]
pub wal_directory: PathBuf,
/// Specify the maximum allowed incoming RPC write message size sent by the
/// Router.
#[clap(
long = "rpc-write-max-incoming-bytes",
env = "INFLUXDB_IOX_RPC_WRITE_MAX_INCOMING_BYTES",
default_value = "104857600", // 100MiB
)]
pub rpc_write_max_incoming_bytes: usize,
/// The number of seconds between WAL file rotations.
#[clap(
long = "wal-rotation-period-seconds",

View File

@ -120,6 +120,15 @@ pub struct Router2Config {
)]
pub rpc_write_timeout_seconds: Duration,
/// Specify the maximum allowed outgoing RPC write message size when
/// communicating with the Ingester.
#[clap(
long = "rpc-write-max-outgoing-bytes",
env = "INFLUXDB_IOX_RPC_WRITE_MAX_OUTGOING_BYTES",
default_value = "104857600", // 100MiB
)]
pub rpc_write_max_outgoing_bytes: usize,
/// Specify the optional replication factor for each RPC write.
///
/// The total number of copies of data after replication will be this value,

View File

@ -456,6 +456,7 @@ impl Config {
persist_max_parallelism,
persist_queue_depth,
persist_hot_partition_cost,
rpc_write_max_incoming_bytes: 1024 * 1024 * 1024, // 1GiB
};
let router_config = Router2Config {
@ -469,6 +470,7 @@ impl Config {
rpc_write_timeout_seconds: Duration::new(3, 0),
rpc_write_replicas: None,
single_tenant_deployment: false,
rpc_write_max_outgoing_bytes: ingester_config.rpc_write_max_incoming_bytes,
};
// create a CompactorConfig for the all in one server based on

View File

@ -32,6 +32,11 @@ use tokio::sync::oneshot;
use tokio_util::sync::CancellationToken;
use trace::TraceCollector;
/// Safe upper bound, in bytes, on an ingester write response message.
///
/// The ingester SHOULD NOT ever generate a response larger than this.
const MAX_OUTGOING_MSG_BYTES: usize = 1 << 20; // 1 MiB
#[derive(Debug, Error)]
pub enum Error {
#[error("error initializing ingester2: {0}")]
@ -46,6 +51,7 @@ struct IngesterServerType<I: IngesterRpcInterface> {
metrics: Arc<Registry>,
trace_collector: Option<Arc<dyn TraceCollector>>,
max_simultaneous_queries: usize,
max_incoming_msg_bytes: usize,
}
impl<I: IngesterRpcInterface> IngesterServerType<I> {
@ -54,6 +60,7 @@ impl<I: IngesterRpcInterface> IngesterServerType<I> {
metrics: Arc<Registry>,
common_state: &CommonServerState,
max_simultaneous_queries: usize,
max_incoming_msg_bytes: usize,
shutdown: oneshot::Sender<CancellationToken>,
) -> Self {
Self {
@ -62,6 +69,7 @@ impl<I: IngesterRpcInterface> IngesterServerType<I> {
metrics,
trace_collector: common_state.trace_collector(),
max_simultaneous_queries,
max_incoming_msg_bytes,
}
}
}
@ -108,6 +116,8 @@ impl<I: IngesterRpcInterface + Sync + Send + Debug + 'static> ServerType for Ing
add_service!(
builder,
WriteServiceServer::new(self.server.rpc().write_service())
.max_decoding_message_size(self.max_incoming_msg_bytes)
.max_encoding_message_size(MAX_OUTGOING_MSG_BYTES)
);
add_service!(
builder,
@ -204,6 +214,7 @@ pub async fn create_ingester_server_type(
metrics,
common_state,
ingester_config.concurrent_query_limit,
ingester_config.rpc_write_max_incoming_bytes,
shutdown_tx,
)))
}

View File

@ -200,7 +200,11 @@ pub async fn create_router2_server_type(
let endpoint = Endpoint::from_shared(hyper::body::Bytes::from(addr.clone()))
.expect("invalid ingester connection address");
(
LazyConnector::new(endpoint, router_config.rpc_write_timeout_seconds),
LazyConnector::new(
endpoint,
router_config.rpc_write_timeout_seconds,
router_config.rpc_write_max_outgoing_bytes,
),
addr,
)
});

View File

@ -7,7 +7,7 @@ license.workspace = true
[dependencies]
async-trait = "0.1"
authz = { path = "../authz" }
authz = { path = "../authz", features = ["http"] }
bytes = "1.4"
crossbeam-utils = "0.8.15"
data_types = { path = "../data_types" }

View File

@ -30,6 +30,9 @@ const CONNECT_TIMEOUT: Duration = Duration::from_secs(1);
/// (at most once per [`RETRY_INTERVAL`]).
const RECONNECT_ERROR_COUNT: usize = 10;
/// Safe upper bound, in bytes, on a write response accepted from the ingester.
const MAX_INCOMING_MSG_BYTES: usize = 1 << 20; // 1 MiB
/// Lazy [`Channel`] connector.
///
/// Connections are attempted in a background thread every [`RETRY_INTERVAL`].
@ -43,6 +46,12 @@ pub struct LazyConnector {
addr: Endpoint,
connection: Arc<Mutex<Option<Channel>>>,
/// The maximum outgoing message size.
///
/// The incoming size remains bounded at [`MAX_INCOMING_MSG_BYTES`] as the
/// ingester SHOULD NOT ever generate a response larger than this.
max_outgoing_msg_bytes: usize,
/// The number of request errors observed without a single success.
consecutive_errors: Arc<AtomicUsize>,
/// A task that periodically opens a new connection to `addr` when
@ -52,7 +61,7 @@ pub struct LazyConnector {
impl LazyConnector {
/// Lazily connect to `addr`.
pub fn new(addr: Endpoint, request_timeout: Duration) -> Self {
pub fn new(addr: Endpoint, request_timeout: Duration, max_outgoing_msg_bytes: usize) -> Self {
let addr = addr
.connect_timeout(CONNECT_TIMEOUT)
.timeout(request_timeout);
@ -62,6 +71,7 @@ impl LazyConnector {
let consecutive_errors = Arc::new(AtomicUsize::new(RECONNECT_ERROR_COUNT + 1));
Self {
addr: addr.clone(),
max_outgoing_msg_bytes,
connection: Arc::clone(&connection),
connection_task: tokio::spawn(try_connect(
addr,
@ -89,7 +99,12 @@ impl WriteClient for LazyConnector {
let conn =
conn.ok_or_else(|| RpcWriteError::UpstreamNotConnected(self.addr.uri().to_string()))?;
match WriteServiceClient::new(conn).write(op).await {
match WriteServiceClient::new(conn)
.max_encoding_message_size(self.max_outgoing_msg_bytes)
.max_decoding_message_size(MAX_INCOMING_MSG_BYTES)
.write(op)
.await
{
Err(e) if is_envoy_unavailable_error(&e) => {
warn!(error=%e, "detected envoy proxy upstream network error translation, reconnecting");
self.consecutive_errors