refactor: encapsulate the ObjectStore implementations in the object store crate (#1932)

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Andrew Lamb 2021-07-09 06:38:32 -04:00 committed by GitHub
parent 34dcd991d3
commit 3cb8f297b1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 245 additions and 254 deletions

View File

@ -287,58 +287,61 @@ impl ObjectStoreApi for AmazonS3 {
}
}
/// Configure a connection to Amazon S3 using the specified credentials in
/// the specified Amazon region and bucket.
///
/// Note: `AmazonS3::new()` is deliberately not exposed; keeping this a
/// crate-private free function allows it to be swapped out when the aws
/// feature is not enabled.
///
/// # Errors
///
/// Fails when `region` cannot be parsed as a `rusoto_core::Region` (and no
/// custom `endpoint` was given), or when exactly one of the access-key /
/// secret-key pair is supplied.
pub(crate) fn new_s3(
access_key_id: Option<impl Into<String>>,
secret_access_key: Option<impl Into<String>>,
region: impl Into<String>,
bucket_name: impl Into<String>,
endpoint: Option<impl Into<String>>,
session_token: Option<impl Into<String>>,
) -> Result<AmazonS3> {
let region = region.into();
// A custom endpoint takes precedence: the region string is then used only
// as the name of a `Region::Custom` instead of being parsed.
let region: rusoto_core::Region = match endpoint {
None => region.parse().context(InvalidRegion { region })?,
Some(endpoint) => rusoto_core::Region::Custom {
name: region,
endpoint: endpoint.into(),
},
};
let http_client = rusoto_core::request::HttpClient::new()
.expect("Current implementation of rusoto_core has no way for this to fail");
// Select a credentials provider from whichever credentials were supplied:
// full static credentials (with or without a session token), an explicit
// error for a half-specified key pair, or instance metadata when neither
// key was given.
let client = match (access_key_id, secret_access_key, session_token) {
(Some(access_key_id), Some(secret_access_key), Some(session_token)) => {
let credentials_provider = StaticProvider::new(
access_key_id.into(),
secret_access_key.into(),
Some(session_token.into()),
None,
);
rusoto_s3::S3Client::new_with(http_client, credentials_provider, region)
}
(Some(access_key_id), Some(secret_access_key), None) => {
let credentials_provider =
StaticProvider::new_minimal(access_key_id.into(), secret_access_key.into());
rusoto_s3::S3Client::new_with(http_client, credentials_provider, region)
}
(None, Some(_), _) => return Err(Error::MissingAccessKey),
(Some(_), None, _) => return Err(Error::MissingSecretAccessKey),
_ => {
let credentials_provider = InstanceMetadataProvider::new();
rusoto_s3::S3Client::new_with(http_client, credentials_provider, region)
}
};
Ok(AmazonS3 {
client,
bucket_name: bucket_name.into(),
})
}
impl AmazonS3 {
/// Configure a connection to Amazon S3 using the specified credentials in
/// the specified Amazon region and bucket.
///
/// NOTE(review): this is the pre-refactor public constructor that this
/// commit removes; its logic is duplicated by the crate-private `new_s3`
/// free function.
///
/// # Errors
///
/// Fails when `region` cannot be parsed (and no custom `endpoint` was
/// given), or when only one of access key / secret key is supplied.
pub fn new(
access_key_id: Option<impl Into<String>>,
secret_access_key: Option<impl Into<String>>,
region: impl Into<String>,
bucket_name: impl Into<String>,
endpoint: Option<impl Into<String>>,
session_token: Option<impl Into<String>>,
) -> Result<Self> {
let region = region.into();
// A custom endpoint overrides region parsing; the region string then
// only names the `Region::Custom`.
let region: rusoto_core::Region = match endpoint {
None => region.parse().context(InvalidRegion { region })?,
Some(endpoint) => rusoto_core::Region::Custom {
name: region,
endpoint: endpoint.into(),
},
};
let http_client = rusoto_core::request::HttpClient::new()
.expect("Current implementation of rusoto_core has no way for this to fail");
// Choose the credentials provider based on which credentials were
// supplied; missing both keys falls back to instance metadata.
let client = match (access_key_id, secret_access_key, session_token) {
(Some(access_key_id), Some(secret_access_key), Some(session_token)) => {
let credentials_provider = StaticProvider::new(
access_key_id.into(),
secret_access_key.into(),
Some(session_token.into()),
None,
);
rusoto_s3::S3Client::new_with(http_client, credentials_provider, region)
}
(Some(access_key_id), Some(secret_access_key), None) => {
let credentials_provider =
StaticProvider::new_minimal(access_key_id.into(), secret_access_key.into());
rusoto_s3::S3Client::new_with(http_client, credentials_provider, region)
}
(None, Some(_), _) => return Err(Error::MissingAccessKey),
(Some(_), None, _) => return Err(Error::MissingSecretAccessKey),
_ => {
let credentials_provider = InstanceMetadataProvider::new();
rusoto_s3::S3Client::new_with(http_client, credentials_provider, region)
}
};
Ok(Self {
client,
bucket_name: bucket_name.into(),
})
}
/// List objects with the given prefix and a set delimiter of `/`. Returns
/// common prefixes (directories) in addition to object metadata. Optionally
/// takes a continuation token for paging.
@ -444,7 +447,7 @@ mod tests {
use super::*;
use crate::{
tests::{get_nonexistent_object, list_with_delimiter, put_get_delete_list},
AmazonS3, Error as ObjectStoreError, ObjectStore, ObjectStoreApi, ObjectStorePath,
Error as ObjectStoreError, ObjectStore, ObjectStoreApi, ObjectStorePath,
};
use bytes::Bytes;
use std::env;
@ -539,16 +542,14 @@ mod tests {
async fn s3_test() {
let config = maybe_skip_integration!();
let integration = ObjectStore::new_amazon_s3(
AmazonS3::new(
Some(config.access_key_id),
Some(config.secret_access_key),
config.region,
config.bucket,
config.endpoint,
config.token,
)
.expect("Valid S3 config"),
);
Some(config.access_key_id),
Some(config.secret_access_key),
config.region,
config.bucket,
config.endpoint,
config.token,
)
.expect("Valid S3 config");
check_credentials(put_get_delete_list(&integration).await).unwrap();
check_credentials(list_with_delimiter(&integration).await).unwrap();
@ -561,16 +562,14 @@ mod tests {
config.region = "us-west-1".into();
let integration = ObjectStore::new_amazon_s3(
AmazonS3::new(
Some(config.access_key_id),
Some(config.secret_access_key),
config.region,
&config.bucket,
config.endpoint,
config.token,
)
.expect("Valid S3 config"),
);
Some(config.access_key_id),
Some(config.secret_access_key),
config.region,
&config.bucket,
config.endpoint,
config.token,
)
.expect("Valid S3 config");
let mut location = integration.new_path();
location.set_file_name(NON_EXISTENT_NAME);
@ -593,16 +592,14 @@ mod tests {
async fn s3_test_get_nonexistent_location() {
let config = maybe_skip_integration!();
let integration = ObjectStore::new_amazon_s3(
AmazonS3::new(
Some(config.access_key_id),
Some(config.secret_access_key),
config.region,
&config.bucket,
config.endpoint,
config.token,
)
.expect("Valid S3 config"),
);
Some(config.access_key_id),
Some(config.secret_access_key),
config.region,
&config.bucket,
config.endpoint,
config.token,
)
.expect("Valid S3 config");
let mut location = integration.new_path();
location.set_file_name(NON_EXISTENT_NAME);
@ -636,16 +633,14 @@ mod tests {
config.bucket = NON_EXISTENT_NAME.into();
let integration = ObjectStore::new_amazon_s3(
AmazonS3::new(
Some(config.access_key_id),
Some(config.secret_access_key),
config.region,
&config.bucket,
config.endpoint,
config.token,
)
.expect("Valid S3 config"),
);
Some(config.access_key_id),
Some(config.secret_access_key),
config.region,
&config.bucket,
config.endpoint,
config.token,
)
.expect("Valid S3 config");
let mut location = integration.new_path();
location.set_file_name(NON_EXISTENT_NAME);
@ -674,16 +669,14 @@ mod tests {
config.region = "us-west-1".into();
let integration = ObjectStore::new_amazon_s3(
AmazonS3::new(
Some(config.access_key_id),
Some(config.secret_access_key),
config.region,
&config.bucket,
config.endpoint,
config.token,
)
.expect("Valid S3 config"),
);
Some(config.access_key_id),
Some(config.secret_access_key),
config.region,
&config.bucket,
config.endpoint,
config.token,
)
.expect("Valid S3 config");
let mut location = integration.new_path();
location.set_file_name(NON_EXISTENT_NAME);
@ -722,16 +715,14 @@ mod tests {
config.bucket = NON_EXISTENT_NAME.into();
let integration = ObjectStore::new_amazon_s3(
AmazonS3::new(
Some(config.access_key_id),
Some(config.secret_access_key),
config.region,
&config.bucket,
config.endpoint,
config.token,
)
.expect("Valid S3 config"),
);
Some(config.access_key_id),
Some(config.secret_access_key),
config.region,
&config.bucket,
config.endpoint,
config.token,
)
.expect("Valid S3 config");
let mut location = integration.new_path();
location.set_file_name(NON_EXISTENT_NAME);
@ -768,16 +759,14 @@ mod tests {
async fn s3_test_delete_nonexistent_location() {
let config = maybe_skip_integration!();
let integration = ObjectStore::new_amazon_s3(
AmazonS3::new(
Some(config.access_key_id),
Some(config.secret_access_key),
config.region,
config.bucket,
config.endpoint,
config.token,
)
.expect("Valid S3 config"),
);
Some(config.access_key_id),
Some(config.secret_access_key),
config.region,
config.bucket,
config.endpoint,
config.token,
)
.expect("Valid S3 config");
let mut location = integration.new_path();
location.set_file_name(NON_EXISTENT_NAME);
@ -794,16 +783,14 @@ mod tests {
config.region = "us-west-1".into();
let integration = ObjectStore::new_amazon_s3(
AmazonS3::new(
Some(config.access_key_id),
Some(config.secret_access_key),
config.region,
&config.bucket,
config.endpoint,
config.token,
)
.expect("Valid S3 config"),
);
Some(config.access_key_id),
Some(config.secret_access_key),
config.region,
&config.bucket,
config.endpoint,
config.token,
)
.expect("Valid S3 config");
let mut location = integration.new_path();
location.set_file_name(NON_EXISTENT_NAME);
@ -832,16 +819,14 @@ mod tests {
config.bucket = NON_EXISTENT_NAME.into();
let integration = ObjectStore::new_amazon_s3(
AmazonS3::new(
Some(config.access_key_id),
Some(config.secret_access_key),
config.region,
&config.bucket,
config.endpoint,
config.token,
)
.expect("Valid S3 config"),
);
Some(config.access_key_id),
Some(config.secret_access_key),
config.region,
&config.bucket,
config.endpoint,
config.token,
)
.expect("Valid S3 config");
let mut location = integration.new_path();
location.set_file_name(NON_EXISTENT_NAME);

View File

@ -241,41 +241,38 @@ impl ObjectStoreApi for MicrosoftAzure {
}
}
impl MicrosoftAzure {
/// Configure a connection to container with given name on Microsoft Azure
/// Blob store.
///
/// The credentials `account` and `access_key` must provide access to the
/// store.
pub fn new(
account: impl Into<String>,
access_key: impl Into<String>,
container_name: impl Into<String>,
) -> Self {
let account = account.into();
let access_key = access_key.into();
// From https://github.com/Azure/azure-sdk-for-rust/blob/master/sdk/storage/examples/blob_00.rs#L29
let http_client: Arc<Box<dyn HttpClient>> = Arc::new(Box::new(reqwest::Client::new()));
/// Configure a connection to container with given name on Microsoft Azure
/// Blob store.
///
/// The credentials `account` and `access_key` must provide access to the
/// store.
pub fn new_azure(
account: impl Into<String>,
access_key: impl Into<String>,
container_name: impl Into<String>,
) -> Result<MicrosoftAzure> {
let account = account.into();
let access_key = access_key.into();
// From https://github.com/Azure/azure-sdk-for-rust/blob/master/sdk/storage/examples/blob_00.rs#L29
let http_client: Arc<Box<dyn HttpClient>> = Arc::new(Box::new(reqwest::Client::new()));
let storage_account_client =
StorageAccountClient::new_access_key(Arc::clone(&http_client), &account, &access_key);
let storage_account_client =
StorageAccountClient::new_access_key(Arc::clone(&http_client), &account, &access_key);
let storage_client = storage_account_client.as_storage_client();
let storage_client = storage_account_client.as_storage_client();
let container_name = container_name.into();
let container_name = container_name.into();
let container_client = storage_client.as_container_client(&container_name);
let container_client = storage_client.as_container_client(&container_name);
Self {
container_client,
container_name,
}
}
Ok(MicrosoftAzure {
container_client,
container_name,
})
}
#[cfg(test)]
mod tests {
use super::*;
use crate::tests::{list_with_delimiter, put_get_delete_list};
use crate::ObjectStore;
use std::env;
@ -341,11 +338,12 @@ mod tests {
#[tokio::test]
async fn azure_blob_test() {
let config = maybe_skip_integration!();
let integration = ObjectStore::new_microsoft_azure(MicrosoftAzure::new(
let integration = ObjectStore::new_microsoft_azure(
config.storage_account,
config.access_key,
config.bucket,
));
)
.unwrap();
put_get_delete_list(&integration).await.unwrap();
list_with_delimiter(&integration).await.unwrap();

View File

@ -251,21 +251,19 @@ impl ObjectStoreApi for GoogleCloudStorage {
}
}
impl GoogleCloudStorage {
/// Configure a connection to Google Cloud Storage.
///
/// NOTE(review): this is the pre-refactor public constructor removed by
/// this commit in favor of the crate-private `new_gcs`.
///
/// Side effect: sets the process-wide `SERVICE_ACCOUNT` environment
/// variable, which the underlying cloud storage crate reads for
/// authentication.
pub fn new(
service_account_path: impl AsRef<std::ffi::OsStr>,
bucket_name: impl Into<String>,
) -> Self {
// The cloud storage crate currently only supports authentication via
// environment variables. Set the environment variable explicitly so
// that we can optionally accept command line arguments instead.
env::set_var("SERVICE_ACCOUNT", service_account_path);
Self {
client: Default::default(),
bucket_name: bucket_name.into(),
}
}
/// Configure a connection to Google Cloud Storage.
///
/// Side effect: sets the process-wide `SERVICE_ACCOUNT` environment
/// variable, which the underlying cloud storage crate reads for
/// authentication.
///
/// Returns `Result` for signature consistency with the other object store
/// constructors; this implementation itself cannot fail.
pub fn new_gcs(
service_account_path: impl AsRef<std::ffi::OsStr>,
bucket_name: impl Into<String>,
) -> Result<GoogleCloudStorage> {
// The cloud storage crate currently only supports authentication via
// environment variables. Set the environment variable explicitly so
// that we can optionally accept command line arguments instead.
env::set_var("SERVICE_ACCOUNT", service_account_path);
Ok(GoogleCloudStorage {
client: Default::default(),
bucket_name: bucket_name.into(),
})
}
#[cfg(test)]
@ -273,8 +271,7 @@ mod test {
use super::*;
use crate::{
tests::{get_nonexistent_object, list_with_delimiter, put_get_delete_list},
Error as ObjectStoreError, GoogleCloudStorage, ObjectStore, ObjectStoreApi,
ObjectStorePath,
Error as ObjectStoreError, ObjectStore, ObjectStoreApi, ObjectStorePath,
};
use bytes::Bytes;
use std::env;
@ -334,10 +331,8 @@ mod test {
#[tokio::test]
async fn gcs_test() {
let config = maybe_skip_integration!();
let integration = ObjectStore::new_google_cloud_storage(GoogleCloudStorage::new(
config.service_account,
config.bucket,
));
let integration =
ObjectStore::new_google_cloud_storage(config.service_account, config.bucket).unwrap();
put_get_delete_list(&integration).await.unwrap();
list_with_delimiter(&integration).await.unwrap();
@ -346,10 +341,8 @@ mod test {
#[tokio::test]
async fn gcs_test_get_nonexistent_location() {
let config = maybe_skip_integration!();
let integration = ObjectStore::new_google_cloud_storage(GoogleCloudStorage::new(
config.service_account,
&config.bucket,
));
let integration =
ObjectStore::new_google_cloud_storage(config.service_account, &config.bucket).unwrap();
let mut location = integration.new_path();
location.set_file_name(NON_EXISTENT_NAME);
@ -379,10 +372,8 @@ mod test {
async fn gcs_test_get_nonexistent_bucket() {
let mut config = maybe_skip_integration!();
config.bucket = NON_EXISTENT_NAME.into();
let integration = ObjectStore::new_google_cloud_storage(GoogleCloudStorage::new(
config.service_account,
&config.bucket,
));
let integration =
ObjectStore::new_google_cloud_storage(config.service_account, &config.bucket).unwrap();
let mut location = integration.new_path();
location.set_file_name(NON_EXISTENT_NAME);
@ -405,10 +396,8 @@ mod test {
#[tokio::test]
async fn gcs_test_delete_nonexistent_location() {
let config = maybe_skip_integration!();
let integration = ObjectStore::new_google_cloud_storage(GoogleCloudStorage::new(
config.service_account,
&config.bucket,
));
let integration =
ObjectStore::new_google_cloud_storage(config.service_account, &config.bucket).unwrap();
let mut location = integration.new_path();
location.set_file_name(NON_EXISTENT_NAME);
@ -436,10 +425,8 @@ mod test {
async fn gcs_test_delete_nonexistent_bucket() {
let mut config = maybe_skip_integration!();
config.bucket = NON_EXISTENT_NAME.into();
let integration = ObjectStore::new_google_cloud_storage(GoogleCloudStorage::new(
config.service_account,
&config.bucket,
));
let integration =
ObjectStore::new_google_cloud_storage(config.service_account, &config.bucket).unwrap();
let mut location = integration.new_path();
location.set_file_name(NON_EXISTENT_NAME);
@ -467,10 +454,8 @@ mod test {
async fn gcs_test_put_nonexistent_bucket() {
let mut config = maybe_skip_integration!();
config.bucket = NON_EXISTENT_NAME.into();
let integration = ObjectStore::new_google_cloud_storage(GoogleCloudStorage::new(
config.service_account,
&config.bucket,
));
let integration =
ObjectStore::new_google_cloud_storage(config.service_account, &config.bucket).unwrap();
let mut location = integration.new_path();
location.set_file_name(NON_EXISTENT_NAME);

View File

@ -17,11 +17,11 @@
//!
//! Future compatibility will include Azure Blob Storage, Minio, and Ceph.
pub mod aws;
pub mod azure;
mod aws;
mod azure;
mod buffer;
pub mod disk;
pub mod gcp;
mod gcp;
pub mod memory;
pub mod path;
pub mod throttle;
@ -93,13 +93,32 @@ pub struct ObjectStore(pub ObjectStoreIntegration);
impl ObjectStore {
/// Configure a connection to Amazon S3.
pub fn new_amazon_s3(s3: AmazonS3) -> Self {
Self(ObjectStoreIntegration::AmazonS3(s3))
/// Configure a connection to Amazon S3, wrapping the result in the
/// `ObjectStoreIntegration::AmazonS3` variant.
///
/// # Errors
///
/// Propagates any error from `aws::new_s3` (invalid region, or a
/// half-specified access-key/secret-key pair).
pub fn new_amazon_s3(
access_key_id: Option<impl Into<String>>,
secret_access_key: Option<impl Into<String>>,
region: impl Into<String>,
bucket_name: impl Into<String>,
endpoint: Option<impl Into<String>>,
session_token: Option<impl Into<String>>,
) -> Result<Self> {
// Delegate to the crate-private constructor in the `aws` module.
let s3 = aws::new_s3(
access_key_id,
secret_access_key,
region,
bucket_name,
endpoint,
session_token,
)?;
Ok(Self(ObjectStoreIntegration::AmazonS3(s3)))
}
/// Configure a connection to Google Cloud Storage.
pub fn new_google_cloud_storage(gcs: GoogleCloudStorage) -> Self {
Self(ObjectStoreIntegration::GoogleCloudStorage(gcs))
/// Configure a connection to Google Cloud Storage, wrapping the result in
/// the `ObjectStoreIntegration::GoogleCloudStorage` variant.
///
/// # Errors
///
/// Propagates any error from `gcp::new_gcs`.
pub fn new_google_cloud_storage(
service_account_path: impl AsRef<std::ffi::OsStr>,
bucket_name: impl Into<String>,
) -> Result<Self> {
// Delegate to the crate-private constructor in the `gcp` module.
let gcs = gcp::new_gcs(service_account_path, bucket_name)?;
Ok(Self(ObjectStoreIntegration::GoogleCloudStorage(gcs)))
}
/// Configure in-memory storage.
@ -118,8 +137,15 @@ impl ObjectStore {
}
/// Configure a connection to Microsoft Azure Blob store.
pub fn new_microsoft_azure(azure: MicrosoftAzure) -> Self {
Self(ObjectStoreIntegration::MicrosoftAzure(Box::new(azure)))
/// Configure a connection to Microsoft Azure Blob store, boxing the store
/// into the `ObjectStoreIntegration::MicrosoftAzure` variant.
///
/// # Errors
///
/// Propagates any error from `azure::new_azure`.
pub fn new_microsoft_azure(
account: impl Into<String>,
access_key: impl Into<String>,
container_name: impl Into<String>,
) -> Result<Self> {
// Delegate to the crate-private constructor in the `azure` module.
let azure = azure::new_azure(account, access_key, container_name)?;
Ok(Self(ObjectStoreIntegration::MicrosoftAzure(Box::new(
azure,
))))
}
/// Create implementation-specific path from parsed representation.

View File

@ -1875,16 +1875,14 @@ mod tests {
async fn init_error_generic() {
// use an object store that will hopefully fail to read
let store = ObjectStore::new_amazon_s3(
object_store::aws::AmazonS3::new(
Some("foo".to_string()),
Some("bar".to_string()),
"us-east-1".to_string(),
"bucket".to_string(),
None as Option<String>,
None as Option<String>,
)
.unwrap(),
);
Some("foo".to_string()),
Some("bar".to_string()),
"us-east-1".to_string(),
"bucket".to_string(),
None as Option<String>,
None as Option<String>,
)
.unwrap();
let manager = TestConnectionManager::new();
let config = config_with_store(store);

View File

@ -1,10 +1,7 @@
use crate::commands::run::{Config, ObjectStore as ObjStoreOpt};
use futures::{future::FusedFuture, pin_mut, FutureExt, TryStreamExt};
use hyper::server::conn::AddrIncoming;
use object_store::{
self, aws::AmazonS3, azure::MicrosoftAzure, gcp::GoogleCloudStorage, path::ObjectStorePath,
ObjectStore, ObjectStoreApi,
};
use object_store::{self, path::ObjectStorePath, ObjectStore, ObjectStoreApi};
use observability_deps::tracing::{self, error, info, warn, Instrument};
use panic_logging::SendPanicsToTracing;
use server::{
@ -69,7 +66,13 @@ pub enum Error {
// not *parseable* as a rusoto `Region`. The other object store constructors
// don't return `Result`.
#[snafu(display("Amazon S3 configuration was invalid: {}", source))]
InvalidS3Config { source: object_store::aws::Error },
InvalidS3Config { source: object_store::Error },
#[snafu(display("GCS configuration was invalid: {}", source))]
InvalidGCSConfig { source: object_store::Error },
#[snafu(display("Microsoft Azure configuration was invalid: {}", source))]
InvalidAzureConfig { source: object_store::Error },
#[snafu(display("Cannot read from object store: {}", source))]
CannotReadObjectStore { source: object_store::Error },
@ -338,9 +341,10 @@ impl TryFrom<&Config> for ObjectStore {
config.bucket.as_ref(),
config.google_service_account.as_ref(),
) {
(Some(bucket), Some(service_account)) => Ok(Self::new_google_cloud_storage(
GoogleCloudStorage::new(service_account, bucket),
)),
(Some(bucket), Some(service_account)) => {
Self::new_google_cloud_storage(service_account, bucket)
.context(InvalidGCSConfig)
}
(bucket, service_account) => {
let mut missing_args = vec![];
@ -369,17 +373,15 @@ impl TryFrom<&Config> for ObjectStore {
config.aws_session_token.as_ref(),
) {
(Some(bucket), key_id, secret_key, region, endpoint, session_token) => {
Ok(Self::new_amazon_s3(
AmazonS3::new(
key_id,
secret_key,
region,
bucket,
endpoint,
session_token,
)
.context(InvalidS3Config)?,
))
Self::new_amazon_s3(
key_id,
secret_key,
region,
bucket,
endpoint,
session_token,
)
.context(InvalidS3Config)
}
(bucket, _, _, _, _, _) => {
let mut missing_args = vec![];
@ -403,11 +405,8 @@ impl TryFrom<&Config> for ObjectStore {
config.azure_storage_access_key.as_ref(),
) {
(Some(bucket), Some(storage_account), Some(access_key)) => {
Ok(Self::new_microsoft_azure(MicrosoftAzure::new(
storage_account,
access_key,
bucket,
)))
Self::new_microsoft_azure(storage_account, access_key, bucket)
.context(InvalidAzureConfig)
}
(bucket, storage_account, access_key) => {
let mut missing_args = vec![];