From 876a9af35a1ad455b99a5b0a306c6d938d14e2ac Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Wed, 8 Dec 2021 19:06:02 +0100 Subject: [PATCH] fix: limit number of S3 connections Otherwise the whole thing blows up when starting a server that has many DBs registered, because we potentially create 1 connection per DB (e.g. to read out the preserved catalog). Fixes #3336. --- Cargo.lock | 2 + .../src/structopt_blocks/object_store.rs | 11 ++- object_store/Cargo.toml | 6 +- object_store/src/aws.rs | 77 ++++++++++++++++--- object_store/src/dummy.rs | 3 + object_store/src/lib.rs | 4 +- 6 files changed, 91 insertions(+), 12 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b6123cd7e7..31237308e4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2486,6 +2486,8 @@ dependencies = [ "dotenv", "futures", "futures-test", + "hyper", + "hyper-tls", "indexmap", "itertools", "observability_deps", diff --git a/influxdb_iox/src/structopt_blocks/object_store.rs b/influxdb_iox/src/structopt_blocks/object_store.rs index 0296500767..e6666c5f80 100644 --- a/influxdb_iox/src/structopt_blocks/object_store.rs +++ b/influxdb_iox/src/structopt_blocks/object_store.rs @@ -1,5 +1,5 @@ //! CLI handling for object store config (via CLI arguments and environment variables). -use std::{convert::TryFrom, fs, path::PathBuf, time::Duration}; +use std::{convert::TryFrom, fs, num::NonZeroUsize, path::PathBuf, time::Duration}; use clap::arg_enum; use futures::TryStreamExt; @@ -172,6 +172,14 @@ Possible values (case insensitive): /// environments. #[structopt(long = "--azure-storage-access-key", env = "AZURE_STORAGE_ACCESS_KEY")] pub azure_storage_access_key: Option, + + /// When using a network-based object store, limit the number of connections to this value. + #[structopt( + long = "--object-store-connection-limit", + env = "OBJECT_STORE_CONNECTION_LIMIT", + default_value = "16" + )] + pub object_store_connection_limit: NonZeroUsize, } arg_enum!
{ @@ -267,6 +275,7 @@ impl TryFrom<&ObjectStoreConfig> for ObjectStore { bucket, endpoint, session_token, + config.object_store_connection_limit, ) .context(InvalidS3Config) } diff --git a/object_store/Cargo.toml b/object_store/Cargo.toml index a34f870852..0bc1eb1a59 100644 --- a/object_store/Cargo.toml +++ b/object_store/Cargo.toml @@ -15,6 +15,10 @@ chrono = { version = "0.4", default-features = false, features = ["clock"] } # Google Cloud Storage integration cloud-storage = {version = "0.10.3", optional = true} futures = "0.3" +# for rusoto +hyper = { version = "0.14", optional = true, default-features = false } +# for rusoto +hyper-tls = { version = "0.5.0", optional = true, default-features = false } indexmap = { version = "1.7", optional = true, features = ["std"] } itertools = "0.10.1" observability_deps = { path = "../observability_deps" } @@ -36,7 +40,7 @@ workspace-hack = { path = "../workspace-hack"} [features] azure = ["azure_core", "azure_storage", "indexmap", "reqwest"] gcp = ["cloud-storage"] -aws = ["rusoto_core", "rusoto_credential", "rusoto_s3"] +aws = ["rusoto_core", "rusoto_credential", "rusoto_s3", "hyper", "hyper-tls"] [dev-dependencies] # In alphabetical order dotenv = "0.15.0" diff --git a/object_store/src/aws.rs b/object_store/src/aws.rs index 42b3fd8abe..b1a06a697e 100644 --- a/object_store/src/aws.rs +++ b/object_store/src/aws.rs @@ -11,12 +11,15 @@ use futures::{ stream::{self, BoxStream}, Future, StreamExt, TryStreamExt, }; +use hyper::client::Builder as HyperBuilder; +use hyper_tls::HttpsConnector; use observability_deps::tracing::{debug, warn}; use rusoto_core::ByteStream; use rusoto_credential::{InstanceMetadataProvider, StaticProvider}; use rusoto_s3::S3; use snafu::{OptionExt, ResultExt, Snafu}; -use std::{convert::TryFrom, fmt, time::Duration}; +use std::{convert::TryFrom, fmt, num::NonZeroUsize, ops::Deref, sync::Arc, time::Duration}; +use tokio::sync::{OwnedSemaphorePermit, Semaphore}; /// A specialized `Result` for object 
store-related errors pub type Result = std::result::Result; @@ -140,7 +143,15 @@ pub enum Error { /// Configuration for connecting to [Amazon S3](https://aws.amazon.com/s3/). pub struct AmazonS3 { - client: rusoto_s3::S3Client, + /// S3 client w/o any connection limit. + /// + /// You should normally use [`Self::client`] instead. + client_unrestricted: rusoto_s3::S3Client, + + /// Semaphore that limits the usage of [`client_unrestricted`](Self::client_unrestricted). + connection_semaphore: Arc, + + /// Bucket name used by this object store client. bucket_name: String, } @@ -185,7 +196,7 @@ impl ObjectStoreApi for AmazonS3 { } }; - let s3 = self.client.clone(); + let s3 = self.client().await; s3_request(move || { let (s3, request_factory) = (s3.clone(), request_factory.clone()); @@ -210,7 +221,8 @@ impl ObjectStoreApi for AmazonS3 { }; let bucket_name = self.bucket_name.clone(); let s = self - .client + .client() + .await .get_object(get_request) .await .map_err(|e| match e { @@ -252,7 +264,7 @@ impl ObjectStoreApi for AmazonS3 { ..Default::default() }; - let s3 = self.client.clone(); + let s3 = self.client().await; s3_request(move || { let (s3, request_factory) = (s3.clone(), request_factory.clone()); @@ -357,6 +369,7 @@ pub(crate) fn new_s3( bucket_name: impl Into, endpoint: Option>, session_token: Option>, + max_connections: NonZeroUsize, ) -> Result { let region = region.into(); let region: rusoto_core::Region = match endpoint { @@ -367,8 +380,10 @@ pub(crate) fn new_s3( }, }; - let http_client = rusoto_core::request::HttpClient::new() - .expect("Current implementation of rusoto_core has no way for this to fail"); + let mut builder = HyperBuilder::default(); + builder.pool_max_idle_per_host(max_connections.get()); + let connector = HttpsConnector::new(); + let http_client = rusoto_core::request::HttpClient::from_builder(builder, connector); let client = match (access_key_id, secret_access_key, session_token) { (Some(access_key_id), Some(secret_access_key), 
Some(session_token)) => { @@ -394,7 +409,8 @@ pub(crate) fn new_s3( }; Ok(AmazonS3 { - client, + client_unrestricted: client, + connection_semaphore: Arc::new(Semaphore::new(max_connections.get())), bucket_name: bucket_name.into(), }) } @@ -407,10 +423,43 @@ pub(crate) fn new_failing_s3() -> Result { "bucket", None as Option<&str>, None as Option<&str>, + NonZeroUsize::new(16).unwrap(), ) } +/// S3 client bundled w/ a semaphore permit. +#[derive(Clone)] +struct SemaphoreClient { + /// Permit for this specific use of the client. + /// + /// Note that this field is never read and therefore considered "dead code" by rustc. + #[allow(dead_code)] + permit: Arc, + + inner: rusoto_s3::S3Client, +} + +impl Deref for SemaphoreClient { + type Target = rusoto_s3::S3Client; + + fn deref(&self) -> &Self::Target { + &self.inner + } +} + impl AmazonS3 { + /// Get a client according to the current connection limit. + async fn client(&self) -> SemaphoreClient { + let permit = Arc::clone(&self.connection_semaphore) + .acquire_owned() + .await + .expect("semaphore shouldn't be closed yet"); + SemaphoreClient { + permit: Arc::new(permit), + inner: self.client_unrestricted.clone(), + } + } + async fn list_objects_v2( &self, prefix: Option<&CloudPath>, @@ -433,10 +482,11 @@ impl AmazonS3 { delimiter, ..Default::default() }; + let s3 = self.client().await; Ok(stream::unfold(ListState::Start, move |state| { let request_factory = request_factory.clone(); - let s3 = self.client.clone(); + let s3 = s3.clone(); async move { let continuation_token = match state.clone() { @@ -685,6 +735,7 @@ mod tests { config.bucket, config.endpoint, config.token, + NonZeroUsize::new(16).unwrap(), ) .expect("Valid S3 config"); @@ -705,6 +756,7 @@ mod tests { &config.bucket, config.endpoint, config.token, + NonZeroUsize::new(16).unwrap(), ) .expect("Valid S3 config"); @@ -735,6 +787,7 @@ mod tests { &config.bucket, config.endpoint, config.token, + NonZeroUsize::new(16).unwrap(), ) .expect("Valid S3 config"); @@ 
-776,6 +829,7 @@ mod tests { &config.bucket, config.endpoint, config.token, + NonZeroUsize::new(16).unwrap(), ) .expect("Valid S3 config"); @@ -812,6 +866,7 @@ mod tests { &config.bucket, config.endpoint, config.token, + NonZeroUsize::new(16).unwrap(), ) .expect("Valid S3 config"); @@ -850,6 +905,7 @@ mod tests { &config.bucket, config.endpoint, config.token, + NonZeroUsize::new(16).unwrap(), ) .expect("Valid S3 config"); @@ -886,6 +942,7 @@ mod tests { config.bucket, config.endpoint, config.token, + NonZeroUsize::new(16).unwrap(), ) .expect("Valid S3 config"); @@ -910,6 +967,7 @@ mod tests { &config.bucket, config.endpoint, config.token, + NonZeroUsize::new(16).unwrap(), ) .expect("Valid S3 config"); @@ -946,6 +1004,7 @@ mod tests { &config.bucket, config.endpoint, config.token, + NonZeroUsize::new(16).unwrap(), ) .expect("Valid S3 config"); diff --git a/object_store/src/dummy.rs b/object_store/src/dummy.rs index 3d2c1141c0..4742e84714 100644 --- a/object_store/src/dummy.rs +++ b/object_store/src/dummy.rs @@ -1,5 +1,7 @@ //! Crate that mimics the interface of the the various object stores //! but does nothing if they are not enabled. +use std::num::NonZeroUsize; + use async_trait::async_trait; use bytes::Bytes; use snafu::Snafu; @@ -89,6 +91,7 @@ pub(crate) fn new_s3( _bucket_name: impl Into, _endpoint: Option>, _session_token: Option>, + _max_connections: NonZeroUsize, ) -> Result { NotSupported { name: "aws" }.fail() } diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs index fa01aa4af5..ab407b6150 100644 --- a/object_store/src/lib.rs +++ b/object_store/src/lib.rs @@ -59,7 +59,7 @@ use bytes::Bytes; use chrono::{DateTime, Utc}; use futures::{stream::BoxStream, StreamExt, TryFutureExt, TryStreamExt}; use snafu::{ResultExt, Snafu}; -use std::fmt::Formatter; +use std::{fmt::Formatter, num::NonZeroUsize}; use std::{path::PathBuf, sync::Arc}; /// Universal API to multiple object store services. 
@@ -118,6 +118,7 @@ impl ObjectStore { bucket_name: impl Into, endpoint: Option>, session_token: Option>, + max_connections: NonZeroUsize, ) -> Result { let s3 = aws::new_s3( access_key_id, @@ -126,6 +127,7 @@ impl ObjectStore { bucket_name, endpoint, session_token, + max_connections, )?; Ok(Self { integration: ObjectStoreIntegration::AmazonS3(s3),