Merge pull request #3342 from influxdata/crepererum/issue3336

fix: limit number of S3 connections
pull/24376/head
kodiakhq[bot] 2021-12-09 09:53:50 +00:00 committed by GitHub
commit 46b43d3c30
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 91 additions and 12 deletions

2
Cargo.lock generated
View File

@ -2486,6 +2486,8 @@ dependencies = [
"dotenv",
"futures",
"futures-test",
"hyper",
"hyper-tls",
"indexmap",
"itertools",
"observability_deps",

View File

@ -1,5 +1,5 @@
//! CLI handling for object store config (via CLI arguments and environment variables).
use std::{convert::TryFrom, fs, path::PathBuf, time::Duration};
use std::{convert::TryFrom, fs, num::NonZeroUsize, path::PathBuf, time::Duration};
use clap::arg_enum;
use futures::TryStreamExt;
@ -172,6 +172,14 @@ Possible values (case insensitive):
/// environments.
#[structopt(long = "--azure-storage-access-key", env = "AZURE_STORAGE_ACCESS_KEY")]
pub azure_storage_access_key: Option<String>,
/// When using a network-based object store, limit the number of connections to this value.
#[structopt(
long = "--object-store-connection-limit",
env = "OBJECT_STORE_CONNECTION_LIMIT",
default_value = "16"
)]
pub object_store_connection_limit: NonZeroUsize,
}
arg_enum! {
@ -267,6 +275,7 @@ impl TryFrom<&ObjectStoreConfig> for ObjectStore {
bucket,
endpoint,
session_token,
config.object_store_connection_limit,
)
.context(InvalidS3Config)
}

View File

@ -15,6 +15,10 @@ chrono = { version = "0.4", default-features = false, features = ["clock"] }
# Google Cloud Storage integration
cloud-storage = {version = "0.10.3", optional = true}
futures = "0.3"
# for rusoto
hyper = { version = "0.14", optional = true, default-features = false }
# for rusoto
hyper-tls = { version = "0.5.0", optional = true, default-features = false }
indexmap = { version = "1.7", optional = true, features = ["std"] }
itertools = "0.10.1"
observability_deps = { path = "../observability_deps" }
@ -36,7 +40,7 @@ workspace-hack = { path = "../workspace-hack"}
[features]
azure = ["azure_core", "azure_storage", "indexmap", "reqwest"]
gcp = ["cloud-storage"]
aws = ["rusoto_core", "rusoto_credential", "rusoto_s3"]
aws = ["rusoto_core", "rusoto_credential", "rusoto_s3", "hyper", "hyper-tls"]
[dev-dependencies] # In alphabetical order
dotenv = "0.15.0"

View File

@ -11,12 +11,15 @@ use futures::{
stream::{self, BoxStream},
Future, StreamExt, TryStreamExt,
};
use hyper::client::Builder as HyperBuilder;
use hyper_tls::HttpsConnector;
use observability_deps::tracing::{debug, warn};
use rusoto_core::ByteStream;
use rusoto_credential::{InstanceMetadataProvider, StaticProvider};
use rusoto_s3::S3;
use snafu::{OptionExt, ResultExt, Snafu};
use std::{convert::TryFrom, fmt, time::Duration};
use std::{convert::TryFrom, fmt, num::NonZeroUsize, ops::Deref, sync::Arc, time::Duration};
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
/// A specialized `Result` for object store-related errors
pub type Result<T, E = Error> = std::result::Result<T, E>;
@ -140,7 +143,15 @@ pub enum Error {
/// Configuration for connecting to [Amazon S3](https://aws.amazon.com/s3/).
pub struct AmazonS3 {
/// Raw S3 client handle.
///
/// NOTE(review): this diff view shows `client` alongside `client_unrestricted`;
/// presumably the former was removed in favor of the latter — confirm against
/// the final version of the file.
client: rusoto_s3::S3Client,
/// S3 client w/o any connection limit.
///
/// You should normally use [`Self::client`] instead.
client_unrestricted: rusoto_s3::S3Client,
/// Semaphore that limits the usage of [`client_unrestricted`](Self::client_unrestricted).
///
/// Sized from the `max_connections` argument passed to `new_s3`.
connection_semaphore: Arc<Semaphore>,
/// Bucket name used by this object store client.
bucket_name: String,
}
@ -185,7 +196,7 @@ impl ObjectStoreApi for AmazonS3 {
}
};
let s3 = self.client.clone();
let s3 = self.client().await;
s3_request(move || {
let (s3, request_factory) = (s3.clone(), request_factory.clone());
@ -210,7 +221,8 @@ impl ObjectStoreApi for AmazonS3 {
};
let bucket_name = self.bucket_name.clone();
let s = self
.client
.client()
.await
.get_object(get_request)
.await
.map_err(|e| match e {
@ -252,7 +264,7 @@ impl ObjectStoreApi for AmazonS3 {
..Default::default()
};
let s3 = self.client.clone();
let s3 = self.client().await;
s3_request(move || {
let (s3, request_factory) = (s3.clone(), request_factory.clone());
@ -357,6 +369,7 @@ pub(crate) fn new_s3(
bucket_name: impl Into<String>,
endpoint: Option<impl Into<String>>,
session_token: Option<impl Into<String>>,
max_connections: NonZeroUsize,
) -> Result<AmazonS3> {
let region = region.into();
let region: rusoto_core::Region = match endpoint {
@ -367,8 +380,10 @@ pub(crate) fn new_s3(
},
};
let http_client = rusoto_core::request::HttpClient::new()
.expect("Current implementation of rusoto_core has no way for this to fail");
let mut builder = HyperBuilder::default();
builder.pool_max_idle_per_host(max_connections.get());
let connector = HttpsConnector::new();
let http_client = rusoto_core::request::HttpClient::from_builder(builder, connector);
let client = match (access_key_id, secret_access_key, session_token) {
(Some(access_key_id), Some(secret_access_key), Some(session_token)) => {
@ -394,7 +409,8 @@ pub(crate) fn new_s3(
};
Ok(AmazonS3 {
client,
client_unrestricted: client,
connection_semaphore: Arc::new(Semaphore::new(max_connections.get())),
bucket_name: bucket_name.into(),
})
}
@ -407,10 +423,43 @@ pub(crate) fn new_failing_s3() -> Result<AmazonS3> {
"bucket",
None as Option<&str>,
None as Option<&str>,
NonZeroUsize::new(16).unwrap(),
)
}
/// S3 client bundled w/ a semaphore permit.
///
/// Holding this value counts against the connection limit: the permit is
/// released once the last clone of the bundled `Arc` is dropped.
#[derive(Clone)]
struct SemaphoreClient {
/// Permit for this specific use of the client.
///
/// Note that this field is never read and therefore considered "dead code" by rustc.
#[allow(dead_code)]
permit: Arc<OwnedSemaphorePermit>,
/// The wrapped S3 client that performs the actual requests.
inner: rusoto_s3::S3Client,
}
impl Deref for SemaphoreClient {
    type Target = rusoto_s3::S3Client;

    /// Expose the wrapped S3 client; the semaphore permit stays held for as
    /// long as this bundle is alive.
    fn deref(&self) -> &rusoto_s3::S3Client {
        let Self { inner, .. } = self;
        inner
    }
}
impl AmazonS3 {
/// Get a client according to the current connection limit.
async fn client(&self) -> SemaphoreClient {
let permit = Arc::clone(&self.connection_semaphore)
.acquire_owned()
.await
.expect("semaphore shouldn't be closed yet");
SemaphoreClient {
permit: Arc::new(permit),
inner: self.client_unrestricted.clone(),
}
}
async fn list_objects_v2(
&self,
prefix: Option<&CloudPath>,
@ -433,10 +482,11 @@ impl AmazonS3 {
delimiter,
..Default::default()
};
let s3 = self.client().await;
Ok(stream::unfold(ListState::Start, move |state| {
let request_factory = request_factory.clone();
let s3 = self.client.clone();
let s3 = s3.clone();
async move {
let continuation_token = match state.clone() {
@ -685,6 +735,7 @@ mod tests {
config.bucket,
config.endpoint,
config.token,
NonZeroUsize::new(16).unwrap(),
)
.expect("Valid S3 config");
@ -705,6 +756,7 @@ mod tests {
&config.bucket,
config.endpoint,
config.token,
NonZeroUsize::new(16).unwrap(),
)
.expect("Valid S3 config");
@ -735,6 +787,7 @@ mod tests {
&config.bucket,
config.endpoint,
config.token,
NonZeroUsize::new(16).unwrap(),
)
.expect("Valid S3 config");
@ -776,6 +829,7 @@ mod tests {
&config.bucket,
config.endpoint,
config.token,
NonZeroUsize::new(16).unwrap(),
)
.expect("Valid S3 config");
@ -812,6 +866,7 @@ mod tests {
&config.bucket,
config.endpoint,
config.token,
NonZeroUsize::new(16).unwrap(),
)
.expect("Valid S3 config");
@ -850,6 +905,7 @@ mod tests {
&config.bucket,
config.endpoint,
config.token,
NonZeroUsize::new(16).unwrap(),
)
.expect("Valid S3 config");
@ -886,6 +942,7 @@ mod tests {
config.bucket,
config.endpoint,
config.token,
NonZeroUsize::new(16).unwrap(),
)
.expect("Valid S3 config");
@ -910,6 +967,7 @@ mod tests {
&config.bucket,
config.endpoint,
config.token,
NonZeroUsize::new(16).unwrap(),
)
.expect("Valid S3 config");
@ -946,6 +1004,7 @@ mod tests {
&config.bucket,
config.endpoint,
config.token,
NonZeroUsize::new(16).unwrap(),
)
.expect("Valid S3 config");

View File

@ -1,5 +1,7 @@
//! Crate that mimics the interface of the various object stores
//! but does nothing if they are not enabled.
use std::num::NonZeroUsize;
use async_trait::async_trait;
use bytes::Bytes;
use snafu::Snafu;
@ -89,6 +91,7 @@ pub(crate) fn new_s3(
_bucket_name: impl Into<String>,
_endpoint: Option<impl Into<String>>,
_session_token: Option<impl Into<String>>,
_max_connections: NonZeroUsize,
) -> Result<DummyObjectStore> {
NotSupported { name: "aws" }.fail()
}

View File

@ -59,7 +59,7 @@ use bytes::Bytes;
use chrono::{DateTime, Utc};
use futures::{stream::BoxStream, StreamExt, TryFutureExt, TryStreamExt};
use snafu::{ResultExt, Snafu};
use std::fmt::Formatter;
use std::{fmt::Formatter, num::NonZeroUsize};
use std::{path::PathBuf, sync::Arc};
/// Universal API to multiple object store services.
@ -118,6 +118,7 @@ impl ObjectStore {
bucket_name: impl Into<String>,
endpoint: Option<impl Into<String>>,
session_token: Option<impl Into<String>>,
max_connections: NonZeroUsize,
) -> Result<Self> {
let s3 = aws::new_s3(
access_key_id,
@ -126,6 +127,7 @@ impl ObjectStore {
bucket_name,
endpoint,
session_token,
max_connections,
)?;
Ok(Self {
integration: ObjectStoreIntegration::AmazonS3(s3),