diff --git a/docs/env.example b/docs/env.example
index 2761cb9cd3..ed4f28a5c6 100644
--- a/docs/env.example
+++ b/docs/env.example
@@ -46,7 +46,7 @@
 # The name of a container you've created in the storage account, under Blob Service > Containers
 # INFLUXDB_IOX_BUCKET=
 # In the Storage account's Settings > Access keys, one of the Key values
-# AZURE_STORAGE_MASTER_KEY=
+# AZURE_STORAGE_ACCESS_KEY=
 #
 # To enable Jaeger tracing:
 # OTEL_SERVICE_NAME="iox" # defaults to iox
diff --git a/object_store/src/aws.rs b/object_store/src/aws.rs
index b982ac4e68..191cb6bfd6 100644
--- a/object_store/src/aws.rs
+++ b/object_store/src/aws.rs
@@ -13,7 +13,7 @@ use futures::{
     Stream, StreamExt, TryStreamExt,
 };
 use rusoto_core::ByteStream;
-use rusoto_credential::ChainProvider;
+use rusoto_credential::StaticProvider;
 use rusoto_s3::S3;
 use snafu::{futures::TryStreamExt as _, OptionExt, ResultExt, Snafu};
 use std::convert::TryFrom;
@@ -98,6 +98,16 @@ pub enum Error {
 
     #[snafu(display("Unable to buffer data into temporary file, Error: {}", source))]
     UnableToBufferStream { source: std::io::Error },
+
+    #[snafu(display(
+        "Could not parse `{}` as an AWS region. Regions should look like `us-east-2`. {:?}",
+        region,
+        source
+    ))]
+    InvalidRegion {
+        region: String,
+        source: rusoto_core::region::ParseRegionError,
+    },
 }
 
 /// Configuration for connecting to [Amazon S3](https://aws.amazon.com/s3/).
@@ -272,27 +282,27 @@ impl ObjectStoreApi for AmazonS3 {
 }
 
 impl AmazonS3 {
-    /// Configure a connection to Amazon S3 in the specified Amazon region and
-    /// bucket. Uses [`rusoto_credential::ChainProvider`][cp] to check for
-    /// credentials in:
-    ///
-    /// 1. Environment variables: `AWS_ACCESS_KEY_ID` and
-    ///    `AWS_SECRET_ACCESS_KEY`
-    /// 2. `credential_process` command in the AWS config file, usually located
-    ///    at `~/.aws/config`.
-    /// 3. AWS credentials file. Usually located at `~/.aws/credentials`.
-    /// 4. IAM instance profile. Will only work if running on an EC2 instance
-    ///    with an instance profile/role.
-    ///
-    /// [cp]: https://docs.rs/rusoto_credential/0.43.0/rusoto_credential/struct.ChainProvider.html
-    pub fn new(region: rusoto_core::Region, bucket_name: impl Into<String>) -> Self {
+    /// Configure a connection to Amazon S3 using the specified credentials in
+    /// the specified Amazon region and bucket
+    pub fn new(
+        access_key_id: impl Into<String>,
+        secret_access_key: impl Into<String>,
+        region: impl Into<String>,
+        bucket_name: impl Into<String>,
+    ) -> Result<Self> {
+        let region = region.into();
+        let region: rusoto_core::Region = region.parse().context(InvalidRegion { region })?;
+
         let http_client = rusoto_core::request::HttpClient::new()
             .expect("Current implementation of rusoto_core has no way for this to fail");
-        let credentials_provider = ChainProvider::new();
-        Self {
+
+        let credentials_provider =
+            StaticProvider::new_minimal(access_key_id.into(), secret_access_key.into());
+
+        Ok(Self {
             client: rusoto_s3::S3Client::new_with(http_client, credentials_provider, region),
             bucket_name: bucket_name.into(),
-        }
+        })
     }
 
     /// List objects with the given prefix and a set delimiter of `/`. Returns
@@ -412,64 +422,63 @@ mod tests {
 
     const NON_EXISTENT_NAME: &str = "nonexistentname";
 
+    #[derive(Debug)]
+    struct AwsConfig {
+        access_key_id: String,
+        secret_access_key: String,
+        region: String,
+        bucket: String,
+    }
+
     // Helper macro to skip tests if the AWS environment variables are not set.
     // Skips become hard errors if TEST_INTEGRATION is set.
     macro_rules! maybe_skip_integration {
-        () => {
+        () => {{
             dotenv::dotenv().ok();
 
-            let region = env::var("AWS_DEFAULT_REGION");
-            let bucket_name = env::var("INFLUXDB_IOX_BUCKET");
-            let force = std::env::var("TEST_INTEGRATION");
+            let required_vars = [
+                "AWS_DEFAULT_REGION",
+                "INFLUXDB_IOX_BUCKET",
+                "AWS_ACCESS_KEY_ID",
+                "AWS_SECRET_ACCESS_KEY",
+            ];
+            let unset_vars: Vec<_> = required_vars
+                .iter()
+                .filter_map(|&name| match env::var(name) {
+                    Ok(_) => None,
+                    Err(_) => Some(name),
+                })
+                .collect();
+            let unset_var_names = unset_vars.join(", ");
 
-            match (region.is_ok(), bucket_name.is_ok(), force.is_ok()) {
-                (false, false, true) => {
-                    panic!(
-                        "TEST_INTEGRATION is set, \
-                        but AWS_DEFAULT_REGION and INFLUXDB_IOX_BUCKET are not"
-                    )
+            let force = env::var("TEST_INTEGRATION");
+
+            if force.is_ok() && !unset_var_names.is_empty() {
+                panic!(
+                    "TEST_INTEGRATION is set, \
+                    but variable(s) {} need to be set",
+                    unset_var_names
+                )
+            } else if force.is_err() && !unset_var_names.is_empty() {
+                eprintln!(
+                    "skipping AWS integration test - set \
+                    {} to run",
+                    unset_var_names
+                );
+                return Ok(());
+            } else {
+                AwsConfig {
+                    access_key_id: env::var("AWS_ACCESS_KEY_ID")
+                        .expect("already checked AWS_ACCESS_KEY_ID"),
+                    secret_access_key: env::var("AWS_SECRET_ACCESS_KEY")
+                        .expect("already checked AWS_SECRET_ACCESS_KEY"),
+                    region: env::var("AWS_DEFAULT_REGION")
+                        .expect("already checked AWS_DEFAULT_REGION"),
+                    bucket: env::var("INFLUXDB_IOX_BUCKET")
+                        .expect("already checked INFLUXDB_IOX_BUCKET"),
                 }
-                (false, true, true) => {
-                    panic!("TEST_INTEGRATION is set, but AWS_DEFAULT_REGION is not")
-                }
-                (true, false, true) => {
-                    panic!("TEST_INTEGRATION is set, but INFLUXDB_IOX_BUCKET is not")
-                }
-                (false, false, false) => {
-                    eprintln!(
-                        "skipping integration test - set \
-                        AWS_DEFAULT_REGION and INFLUXDB_IOX_BUCKET to run"
-                    );
-                    return Ok(());
-                }
-                (false, true, false) => {
-                    eprintln!("skipping integration test - set AWS_DEFAULT_REGION to run");
-                    return Ok(());
-                }
-                (true, false, false) => {
-                    eprintln!("skipping integration test - set INFLUXDB_IOX_BUCKET to run");
-                    return Ok(());
-                }
-                _ => {}
             }
-        };
-    }
-
-    // Helper to get region and bucket from environment variables. Call the
-    // `maybe_skip_integration!` macro before calling this to skip the test if these
-    // aren't set; if you don't call that macro, the tests will fail if
-    // these env vars aren't set.
-    //
-    // `AWS_DEFAULT_REGION` should be a value like `us-east-2`.
-    fn region_and_bucket_name() -> Result<(rusoto_core::Region, String)> {
-        let region = env::var("AWS_DEFAULT_REGION").map_err(|_| {
-            "The environment variable AWS_DEFAULT_REGION must be set \
-            to a value like `us-east-2`"
-        })?;
-        let bucket_name = env::var("INFLUXDB_IOX_BUCKET")
-            .map_err(|_| "The environment variable INFLUXDB_IOX_BUCKET must be set")?;
-
-        Ok((region.parse()?, bucket_name))
+        }};
     }
 
     fn check_credentials<T>(r: Result<T>) -> Result<T> {
@@ -490,12 +499,16 @@ mod tests {
 
     #[tokio::test]
     async fn s3_test() -> Result<()> {
-        maybe_skip_integration!();
-        let (region, bucket_name) = region_and_bucket_name()?;
+        let config = maybe_skip_integration!();
+        let integration = AmazonS3::new(
+            config.access_key_id,
+            config.secret_access_key,
+            config.region,
+            config.bucket,
+        )
+        .expect("Valid S3 config");
 
-        let integration = AmazonS3::new(region, &bucket_name);
         check_credentials(put_get_delete_list(&integration).await)?;
-
         check_credentials(list_with_delimiter(&integration).await).unwrap();
 
         Ok(())
@@ -503,11 +516,18 @@
 
     #[tokio::test]
     async fn s3_test_get_nonexistent_region() -> Result<()> {
-        maybe_skip_integration!();
+        let mut config = maybe_skip_integration!();
         // Assumes environment variables do not provide credentials to AWS US West 1
-        let (_, bucket_name) = region_and_bucket_name()?;
-        let region = rusoto_core::Region::UsWest1;
-        let integration = AmazonS3::new(region, &bucket_name);
+        config.region = "us-west-1".into();
+
+        let integration = AmazonS3::new(
+            config.access_key_id,
+            config.secret_access_key,
+            config.region,
+            &config.bucket,
+        )
+        .expect("Valid S3 config");
+
         let mut location = integration.new_path();
         location.set_file_name(NON_EXISTENT_NAME);
 
@@ -516,7 +536,7 @@
             .unwrap_err();
         if let Some(Error::UnableToListData { source, bucket }) = err.downcast_ref::<Error>() {
             assert!(matches!(source, rusoto_core::RusotoError::Unknown(_)));
-            assert_eq!(bucket, &bucket_name);
+            assert_eq!(bucket, &config.bucket);
         } else {
             panic!("unexpected error type")
         }
@@ -526,9 +546,15 @@
 
     #[tokio::test]
    async fn s3_test_get_nonexistent_location() -> Result<()> {
-        maybe_skip_integration!();
-        let (region, bucket_name) = region_and_bucket_name()?;
-        let integration = AmazonS3::new(region, &bucket_name);
+        let config = maybe_skip_integration!();
+        let integration = AmazonS3::new(
+            config.access_key_id,
+            config.secret_access_key,
+            config.region,
+            &config.bucket,
+        )
+        .expect("Valid S3 config");
+
         let mut location = integration.new_path();
         location.set_file_name(NON_EXISTENT_NAME);
 
@@ -545,7 +571,7 @@
                 source,
                 rusoto_core::RusotoError::Service(rusoto_s3::GetObjectError::NoSuchKey(_))
             ));
-            assert_eq!(bucket, &bucket_name);
+            assert_eq!(bucket, &config.bucket);
             assert_eq!(location, NON_EXISTENT_NAME);
         } else {
             panic!("unexpected error type")
@@ -556,10 +582,17 @@
 
     #[tokio::test]
     async fn s3_test_get_nonexistent_bucket() -> Result<()> {
-        maybe_skip_integration!();
-        let (region, _) = region_and_bucket_name()?;
-        let bucket_name = NON_EXISTENT_NAME;
-        let integration = AmazonS3::new(region, bucket_name);
+        let mut config = maybe_skip_integration!();
+        config.bucket = NON_EXISTENT_NAME.into();
+
+        let integration = AmazonS3::new(
+            config.access_key_id,
+            config.secret_access_key,
+            config.region,
+            &config.bucket,
+        )
+        .expect("Valid S3 config");
+
         let mut location = integration.new_path();
         location.set_file_name(NON_EXISTENT_NAME);
 
@@ -571,7 +604,7 @@
                 source,
                rusoto_core::RusotoError::Service(rusoto_s3::ListObjectsV2Error::NoSuchBucket(_))
             ));
-            assert_eq!(bucket, bucket_name);
+            assert_eq!(bucket, &config.bucket);
         } else {
             panic!("unexpected error type")
         }
@@ -581,11 +614,18 @@
 
     #[tokio::test]
     async fn s3_test_put_nonexistent_region() -> Result<()> {
-        maybe_skip_integration!();
+        let mut config = maybe_skip_integration!();
         // Assumes environment variables do not provide credentials to AWS US West 1
-        let (_, bucket_name) = region_and_bucket_name()?;
-        let region = rusoto_core::Region::UsWest1;
-        let integration = AmazonS3::new(region, &bucket_name);
+        config.region = "us-west-1".into();
+
+        let integration = AmazonS3::new(
+            config.access_key_id,
+            config.secret_access_key,
+            config.region,
+            &config.bucket,
+        )
+        .expect("Valid S3 config");
+
         let mut location = integration.new_path();
         location.set_file_name(NON_EXISTENT_NAME);
         let data = Bytes::from("arbitrary data");
@@ -607,7 +647,7 @@
         } = err
         {
             assert!(matches!(source, rusoto_core::RusotoError::Unknown(_)));
-            assert_eq!(bucket, bucket_name);
+            assert_eq!(bucket, config.bucket);
             assert_eq!(location, NON_EXISTENT_NAME);
         } else {
             panic!("unexpected error type")
@@ -618,10 +658,17 @@
 
     #[tokio::test]
     async fn s3_test_put_nonexistent_bucket() -> Result<()> {
-        maybe_skip_integration!();
-        let (region, _) = region_and_bucket_name()?;
-        let bucket_name = NON_EXISTENT_NAME;
-        let integration = AmazonS3::new(region, bucket_name);
+        let mut config = maybe_skip_integration!();
+        config.bucket = NON_EXISTENT_NAME.into();
+
+        let integration = AmazonS3::new(
+            config.access_key_id,
+            config.secret_access_key,
+            config.region,
+            &config.bucket,
+        )
+        .expect("Valid S3 config");
+
         let mut location = integration.new_path();
         location.set_file_name(NON_EXISTENT_NAME);
         let data = Bytes::from("arbitrary data");
@@ -643,7 +690,7 @@
         } = err
         {
             assert!(matches!(source, rusoto_core::RusotoError::Unknown(_)));
-            assert_eq!(bucket, bucket_name);
+            assert_eq!(bucket, config.bucket);
             assert_eq!(location, NON_EXISTENT_NAME);
         } else {
             panic!("unexpected error type")
@@ -654,9 +701,15 @@
 
     #[tokio::test]
     async fn s3_test_delete_nonexistent_location() -> Result<()> {
-        maybe_skip_integration!();
-        let (region, bucket_name) = region_and_bucket_name()?;
-        let integration = AmazonS3::new(region, &bucket_name);
+        let config = maybe_skip_integration!();
+        let integration = AmazonS3::new(
+            config.access_key_id,
+            config.secret_access_key,
+            config.region,
+            config.bucket,
+        )
+        .expect("Valid S3 config");
+
         let mut location = integration.new_path();
         location.set_file_name(NON_EXISTENT_NAME);
 
@@ -669,11 +722,18 @@
 
     #[tokio::test]
     async fn s3_test_delete_nonexistent_region() -> Result<()> {
-        maybe_skip_integration!();
+        let mut config = maybe_skip_integration!();
         // Assumes environment variables do not provide credentials to AWS US West 1
-        let (_, bucket_name) = region_and_bucket_name()?;
-        let region = rusoto_core::Region::UsWest1;
-        let integration = AmazonS3::new(region, &bucket_name);
+        config.region = "us-west-1".into();
+
+        let integration = AmazonS3::new(
+            config.access_key_id,
+            config.secret_access_key,
+            config.region,
+            &config.bucket,
+        )
+        .expect("Valid S3 config");
+
         let mut location = integration.new_path();
         location.set_file_name(NON_EXISTENT_NAME);
 
@@ -685,7 +745,7 @@
         } = err
         {
             assert!(matches!(source, rusoto_core::RusotoError::Unknown(_)));
-            assert_eq!(bucket, bucket_name);
+            assert_eq!(bucket, config.bucket);
             assert_eq!(location, NON_EXISTENT_NAME);
         } else {
             panic!("unexpected error type")
         }
@@ -696,10 +756,17 @@
 
     #[tokio::test]
     async fn s3_test_delete_nonexistent_bucket() -> Result<()> {
-        maybe_skip_integration!();
-        let (region, _) = region_and_bucket_name()?;
-        let bucket_name = NON_EXISTENT_NAME;
-        let integration = AmazonS3::new(region, bucket_name);
+        let mut config = maybe_skip_integration!();
+        config.bucket = NON_EXISTENT_NAME.into();
+
+        let integration = AmazonS3::new(
+            config.access_key_id,
+            config.secret_access_key,
+            config.region,
+            &config.bucket,
+        )
+        .expect("Valid S3 config");
+
         let mut location = integration.new_path();
         location.set_file_name(NON_EXISTENT_NAME);
 
@@ -711,7 +778,7 @@
         } = err
         {
             assert!(matches!(source, rusoto_core::RusotoError::Unknown(_)));
-            assert_eq!(bucket, bucket_name);
+            assert_eq!(bucket, config.bucket);
             assert_eq!(location, NON_EXISTENT_NAME);
         } else {
             panic!("unexpected error type")
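
Taken together, the aws.rs changes swap ambient-credential lookup (the old `ChainProvider`) for explicit credentials plus a fallible region parse. A minimal sketch of the new call site; the key values, region, and bucket below are placeholders, not working configuration:

```rust
// Hypothetical caller of the new constructor; unlike the removed
// ChainProvider-based `new`, this takes credentials explicitly and can fail.
use object_store::aws::AmazonS3;

fn build_store() -> Result<AmazonS3, object_store::aws::Error> {
    // Returns Err(Error::InvalidRegion { .. }) if the region string does not
    // parse as a rusoto Region, e.g. "not-a-region" instead of "us-east-2".
    AmazonS3::new(
        "NotARealAWSAccessKey",
        "NotARealAWSSecretAccessKey",
        "us-east-2",
        "my-bucket",
    )
}
```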
diff --git a/object_store/src/azure.rs b/object_store/src/azure.rs
index 99085a763b..6f821381e8 100644
--- a/object_store/src/azure.rs
+++ b/object_store/src/azure.rs
@@ -245,14 +245,20 @@ impl MicrosoftAzure {
     /// Configure a connection to container with given name on Microsoft Azure
     /// Blob store.
     ///
-    /// The credentials `account` and `master_key` must provide access to the
+    /// The credentials `account` and `access_key` must provide access to the
     /// store.
-    pub fn new(account: String, master_key: String, container_name: impl Into<String>) -> Self {
+    pub fn new(
+        account: impl Into<String>,
+        access_key: impl Into<String>,
+        container_name: impl Into<String>,
+    ) -> Self {
+        let account = account.into();
+        let access_key = access_key.into();
         // From https://github.com/Azure/azure-sdk-for-rust/blob/master/sdk/storage/examples/blob_00.rs#L29
         let http_client: Arc<Box<dyn HttpClient>> = Arc::new(Box::new(reqwest::Client::new()));
 
         let storage_account_client =
-            StorageAccountClient::new_access_key(Arc::clone(&http_client), &account, &master_key);
+            StorageAccountClient::new_access_key(Arc::clone(&http_client), &account, &access_key);
 
         let storage_client = storage_account_client.as_storage_client();
 
@@ -265,21 +271,6 @@ impl MicrosoftAzure {
             container_name,
         }
     }
-
-    /// Configure a connection to container with given name on Microsoft Azure
-    /// Blob store.
-    ///
-    /// The credentials `account` and `master_key` must be set via the
-    /// environment variables `AZURE_STORAGE_ACCOUNT` and
-    /// `AZURE_STORAGE_MASTER_KEY` respectively.
-    pub fn new_from_env(container_name: impl Into<String>) -> Self {
-        let account = std::env::var("AZURE_STORAGE_ACCOUNT")
-            .expect("Set env variable AZURE_STORAGE_ACCOUNT first!");
-        let master_key = std::env::var("AZURE_STORAGE_MASTER_KEY")
-            .expect("Set env variable AZURE_STORAGE_MASTER_KEY first!");
-
-        Self::new(account, master_key, container_name)
-    }
 }
 
 #[cfg(test)]
@@ -291,16 +282,23 @@ mod tests {
     type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
     type Result<T, E = Error> = std::result::Result<T, E>;
 
-    // Helper macro to skip tests if the GCP environment variables are not set.
+    #[derive(Debug)]
+    struct AzureConfig {
+        storage_account: String,
+        access_key: String,
+        bucket: String,
+    }
+
+    // Helper macro to skip tests if the Azure environment variables are not set.
     // Skips become hard errors if TEST_INTEGRATION is set.
     macro_rules! maybe_skip_integration {
-        () => {
+        () => {{
            dotenv::dotenv().ok();
 
             let required_vars = [
                 "AZURE_STORAGE_ACCOUNT",
                 "INFLUXDB_IOX_BUCKET",
-                "AZURE_STORAGE_MASTER_KEY",
+                "AZURE_STORAGE_ACCESS_KEY",
             ];
             let unset_vars: Vec<_> = required_vars
                 .iter()
                 .filter_map(|&name| match env::var(name) {
                     Ok(_) => None,
                     Err(_) => Some(name),
                 })
                 .collect();
@@ -326,17 +324,24 @@ mod tests {
                 unset_var_names
             );
             return Ok(());
+        } else {
+            AzureConfig {
+                storage_account: env::var("AZURE_STORAGE_ACCOUNT")
+                    .expect("already checked AZURE_STORAGE_ACCOUNT"),
+                access_key: env::var("AZURE_STORAGE_ACCESS_KEY")
+                    .expect("already checked AZURE_STORAGE_ACCESS_KEY"),
+                bucket: env::var("INFLUXDB_IOX_BUCKET")
+                    .expect("already checked INFLUXDB_IOX_BUCKET"),
+            }
         }
-    };
+    }};
     }
 
     #[tokio::test]
     async fn azure_blob_test() -> Result<()> {
-        maybe_skip_integration!();
-
-        let container_name = env::var("INFLUXDB_IOX_BUCKET")
-            .map_err(|_| "The environment variable INFLUXDB_IOX_BUCKET must be set")?;
-        let integration = MicrosoftAzure::new_from_env(container_name);
+        let config = maybe_skip_integration!();
+        let integration =
+            MicrosoftAzure::new(config.storage_account, config.access_key, config.bucket);
 
         put_get_delete_list(&integration).await?;
         list_with_delimiter(&integration).await?;
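
With `new_from_env` gone, reading the environment is now the caller's responsibility. A sketch of the replacement wiring, assuming the env var names from this diff (the container name is a placeholder):

```rust
// Hypothetical caller replacing the removed MicrosoftAzure::new_from_env.
use object_store::azure::MicrosoftAzure;

fn build_store() -> MicrosoftAzure {
    let account = std::env::var("AZURE_STORAGE_ACCOUNT")
        .expect("set AZURE_STORAGE_ACCOUNT");
    let access_key = std::env::var("AZURE_STORAGE_ACCESS_KEY")
        .expect("set AZURE_STORAGE_ACCESS_KEY");
    // The constructor now takes impl Into<String> for all three arguments,
    // so owned Strings and &str literals both work.
    MicrosoftAzure::new(account, access_key, "my-container")
}
```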
diff --git a/object_store/src/gcp.rs b/object_store/src/gcp.rs
index 8a7021e744..91dcd3abde 100644
--- a/object_store/src/gcp.rs
+++ b/object_store/src/gcp.rs
@@ -8,7 +8,7 @@ use async_trait::async_trait;
 use bytes::Bytes;
 use futures::{stream::BoxStream, Stream, StreamExt, TryStreamExt};
 use snafu::{ensure, futures::TryStreamExt as _, ResultExt, Snafu};
-use std::{convert::TryFrom, io};
+use std::{convert::TryFrom, env, io};
 
 /// A specialized `Result` for Google Cloud Storage object store-related errors
 pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -239,7 +239,14 @@ impl ObjectStoreApi for GoogleCloudStorage {
 impl GoogleCloudStorage {
     /// Configure a connection to Google Cloud Storage.
-    pub fn new(bucket_name: impl Into<String>) -> Self {
+    pub fn new(
+        service_account_path: impl AsRef<std::ffi::OsStr>,
+        bucket_name: impl Into<String>,
+    ) -> Self {
+        // The cloud storage crate currently only supports authentication via
+        // environment variables. Set the environment variable explicitly so
+        // that we can optionally accept command line arguments instead.
+        env::set_var("SERVICE_ACCOUNT", service_account_path);
         Self {
             bucket_name: bucket_name.into(),
         }
     }
@@ -261,39 +268,59 @@ mod test {
 
     const NON_EXISTENT_NAME: &str = "nonexistentname";
 
+    #[derive(Debug)]
+    struct GoogleCloudConfig {
+        bucket: String,
+        service_account: String,
+    }
+
     // Helper macro to skip tests if the GCP environment variables are not set.
     // Skips become hard errors if TEST_INTEGRATION is set.
     macro_rules! maybe_skip_integration {
-        () => {
+        () => {{
             dotenv::dotenv().ok();
 
-            let bucket_name = env::var("INFLUXDB_IOX_BUCKET");
+            let required_vars = ["INFLUXDB_IOX_BUCKET", "GOOGLE_SERVICE_ACCOUNT"];
+            let unset_vars: Vec<_> = required_vars
+                .iter()
+                .filter_map(|&name| match env::var(name) {
+                    Ok(_) => None,
+                    Err(_) => Some(name),
+                })
+                .collect();
+            let unset_var_names = unset_vars.join(", ");
+
             let force = std::env::var("TEST_INTEGRATION");
 
-            match (bucket_name.is_ok(), force.is_ok()) {
-                (false, true) => {
-                    panic!("TEST_INTEGRATION is set, but INFLUXDB_IOX_BUCKET is not")
+            if force.is_ok() && !unset_var_names.is_empty() {
+                panic!(
+                    "TEST_INTEGRATION is set, \
+                    but variable(s) {} need to be set",
+                    unset_var_names
+                )
+            } else if force.is_err() && !unset_var_names.is_empty() {
+                eprintln!(
+                    "skipping Google Cloud integration test - set \
+                    {} to run",
+                    unset_var_names
+                );
+                return Ok(());
+            } else {
+                GoogleCloudConfig {
+                    bucket: env::var("INFLUXDB_IOX_BUCKET")
+                        .expect("already checked INFLUXDB_IOX_BUCKET"),
+                    service_account: env::var("GOOGLE_SERVICE_ACCOUNT")
+                        .expect("already checked GOOGLE_SERVICE_ACCOUNT"),
                 }
-                (false, false) => {
-                    eprintln!("skipping integration test - set INFLUXDB_IOX_BUCKET to run");
-                    return Ok(());
-                }
-                _ => {}
             }
-        };
-    }
-
-    fn bucket_name() -> Result<String> {
-        Ok(env::var("INFLUXDB_IOX_BUCKET")
-            .map_err(|_| "The environment variable INFLUXDB_IOX_BUCKET must be set")?)
+        }};
     }
 
     #[tokio::test]
     async fn gcs_test() -> Result<()> {
-        maybe_skip_integration!();
-        let bucket_name = bucket_name()?;
+        let config = maybe_skip_integration!();
+        let integration = GoogleCloudStorage::new(config.service_account, config.bucket);
 
-        let integration = GoogleCloudStorage::new(&bucket_name);
         put_get_delete_list(&integration).await?;
         list_with_delimiter(&integration).await?;
 
         Ok(())
@@ -301,9 +328,8 @@
 
     #[tokio::test]
     async fn gcs_test_get_nonexistent_location() -> Result<()> {
-        maybe_skip_integration!();
-        let bucket_name = bucket_name()?;
-        let integration = GoogleCloudStorage::new(&bucket_name);
+        let config = maybe_skip_integration!();
+        let integration = GoogleCloudStorage::new(config.service_account, &config.bucket);
 
         let mut location = integration.new_path();
         location.set_file_name(NON_EXISTENT_NAME);
 
@@ -319,7 +345,7 @@
         }) = err.downcast_ref::<Error>()
         {
             assert!(matches!(source, cloud_storage::Error::Reqwest(_)));
-            assert_eq!(bucket, &bucket_name);
+            assert_eq!(bucket, &config.bucket);
             assert_eq!(location, NON_EXISTENT_NAME);
         } else {
             panic!("unexpected error type")
@@ -330,9 +356,10 @@
 
     #[tokio::test]
     async fn gcs_test_get_nonexistent_bucket() -> Result<()> {
-        maybe_skip_integration!();
-        let bucket_name = NON_EXISTENT_NAME;
-        let integration = GoogleCloudStorage::new(bucket_name);
+        let mut config = maybe_skip_integration!();
+        config.bucket = NON_EXISTENT_NAME.into();
+        let integration = GoogleCloudStorage::new(config.service_account, &config.bucket);
+
         let mut location = integration.new_path();
         location.set_file_name(NON_EXISTENT_NAME);
 
@@ -343,7 +370,7 @@
         if let Some(Error::UnableToStreamListData { source, bucket }) = err.downcast_ref::<Error>()
         {
             assert!(matches!(source, cloud_storage::Error::Google(_)));
-            assert_eq!(bucket, bucket_name);
+            assert_eq!(bucket, &config.bucket);
         } else {
             panic!("unexpected error type")
         }
@@ -353,9 +380,8 @@
 
     #[tokio::test]
     async fn gcs_test_delete_nonexistent_location() -> Result<()> {
-        maybe_skip_integration!();
-        let bucket_name = bucket_name()?;
-        let integration = GoogleCloudStorage::new(&bucket_name);
+        let config = maybe_skip_integration!();
+        let integration = GoogleCloudStorage::new(config.service_account, &config.bucket);
 
         let mut location = integration.new_path();
         location.set_file_name(NON_EXISTENT_NAME);
 
@@ -369,7 +395,7 @@
         } = err
         {
             assert!(matches!(source, cloud_storage::Error::Google(_)));
-            assert_eq!(bucket, bucket_name);
+            assert_eq!(bucket, config.bucket);
             assert_eq!(location, NON_EXISTENT_NAME);
         } else {
             panic!("unexpected error type")
@@ -380,9 +406,9 @@
 
     #[tokio::test]
     async fn gcs_test_delete_nonexistent_bucket() -> Result<()> {
-        maybe_skip_integration!();
-        let bucket_name = NON_EXISTENT_NAME;
-        let integration = GoogleCloudStorage::new(bucket_name);
+        let mut config = maybe_skip_integration!();
+        config.bucket = NON_EXISTENT_NAME.into();
+        let integration = GoogleCloudStorage::new(config.service_account, &config.bucket);
 
         let mut location = integration.new_path();
         location.set_file_name(NON_EXISTENT_NAME);
 
@@ -396,7 +422,7 @@
         } = err
         {
             assert!(matches!(source, cloud_storage::Error::Google(_)));
-            assert_eq!(bucket, bucket_name);
+            assert_eq!(bucket, config.bucket);
             assert_eq!(location, NON_EXISTENT_NAME);
         } else {
             panic!("unexpected error type")
@@ -407,9 +433,10 @@
 
     #[tokio::test]
     async fn gcs_test_put_nonexistent_bucket() -> Result<()> {
-        maybe_skip_integration!();
-        let bucket_name = NON_EXISTENT_NAME;
-        let integration = GoogleCloudStorage::new(bucket_name);
+        let mut config = maybe_skip_integration!();
+        config.bucket = NON_EXISTENT_NAME.into();
+        let integration = GoogleCloudStorage::new(config.service_account, &config.bucket);
+
         let mut location = integration.new_path();
         location.set_file_name(NON_EXISTENT_NAME);
 
@@ -432,7 +459,7 @@
         } = err
         {
             assert!(matches!(source, cloud_storage::Error::Other(_)));
-            assert_eq!(bucket, bucket_name);
+            assert_eq!(bucket, config.bucket);
             assert_eq!(location, NON_EXISTENT_NAME);
         } else {
             panic!("unexpected error type");
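
One wrinkle worth noting in gcp.rs: the constructor smuggles the service-account path to the cloud storage crate through the `SERVICE_ACCOUNT` environment variable, so the credentials are effectively process-global. A sketch of a call site; the path and bucket are placeholders:

```rust
// Hypothetical caller. GoogleCloudStorage::new sets the SERVICE_ACCOUNT
// environment variable as a side effect (see the comment in the diff), so
// constructing two stores with different credentials in one process would
// make the last-constructed store's credentials win for both.
use object_store::gcp::GoogleCloudStorage;

fn build_store() -> GoogleCloudStorage {
    GoogleCloudStorage::new("/path/to/service-account.json", "my-bucket")
}
```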
Must also set - /// `--object_store` to a cloud object storage to have any effect. + /// `--object-store` to a cloud object storage to have any effect. /// - /// If using Google Cloud Storage for the object store, this item, as well - /// as SERVICE_ACCOUNT must be set. + /// If using Google Cloud Storage for the object store, this item as well + /// as `--google-service-account` must be set. /// - /// If using S3 for the object store, this item, as well - /// as AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY and AWS_DEFAULT_REGION must - /// be set. + /// If using S3 for the object store, must set this item as well + /// as `--aws-access-key-id` and `--aws-secret-access-key`. Can also set + /// `--aws-default-region` if not using the fallback region. + /// + /// If using Azure for the object store, set this item to the name of a + /// container you've created in the associated storage account, under + /// Blob Service > Containers. Must also set `--azure-storage-account` and + /// `--azure-storage-access-key`. #[structopt(long = "--bucket", env = "INFLUXDB_IOX_BUCKET")] pub bucket: Option, + /// When using Amazon S3 as the object store, set this to an access key that + /// has permission to read from and write to the specified S3 bucket. + /// + /// Must also set `--object-store=s3`, `--bucket`, and + /// `--aws-secret-access-key`. Can also set `--aws-default-region` if not + /// using the fallback region. + /// + /// Prefer the environment variable over the command line flag in shared + /// environments. + #[structopt(long = "--aws-access-key-id", env = "AWS_ACCESS_KEY_ID")] + pub aws_access_key_id: Option, + + /// When using Amazon S3 as the object store, set this to the secret access + /// key that goes with the specified access key ID. + /// + /// Must also set `--object-store=s3`, `--bucket`, `--aws-access-key-id`. + /// Can also set `--aws-default-region` if not using the fallback region. + /// + /// Prefer the environment variable over the command line flag in shared + /// environments. + #[structopt(long = "--aws-secret-access-key", env = "AWS_SECRET_ACCESS_KEY")] + pub aws_secret_access_key: Option, + + /// When using Amazon S3 as the object store, set this to the region + /// that goes with the specified bucket if different from the fallback + /// value. + /// + /// Must also set `--object-store=s3`, `--bucket`, `--aws-access-key-id`, + /// and `--aws-secret-access-key`. + #[structopt( + long = "--aws-default-region", + env = "AWS_DEFAULT_REGION", + default_value = FALLBACK_AWS_REGION, + )] + pub aws_default_region: String, + + /// When using Google Cloud Storage as the object store, set this to the + /// path to the JSON file that contains the Google credentials. + /// + /// Must also set `--object-store=google` and `--bucket`. + #[structopt(long = "--google-service-account", env = "GOOGLE_SERVICE_ACCOUNT")] + pub google_service_account: Option, + + /// When using Microsoft Azure as the object store, set this to the + /// name you see when going to All Services > Storage accounts > [name]. + /// + /// Must also set `--object-store=azure`, `--bucket`, and + /// `--azure-storage-access-key`. + #[structopt(long = "--azure-storage-account", env = "AZURE_STORAGE_ACCOUNT")] + pub azure_storage_account: Option, + + /// When using Microsoft Azure as the object store, set this to one of the + /// Key values in the Storage account's Settings > Access keys. + /// + /// Must also set `--object-store=azure`, `--bucket`, and + /// `--azure-storage-account`. 
+    ///
+    /// Prefer the environment variable over the command line flag in shared
+    /// environments.
+    #[structopt(long = "--azure-storage-access-key", env = "AZURE_STORAGE_ACCESS_KEY")]
+    pub azure_storage_access_key: Option<String>,
+
     /// If set, Jaeger traces are emitted to this host
     /// using the OpenTelemetry tracer.
     ///
@@ -151,13 +222,13 @@ Possible values (case insensitive):
 /// - user set environment variables
 /// - .env file contents
 /// - pre-configured default values
-pub fn load_config() -> Config {
+pub fn load_config() -> Box<Config> {
     // Load the Config struct - this pulls in any envs set by the user or
     // sourced above, and applies any defaults.
     //
 
     //let args = std::env::args().filter(|arg| arg != "server");
-    Config::from_iter(strip_server(std::env::args()).iter())
+    Box::new(Config::from_iter(strip_server(std::env::args()).iter()))
 }
 
 fn parse_socket_addr(s: &str) -> std::io::Result<SocketAddr> {
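
The flag/env pairs above all flow into `Config` through structopt. A sketch of how a parsed config looks, reusing `from_iter_safe` the way the unit tests below do (the flag values are placeholders):

```rust
// Hypothetical parse; mirrors the valid_s3_config test further down.
let config = Config::from_iter_safe(&[
    "server",
    "--object-store", "s3",
    "--bucket", "mybucket",
    "--aws-access-key-id", "NotARealAWSAccessKey",
    "--aws-secret-access-key", "NotARealAWSSecretAccessKey",
])
.unwrap();

// --aws-default-region was not given, so the structopt default applies.
assert_eq!(config.aws_default_region, FALLBACK_AWS_REGION);
```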
diff --git a/src/influxdb_ioxd.rs b/src/influxdb_ioxd.rs
index 76fdf501a5..dae3b6fedc 100644
--- a/src/influxdb_ioxd.rs
+++ b/src/influxdb_ioxd.rs
@@ -3,7 +3,9 @@ use crate::commands::{
     server::{load_config, Config, ObjectStore as ObjStoreOpt},
 };
 use hyper::Server;
-use object_store::{self, aws::AmazonS3, gcp::GoogleCloudStorage, ObjectStore};
+use object_store::{
+    self, aws::AmazonS3, azure::MicrosoftAzure, gcp::GoogleCloudStorage, ObjectStore,
+};
 use panic_logging::SendPanicsToTracing;
 use server::{ConnectionManagerImpl as ConnectionManager, Server as AppServer};
 use snafu::{ResultExt, Snafu};
@@ -21,18 +23,6 @@ pub enum Error {
         source: std::io::Error,
     },
 
-    #[snafu(display("Unable to initialize database in directory {:?}: {}", db_dir, source))]
-    InitializingMutableBuffer {
-        db_dir: PathBuf,
-        source: Box<dyn std::error::Error + Send + Sync>,
-    },
-
-    #[snafu(display("Unable to restore WAL from directory {:?}: {}", dir, source))]
-    RestoringMutableBuffer {
-        dir: PathBuf,
-        source: Box<dyn std::error::Error + Send + Sync>,
-    },
-
     #[snafu(display(
         "Unable to bind to listen for HTTP requests on {}: {}",
         bind_addr,
         source
@@ -59,11 +49,21 @@ pub enum Error {
     #[snafu(display("Error serving RPC: {}", source))]
     ServingRPC { source: self::rpc::Error },
 
-    #[snafu(display("Specifed {} for the object store, but not a bucket", object_store))]
-    InvalidCloudObjectStoreConfiguration { object_store: ObjStoreOpt },
+    #[snafu(display(
+        "Specified {} for the object store, required configuration missing for {}",
+        object_store,
+        missing
+    ))]
+    MissingObjectStoreConfig {
+        object_store: ObjStoreOpt,
+        missing: String,
+    },
 
-    #[snafu(display("Specified file for the object store, but not a database directory"))]
-    InvalidFileObjectStoreConfiguration,
+    // Creating a new S3 object store can fail if the region is *specified* but
+    // not *parseable* as a rusoto `Region`. The other object store constructors
+    // don't return `Result`.
+    #[snafu(display("Amazon S3 configuration was invalid: {}", source))]
+    InvalidS3Config { source: object_store::aws::Error },
 }
 
 pub type Result<T, E = Error> = std::result::Result<T, E>;
 
 ///
 /// The logging_level passed in is the global setting (e.g. if -v or
 /// -vv was passed in before 'server')
-pub async fn main(logging_level: LoggingLevel, config: Option<Config>) -> Result<()> {
+pub async fn main(logging_level: LoggingLevel, config: Option<Box<Config>>) -> Result<()> {
     // load config from environment if no command line
     let config = config.unwrap_or_else(load_config);
@@ -101,7 +101,7 @@
         }
     }
 
-    let object_store = ObjectStore::try_from(&config)?;
+    let object_store = ObjectStore::try_from(&*config)?;
     let object_storage = Arc::new(object_store);
 
     let connection_manager = ConnectionManager {};
@@ -165,40 +165,101 @@ impl TryFrom<&Config> for ObjectStore {
                 Ok(Self::new_in_memory(object_store::memory::InMemory::new()))
             }
 
-            Some(ObjStoreOpt::Google) => match config.bucket.as_ref() {
-                Some(bucket) => Ok(Self::new_google_cloud_storage(GoogleCloudStorage::new(
-                    bucket,
-                ))),
-                None => InvalidCloudObjectStoreConfiguration {
-                    object_store: ObjStoreOpt::Google,
-                }
-                .fail(),
-            },
+            Some(ObjStoreOpt::Google) => {
+                match (
+                    config.bucket.as_ref(),
+                    config.google_service_account.as_ref(),
+                ) {
+                    (Some(bucket), Some(service_account)) => Ok(Self::new_google_cloud_storage(
+                        GoogleCloudStorage::new(service_account, bucket),
+                    )),
+                    (bucket, service_account) => {
+                        let mut missing_args = vec![];
 
-            Some(ObjStoreOpt::S3) => {
-                match config.bucket.as_ref() {
-                    Some(bucket) => {
-                        // rusoto::Region's default takes the value from the AWS_DEFAULT_REGION env
-                        // var.
-                        Ok(Self::new_amazon_s3(AmazonS3::new(
-                            Default::default(),
-                            bucket,
-                        )))
+                        if bucket.is_none() {
+                            missing_args.push("bucket");
+                        }
+                        if service_account.is_none() {
+                            missing_args.push("google-service-account");
+                        }
+                        MissingObjectStoreConfig {
+                            object_store: ObjStoreOpt::Google,
+                            missing: missing_args.join(", "),
+                        }
+                        .fail()
                     }
-                    None => InvalidCloudObjectStoreConfiguration {
-                        object_store: ObjStoreOpt::S3,
-                    }
-                    .fail(),
                 }
             }
 
-            Some(ObjStoreOpt::Azure) => match config.bucket.as_ref() {
-                Some(_bucket) => unimplemented!(),
-                None => InvalidCloudObjectStoreConfiguration {
-                    object_store: ObjStoreOpt::Azure,
+            Some(ObjStoreOpt::S3) => {
+                match (
+                    config.bucket.as_ref(),
+                    config.aws_access_key_id.as_ref(),
+                    config.aws_secret_access_key.as_ref(),
+                    config.aws_default_region.as_str(),
+                ) {
+                    (Some(bucket), Some(key_id), Some(secret_key), region) => {
+                        Ok(Self::new_amazon_s3(
+                            AmazonS3::new(key_id, secret_key, region, bucket)
+                                .context(InvalidS3Config)?,
+                        ))
+                    }
+                    (bucket, key_id, secret_key, _) => {
+                        let mut missing_args = vec![];
+
+                        if bucket.is_none() {
+                            missing_args.push("bucket");
+                        }
+                        if key_id.is_none() {
+                            missing_args.push("aws-access-key-id");
+                        }
+                        if secret_key.is_none() {
+                            missing_args.push("aws-secret-access-key");
+                        }
+
+                        MissingObjectStoreConfig {
+                            object_store: ObjStoreOpt::S3,
+                            missing: missing_args.join(", "),
+                        }
+                        .fail()
+                    }
                 }
-                .fail(),
-            },
+            }
+
+            Some(ObjStoreOpt::Azure) => {
+                match (
+                    config.bucket.as_ref(),
+                    config.azure_storage_account.as_ref(),
+                    config.azure_storage_access_key.as_ref(),
+                ) {
+                    (Some(bucket), Some(storage_account), Some(access_key)) => {
+                        Ok(Self::new_microsoft_azure(MicrosoftAzure::new(
+                            storage_account,
+                            access_key,
+                            bucket,
+                        )))
+                    }
+                    (bucket, storage_account, access_key) => {
+                        let mut missing_args = vec![];
+
+                        if bucket.is_none() {
+                            missing_args.push("bucket");
+                        }
+                        if storage_account.is_none() {
+                            missing_args.push("azure-storage-account");
+                        }
+                        if access_key.is_none() {
+                            missing_args.push("azure-storage-access-key");
+                        }
+
+                        MissingObjectStoreConfig {
+                            object_store: ObjStoreOpt::Azure,
+                            missing: missing_args.join(", "),
+                        }
+                        .fail()
+                    }
+                }
+            }
 
             Some(ObjStoreOpt::File) => match config.database_directory.as_ref() {
                 Some(db_dir) => {
@@ -206,7 +267,11 @@
                         .context(CreatingDatabaseDirectory { path: db_dir })?;
                     Ok(Self::new_file(object_store::disk::File::new(&db_dir)))
                 }
-                None => InvalidFileObjectStoreConfiguration.fail(),
+                None => MissingObjectStoreConfig {
+                    object_store: ObjStoreOpt::File,
+                    missing: "data-dir",
+                }
+                .fail(),
             },
         }
     }
@@ -217,6 +282,7 @@ mod tests {
     use super::*;
     use object_store::ObjectStoreIntegration;
     use structopt::StructOpt;
+    use tempfile::TempDir;
 
     #[test]
     fn default_object_store_is_memory() {
@@ -241,4 +307,144 @@
             ObjectStore(ObjectStoreIntegration::InMemory(_))
         ));
     }
+
+    #[test]
+    fn valid_s3_config() {
+        let config = Config::from_iter_safe(&[
+            "server",
+            "--object-store",
+            "s3",
+            "--bucket",
+            "mybucket",
+            "--aws-access-key-id",
+            "NotARealAWSAccessKey",
+            "--aws-secret-access-key",
+            "NotARealAWSSecretAccessKey",
+        ])
+        .unwrap();
+
+        let object_store = ObjectStore::try_from(&config).unwrap();
+
+        assert!(matches!(
+            object_store,
+            ObjectStore(ObjectStoreIntegration::AmazonS3(_))
+        ));
+    }
+
+    #[test]
+    fn s3_config_missing_params() {
+        let config = Config::from_iter_safe(&["server", "--object-store", "s3"]).unwrap();
+
+        let err = ObjectStore::try_from(&config).unwrap_err().to_string();
+
+        assert_eq!(
+            err,
+            "Specified S3 for the object store, required configuration missing for \
+            bucket, aws-access-key-id, aws-secret-access-key"
+        );
+    }
+
+    #[test]
+    fn valid_google_config() {
+        let config = Config::from_iter_safe(&[
+            "server",
+            "--object-store",
+            "google",
+            "--bucket",
+            "mybucket",
+            "--google-service-account",
+            "~/Not/A/Real/path.json",
+        ])
+        .unwrap();
+
+        let object_store = ObjectStore::try_from(&config).unwrap();
+
+        assert!(matches!(
+            object_store,
+            ObjectStore(ObjectStoreIntegration::GoogleCloudStorage(_))
+        ));
+    }
+
+    #[test]
+    fn google_config_missing_params() {
+        let config = Config::from_iter_safe(&["server", "--object-store", "google"]).unwrap();
+
+        let err = ObjectStore::try_from(&config).unwrap_err().to_string();
+
+        assert_eq!(
+            err,
+            "Specified Google for the object store, required configuration missing for \
+            bucket, google-service-account"
+        );
+    }
+
+    #[test]
+    fn valid_azure_config() {
+        let config = Config::from_iter_safe(&[
+            "server",
+            "--object-store",
+            "azure",
+            "--bucket",
+            "mybucket",
+            "--azure-storage-account",
+            "NotARealStorageAccount",
+            "--azure-storage-access-key",
+            "NotARealKey",
+        ])
+        .unwrap();
+
+        let object_store = ObjectStore::try_from(&config).unwrap();
+
+        assert!(matches!(
+            object_store,
+            ObjectStore(ObjectStoreIntegration::MicrosoftAzure(_))
+        ));
+    }
+
+    #[test]
+    fn azure_config_missing_params() {
+        let config = Config::from_iter_safe(&["server", "--object-store", "azure"]).unwrap();
+
+        let err = ObjectStore::try_from(&config).unwrap_err().to_string();
+
+        assert_eq!(
+            err,
+            "Specified Azure for the object store, required configuration missing for \
+            bucket, azure-storage-account, azure-storage-access-key"
+        );
+    }
+
+    #[test]
+    fn valid_file_config() {
+        let root = TempDir::new().unwrap();
+
+        let config = Config::from_iter_safe(&[
+            "server",
+            "--object-store",
+            "file",
+            "--data-dir",
+            root.path().to_str().unwrap(),
+        ])
+        .unwrap();
+
+        let object_store = ObjectStore::try_from(&config).unwrap();
+
+        assert!(matches!(
+            object_store,
+            ObjectStore(ObjectStoreIntegration::File(_))
+        ));
+    }
+
+    #[test]
+    fn file_config_missing_params() {
+        let config = Config::from_iter_safe(&["server", "--object-store", "file"]).unwrap();
+
+        let err = ObjectStore::try_from(&config).unwrap_err().to_string();
+
+        assert_eq!(
+            err,
+            "Specified File for the object store, required configuration missing for \
+            data-dir"
+        );
+    }
 }
diff --git a/src/main.rs b/src/main.rs
index 89a4d9e827..d2064e3ce1 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -110,7 +110,8 @@ enum Command {
     },
     Database(commands::database::Config),
     Stats(commands::stats::Config),
-    Server(commands::server::Config),
+    // Clippy recommended boxing this variant because it's much larger than the others
+    Server(Box<commands::server::Config>),
     Writer(commands::writer::Config),
 }
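
The last hunk is easy to miss, so here is a self-contained sketch of the boxing pattern it applies. The `BigConfig` struct below is a toy stand-in for `commands::server::Config`, not a real type:

```rust
// Toy illustration of clippy::large_enum_variant: an enum is as large as its
// largest variant, so boxing the big one keeps `Command` small.
struct BigConfig {
    bucket: Option<String>,
    aws_access_key_id: Option<String>,
    aws_secret_access_key: Option<String>,
    // ...many more fields, as in commands::server::Config
}

enum Command {
    Stats,
    // One pointer wide, instead of the full size of BigConfig:
    Server(Box<BigConfig>),
}

fn main() {
    // The boxed enum stays smaller than the config it points to.
    assert!(std::mem::size_of::<Command>() < std::mem::size_of::<BigConfig>());
}
```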