Merge branch 'main' into jgm-docker-readme

pull/24376/head
Jacob Marble 2021-03-04 08:35:02 -08:00 committed by GitHub
commit 454786c33c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 617 additions and 240 deletions

View File

@ -46,7 +46,7 @@
# The name of a container you've created in the storage account, under Blob Service > Containers
# INFLUXDB_IOX_BUCKET=
# In the Storage account's Settings > Access keys, one of the Key values
# AZURE_STORAGE_MASTER_KEY=
# AZURE_STORAGE_ACCESS_KEY=
#
# To enable Jaeger tracing:
# OTEL_SERVICE_NAME="iox" # defaults to iox

View File

@ -13,7 +13,7 @@ use futures::{
Stream, StreamExt, TryStreamExt,
};
use rusoto_core::ByteStream;
use rusoto_credential::ChainProvider;
use rusoto_credential::StaticProvider;
use rusoto_s3::S3;
use snafu::{futures::TryStreamExt as _, OptionExt, ResultExt, Snafu};
use std::convert::TryFrom;
@ -98,6 +98,16 @@ pub enum Error {
#[snafu(display("Unable to buffer data into temporary file, Error: {}", source))]
UnableToBufferStream { source: std::io::Error },
#[snafu(display(
"Could not parse `{}` as an AWS region. Regions should look like `us-east-2`. {:?}",
region,
source
))]
InvalidRegion {
region: String,
source: rusoto_core::region::ParseRegionError,
},
}
/// Configuration for connecting to [Amazon S3](https://aws.amazon.com/s3/).
@ -272,27 +282,27 @@ impl ObjectStoreApi for AmazonS3 {
}
impl AmazonS3 {
/// Configure a connection to Amazon S3 in the specified Amazon region and
/// bucket. Uses [`rusoto_credential::ChainProvider`][cp] to check for
/// credentials in:
///
/// 1. Environment variables: `AWS_ACCESS_KEY_ID` and
/// `AWS_SECRET_ACCESS_KEY`
/// 2. `credential_process` command in the AWS config file, usually located
/// at `~/.aws/config`.
/// 3. AWS credentials file. Usually located at `~/.aws/credentials`.
/// 4. IAM instance profile. Will only work if running on an EC2 instance
/// with an instance profile/role.
///
/// [cp]: https://docs.rs/rusoto_credential/0.43.0/rusoto_credential/struct.ChainProvider.html
pub fn new(region: rusoto_core::Region, bucket_name: impl Into<String>) -> Self {
/// Configure a connection to Amazon S3 using the specified credentials in
/// the specified Amazon region and bucket
pub fn new(
access_key_id: impl Into<String>,
secret_access_key: impl Into<String>,
region: impl Into<String>,
bucket_name: impl Into<String>,
) -> Result<Self> {
let region = region.into();
let region: rusoto_core::Region = region.parse().context(InvalidRegion { region })?;
let http_client = rusoto_core::request::HttpClient::new()
.expect("Current implementation of rusoto_core has no way for this to fail");
let credentials_provider = ChainProvider::new();
Self {
let credentials_provider =
StaticProvider::new_minimal(access_key_id.into(), secret_access_key.into());
Ok(Self {
client: rusoto_s3::S3Client::new_with(http_client, credentials_provider, region),
bucket_name: bucket_name.into(),
}
})
}
/// List objects with the given prefix and a set delimiter of `/`. Returns
@ -412,64 +422,63 @@ mod tests {
const NON_EXISTENT_NAME: &str = "nonexistentname";
#[derive(Debug)]
struct AwsConfig {
access_key_id: String,
secret_access_key: String,
region: String,
bucket: String,
}
// Helper macro to skip tests if the AWS environment variables are not set.
// Skips become hard errors if TEST_INTEGRATION is set.
macro_rules! maybe_skip_integration {
() => {
() => {{
dotenv::dotenv().ok();
let region = env::var("AWS_DEFAULT_REGION");
let bucket_name = env::var("INFLUXDB_IOX_BUCKET");
let force = std::env::var("TEST_INTEGRATION");
let required_vars = [
"AWS_DEFAULT_REGION",
"INFLUXDB_IOX_BUCKET",
"AWS_ACCESS_KEY_ID",
"AWS_SECRET_ACCESS_KEY",
];
let unset_vars: Vec<_> = required_vars
.iter()
.filter_map(|&name| match env::var(name) {
Ok(_) => None,
Err(_) => Some(name),
})
.collect();
let unset_var_names = unset_vars.join(", ");
match (region.is_ok(), bucket_name.is_ok(), force.is_ok()) {
(false, false, true) => {
panic!(
"TEST_INTEGRATION is set, \
but AWS_DEFAULT_REGION and INFLUXDB_IOX_BUCKET are not"
)
let force = env::var("TEST_INTEGRATION");
if force.is_ok() && !unset_var_names.is_empty() {
panic!(
"TEST_INTEGRATION is set, \
but variable(s) {} need to be set",
unset_var_names
)
} else if force.is_err() && !unset_var_names.is_empty() {
eprintln!(
"skipping AWS integration test - set \
{} to run",
unset_var_names
);
return Ok(());
} else {
AwsConfig {
access_key_id: env::var("AWS_ACCESS_KEY_ID")
.expect("already checked AWS_ACCESS_KEY_ID"),
secret_access_key: env::var("AWS_SECRET_ACCESS_KEY")
.expect("already checked AWS_SECRET_ACCESS_KEY"),
region: env::var("AWS_DEFAULT_REGION")
.expect("already checked AWS_DEFAULT_REGION"),
bucket: env::var("INFLUXDB_IOX_BUCKET")
.expect("already checked INFLUXDB_IOX_BUCKET"),
}
(false, true, true) => {
panic!("TEST_INTEGRATION is set, but AWS_DEFAULT_REGION is not")
}
(true, false, true) => {
panic!("TEST_INTEGRATION is set, but INFLUXDB_IOX_BUCKET is not")
}
(false, false, false) => {
eprintln!(
"skipping integration test - set \
AWS_DEFAULT_REGION and INFLUXDB_IOX_BUCKET to run"
);
return Ok(());
}
(false, true, false) => {
eprintln!("skipping integration test - set AWS_DEFAULT_REGION to run");
return Ok(());
}
(true, false, false) => {
eprintln!("skipping integration test - set INFLUXDB_IOX_BUCKET to run");
return Ok(());
}
_ => {}
}
};
}
// Helper to get region and bucket from environment variables. Call the
// `maybe_skip_integration!` macro before calling this to skip the test if these
// aren't set; if you don't call that macro, the tests will fail if
// these env vars aren't set.
//
// `AWS_DEFAULT_REGION` should be a value like `us-east-2`.
fn region_and_bucket_name() -> Result<(rusoto_core::Region, String)> {
let region = env::var("AWS_DEFAULT_REGION").map_err(|_| {
"The environment variable AWS_DEFAULT_REGION must be set \
to a value like `us-east-2`"
})?;
let bucket_name = env::var("INFLUXDB_IOX_BUCKET")
.map_err(|_| "The environment variable INFLUXDB_IOX_BUCKET must be set")?;
Ok((region.parse()?, bucket_name))
}};
}
fn check_credentials<T>(r: Result<T>) -> Result<T> {
@ -490,12 +499,16 @@ mod tests {
#[tokio::test]
async fn s3_test() -> Result<()> {
maybe_skip_integration!();
let (region, bucket_name) = region_and_bucket_name()?;
let config = maybe_skip_integration!();
let integration = AmazonS3::new(
config.access_key_id,
config.secret_access_key,
config.region,
config.bucket,
)
.expect("Valid S3 config");
let integration = AmazonS3::new(region, &bucket_name);
check_credentials(put_get_delete_list(&integration).await)?;
check_credentials(list_with_delimiter(&integration).await).unwrap();
Ok(())
@ -503,11 +516,18 @@ mod tests {
#[tokio::test]
async fn s3_test_get_nonexistent_region() -> Result<()> {
maybe_skip_integration!();
let mut config = maybe_skip_integration!();
// Assumes environment variables do not provide credentials to AWS US West 1
let (_, bucket_name) = region_and_bucket_name()?;
let region = rusoto_core::Region::UsWest1;
let integration = AmazonS3::new(region, &bucket_name);
config.region = "us-west-1".into();
let integration = AmazonS3::new(
config.access_key_id,
config.secret_access_key,
config.region,
&config.bucket,
)
.expect("Valid S3 config");
let mut location = integration.new_path();
location.set_file_name(NON_EXISTENT_NAME);
@ -516,7 +536,7 @@ mod tests {
.unwrap_err();
if let Some(Error::UnableToListData { source, bucket }) = err.downcast_ref::<Error>() {
assert!(matches!(source, rusoto_core::RusotoError::Unknown(_)));
assert_eq!(bucket, &bucket_name);
assert_eq!(bucket, &config.bucket);
} else {
panic!("unexpected error type")
}
@ -526,9 +546,15 @@ mod tests {
#[tokio::test]
async fn s3_test_get_nonexistent_location() -> Result<()> {
maybe_skip_integration!();
let (region, bucket_name) = region_and_bucket_name()?;
let integration = AmazonS3::new(region, &bucket_name);
let config = maybe_skip_integration!();
let integration = AmazonS3::new(
config.access_key_id,
config.secret_access_key,
config.region,
&config.bucket,
)
.expect("Valid S3 config");
let mut location = integration.new_path();
location.set_file_name(NON_EXISTENT_NAME);
@ -545,7 +571,7 @@ mod tests {
source,
rusoto_core::RusotoError::Service(rusoto_s3::GetObjectError::NoSuchKey(_))
));
assert_eq!(bucket, &bucket_name);
assert_eq!(bucket, &config.bucket);
assert_eq!(location, NON_EXISTENT_NAME);
} else {
panic!("unexpected error type")
@ -556,10 +582,17 @@ mod tests {
#[tokio::test]
async fn s3_test_get_nonexistent_bucket() -> Result<()> {
maybe_skip_integration!();
let (region, _) = region_and_bucket_name()?;
let bucket_name = NON_EXISTENT_NAME;
let integration = AmazonS3::new(region, bucket_name);
let mut config = maybe_skip_integration!();
config.bucket = NON_EXISTENT_NAME.into();
let integration = AmazonS3::new(
config.access_key_id,
config.secret_access_key,
config.region,
&config.bucket,
)
.expect("Valid S3 config");
let mut location = integration.new_path();
location.set_file_name(NON_EXISTENT_NAME);
@ -571,7 +604,7 @@ mod tests {
source,
rusoto_core::RusotoError::Service(rusoto_s3::ListObjectsV2Error::NoSuchBucket(_))
));
assert_eq!(bucket, bucket_name);
assert_eq!(bucket, &config.bucket);
} else {
panic!("unexpected error type")
}
@ -581,11 +614,18 @@ mod tests {
#[tokio::test]
async fn s3_test_put_nonexistent_region() -> Result<()> {
maybe_skip_integration!();
let mut config = maybe_skip_integration!();
// Assumes environment variables do not provide credentials to AWS US West 1
let (_, bucket_name) = region_and_bucket_name()?;
let region = rusoto_core::Region::UsWest1;
let integration = AmazonS3::new(region, &bucket_name);
config.region = "us-west-1".into();
let integration = AmazonS3::new(
config.access_key_id,
config.secret_access_key,
config.region,
&config.bucket,
)
.expect("Valid S3 config");
let mut location = integration.new_path();
location.set_file_name(NON_EXISTENT_NAME);
let data = Bytes::from("arbitrary data");
@ -607,7 +647,7 @@ mod tests {
} = err
{
assert!(matches!(source, rusoto_core::RusotoError::Unknown(_)));
assert_eq!(bucket, bucket_name);
assert_eq!(bucket, config.bucket);
assert_eq!(location, NON_EXISTENT_NAME);
} else {
panic!("unexpected error type")
@ -618,10 +658,17 @@ mod tests {
#[tokio::test]
async fn s3_test_put_nonexistent_bucket() -> Result<()> {
maybe_skip_integration!();
let (region, _) = region_and_bucket_name()?;
let bucket_name = NON_EXISTENT_NAME;
let integration = AmazonS3::new(region, bucket_name);
let mut config = maybe_skip_integration!();
config.bucket = NON_EXISTENT_NAME.into();
let integration = AmazonS3::new(
config.access_key_id,
config.secret_access_key,
config.region,
&config.bucket,
)
.expect("Valid S3 config");
let mut location = integration.new_path();
location.set_file_name(NON_EXISTENT_NAME);
let data = Bytes::from("arbitrary data");
@ -643,7 +690,7 @@ mod tests {
} = err
{
assert!(matches!(source, rusoto_core::RusotoError::Unknown(_)));
assert_eq!(bucket, bucket_name);
assert_eq!(bucket, config.bucket);
assert_eq!(location, NON_EXISTENT_NAME);
} else {
panic!("unexpected error type")
@ -654,9 +701,15 @@ mod tests {
#[tokio::test]
async fn s3_test_delete_nonexistent_location() -> Result<()> {
maybe_skip_integration!();
let (region, bucket_name) = region_and_bucket_name()?;
let integration = AmazonS3::new(region, &bucket_name);
let config = maybe_skip_integration!();
let integration = AmazonS3::new(
config.access_key_id,
config.secret_access_key,
config.region,
config.bucket,
)
.expect("Valid S3 config");
let mut location = integration.new_path();
location.set_file_name(NON_EXISTENT_NAME);
@ -669,11 +722,18 @@ mod tests {
#[tokio::test]
async fn s3_test_delete_nonexistent_region() -> Result<()> {
maybe_skip_integration!();
let mut config = maybe_skip_integration!();
// Assumes environment variables do not provide credentials to AWS US West 1
let (_, bucket_name) = region_and_bucket_name()?;
let region = rusoto_core::Region::UsWest1;
let integration = AmazonS3::new(region, &bucket_name);
config.region = "us-west-1".into();
let integration = AmazonS3::new(
config.access_key_id,
config.secret_access_key,
config.region,
&config.bucket,
)
.expect("Valid S3 config");
let mut location = integration.new_path();
location.set_file_name(NON_EXISTENT_NAME);
@ -685,7 +745,7 @@ mod tests {
} = err
{
assert!(matches!(source, rusoto_core::RusotoError::Unknown(_)));
assert_eq!(bucket, bucket_name);
assert_eq!(bucket, config.bucket);
assert_eq!(location, NON_EXISTENT_NAME);
} else {
panic!("unexpected error type")
@ -696,10 +756,17 @@ mod tests {
#[tokio::test]
async fn s3_test_delete_nonexistent_bucket() -> Result<()> {
maybe_skip_integration!();
let (region, _) = region_and_bucket_name()?;
let bucket_name = NON_EXISTENT_NAME;
let integration = AmazonS3::new(region, bucket_name);
let mut config = maybe_skip_integration!();
config.bucket = NON_EXISTENT_NAME.into();
let integration = AmazonS3::new(
config.access_key_id,
config.secret_access_key,
config.region,
&config.bucket,
)
.expect("Valid S3 config");
let mut location = integration.new_path();
location.set_file_name(NON_EXISTENT_NAME);
@ -711,7 +778,7 @@ mod tests {
} = err
{
assert!(matches!(source, rusoto_core::RusotoError::Unknown(_)));
assert_eq!(bucket, bucket_name);
assert_eq!(bucket, config.bucket);
assert_eq!(location, NON_EXISTENT_NAME);
} else {
panic!("unexpected error type")

View File

@ -245,14 +245,20 @@ impl MicrosoftAzure {
/// Configure a connection to container with given name on Microsoft Azure
/// Blob store.
///
/// The credentials `account` and `master_key` must provide access to the
/// The credentials `account` and `access_key` must provide access to the
/// store.
pub fn new(account: String, master_key: String, container_name: impl Into<String>) -> Self {
pub fn new(
account: impl Into<String>,
access_key: impl Into<String>,
container_name: impl Into<String>,
) -> Self {
let account = account.into();
let access_key = access_key.into();
// From https://github.com/Azure/azure-sdk-for-rust/blob/master/sdk/storage/examples/blob_00.rs#L29
let http_client: Arc<Box<dyn HttpClient>> = Arc::new(Box::new(reqwest::Client::new()));
let storage_account_client =
StorageAccountClient::new_access_key(Arc::clone(&http_client), &account, &master_key);
StorageAccountClient::new_access_key(Arc::clone(&http_client), &account, &access_key);
let storage_client = storage_account_client.as_storage_client();
@ -265,21 +271,6 @@ impl MicrosoftAzure {
container_name,
}
}
/// Configure a connection to container with given name on Microsoft Azure
/// Blob store.
///
/// The credentials `account` and `master_key` must be set via the
/// environment variables `AZURE_STORAGE_ACCOUNT` and
/// `AZURE_STORAGE_MASTER_KEY` respectively.
pub fn new_from_env(container_name: impl Into<String>) -> Self {
let account = std::env::var("AZURE_STORAGE_ACCOUNT")
.expect("Set env variable AZURE_STORAGE_ACCOUNT first!");
let master_key = std::env::var("AZURE_STORAGE_MASTER_KEY")
.expect("Set env variable AZURE_STORAGE_MASTER_KEY first!");
Self::new(account, master_key, container_name)
}
}
#[cfg(test)]
@ -291,16 +282,23 @@ mod tests {
type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
type Result<T, E = Error> = std::result::Result<T, E>;
// Helper macro to skip tests if the GCP environment variables are not set.
#[derive(Debug)]
struct AzureConfig {
storage_account: String,
access_key: String,
bucket: String,
}
// Helper macro to skip tests if the Azure environment variables are not set.
// Skips become hard errors if TEST_INTEGRATION is set.
macro_rules! maybe_skip_integration {
() => {
() => {{
dotenv::dotenv().ok();
let required_vars = [
"AZURE_STORAGE_ACCOUNT",
"INFLUXDB_IOX_BUCKET",
"AZURE_STORAGE_MASTER_KEY",
"AZURE_STORAGE_ACCESS_KEY",
];
let unset_vars: Vec<_> = required_vars
.iter()
@ -326,17 +324,24 @@ mod tests {
unset_var_names
);
return Ok(());
} else {
AzureConfig {
storage_account: env::var("AZURE_STORAGE_ACCOUNT")
.expect("already checked AZURE_STORAGE_ACCOUNT"),
access_key: env::var("AZURE_STORAGE_ACCESS_KEY")
.expect("already checked AZURE_STORAGE_ACCESS_KEY"),
bucket: env::var("INFLUXDB_IOX_BUCKET")
.expect("already checked INFLUXDB_IOX_BUCKET"),
}
}
};
}};
}
#[tokio::test]
async fn azure_blob_test() -> Result<()> {
maybe_skip_integration!();
let container_name = env::var("INFLUXDB_IOX_BUCKET")
.map_err(|_| "The environment variable INFLUXDB_IOX_BUCKET must be set")?;
let integration = MicrosoftAzure::new_from_env(container_name);
let config = maybe_skip_integration!();
let integration =
MicrosoftAzure::new(config.storage_account, config.access_key, config.bucket);
put_get_delete_list(&integration).await?;
list_with_delimiter(&integration).await?;

View File

@ -8,7 +8,7 @@ use async_trait::async_trait;
use bytes::Bytes;
use futures::{stream::BoxStream, Stream, StreamExt, TryStreamExt};
use snafu::{ensure, futures::TryStreamExt as _, ResultExt, Snafu};
use std::{convert::TryFrom, io};
use std::{convert::TryFrom, env, io};
/// A specialized `Result` for Google Cloud Storage object store-related errors
pub type Result<T, E = Error> = std::result::Result<T, E>;
@ -239,7 +239,14 @@ impl ObjectStoreApi for GoogleCloudStorage {
impl GoogleCloudStorage {
/// Configure a connection to Google Cloud Storage.
pub fn new(bucket_name: impl Into<String>) -> Self {
pub fn new(
service_account_path: impl AsRef<std::ffi::OsStr>,
bucket_name: impl Into<String>,
) -> Self {
// The cloud storage crate currently only supports authentication via
// environment variables. Set the environment variable explicitly so
// that we can optionally accept command line arguments instead.
env::set_var("SERVICE_ACCOUNT", service_account_path);
Self {
bucket_name: bucket_name.into(),
}
@ -261,39 +268,59 @@ mod test {
const NON_EXISTENT_NAME: &str = "nonexistentname";
#[derive(Debug)]
struct GoogleCloudConfig {
bucket: String,
service_account: String,
}
// Helper macro to skip tests if the GCP environment variables are not set.
// Skips become hard errors if TEST_INTEGRATION is set.
macro_rules! maybe_skip_integration {
() => {
() => {{
dotenv::dotenv().ok();
let bucket_name = env::var("INFLUXDB_IOX_BUCKET");
let required_vars = ["INFLUXDB_IOX_BUCKET", "GOOGLE_SERVICE_ACCOUNT"];
let unset_vars: Vec<_> = required_vars
.iter()
.filter_map(|&name| match env::var(name) {
Ok(_) => None,
Err(_) => Some(name),
})
.collect();
let unset_var_names = unset_vars.join(", ");
let force = std::env::var("TEST_INTEGRATION");
match (bucket_name.is_ok(), force.is_ok()) {
(false, true) => {
panic!("TEST_INTEGRATION is set, but INFLUXDB_IOX_BUCKET is not")
if force.is_ok() && !unset_var_names.is_empty() {
panic!(
"TEST_INTEGRATION is set, \
but variable(s) {} need to be set",
unset_var_names
)
} else if force.is_err() && !unset_var_names.is_empty() {
eprintln!(
"skipping Google Cloud integration test - set \
{} to run",
unset_var_names
);
return Ok(());
} else {
GoogleCloudConfig {
bucket: env::var("INFLUXDB_IOX_BUCKET")
.expect("already checked INFLUXDB_IOX_BUCKET"),
service_account: env::var("GOOGLE_SERVICE_ACCOUNT")
.expect("already checked GOOGLE_SERVICE_ACCOUNT"),
}
(false, false) => {
eprintln!("skipping integration test - set INFLUXDB_IOX_BUCKET to run");
return Ok(());
}
_ => {}
}
};
}
fn bucket_name() -> Result<String> {
Ok(env::var("INFLUXDB_IOX_BUCKET")
.map_err(|_| "The environment variable INFLUXDB_IOX_BUCKET must be set")?)
}};
}
#[tokio::test]
async fn gcs_test() -> Result<()> {
maybe_skip_integration!();
let bucket_name = bucket_name()?;
let config = maybe_skip_integration!();
let integration = GoogleCloudStorage::new(config.service_account, config.bucket);
let integration = GoogleCloudStorage::new(&bucket_name);
put_get_delete_list(&integration).await?;
list_with_delimiter(&integration).await?;
Ok(())
@ -301,9 +328,8 @@ mod test {
#[tokio::test]
async fn gcs_test_get_nonexistent_location() -> Result<()> {
maybe_skip_integration!();
let bucket_name = bucket_name()?;
let integration = GoogleCloudStorage::new(&bucket_name);
let config = maybe_skip_integration!();
let integration = GoogleCloudStorage::new(config.service_account, &config.bucket);
let mut location = integration.new_path();
location.set_file_name(NON_EXISTENT_NAME);
@ -319,7 +345,7 @@ mod test {
}) = err.downcast_ref::<Error>()
{
assert!(matches!(source, cloud_storage::Error::Reqwest(_)));
assert_eq!(bucket, &bucket_name);
assert_eq!(bucket, &config.bucket);
assert_eq!(location, NON_EXISTENT_NAME);
} else {
panic!("unexpected error type")
@ -330,9 +356,10 @@ mod test {
#[tokio::test]
async fn gcs_test_get_nonexistent_bucket() -> Result<()> {
maybe_skip_integration!();
let bucket_name = NON_EXISTENT_NAME;
let integration = GoogleCloudStorage::new(bucket_name);
let mut config = maybe_skip_integration!();
config.bucket = NON_EXISTENT_NAME.into();
let integration = GoogleCloudStorage::new(config.service_account, &config.bucket);
let mut location = integration.new_path();
location.set_file_name(NON_EXISTENT_NAME);
@ -343,7 +370,7 @@ mod test {
if let Some(Error::UnableToStreamListData { source, bucket }) = err.downcast_ref::<Error>()
{
assert!(matches!(source, cloud_storage::Error::Google(_)));
assert_eq!(bucket, bucket_name);
assert_eq!(bucket, &config.bucket);
} else {
panic!("unexpected error type")
}
@ -353,9 +380,8 @@ mod test {
#[tokio::test]
async fn gcs_test_delete_nonexistent_location() -> Result<()> {
maybe_skip_integration!();
let bucket_name = bucket_name()?;
let integration = GoogleCloudStorage::new(&bucket_name);
let config = maybe_skip_integration!();
let integration = GoogleCloudStorage::new(config.service_account, &config.bucket);
let mut location = integration.new_path();
location.set_file_name(NON_EXISTENT_NAME);
@ -369,7 +395,7 @@ mod test {
} = err
{
assert!(matches!(source, cloud_storage::Error::Google(_)));
assert_eq!(bucket, bucket_name);
assert_eq!(bucket, config.bucket);
assert_eq!(location, NON_EXISTENT_NAME);
} else {
panic!("unexpected error type")
@ -380,9 +406,9 @@ mod test {
#[tokio::test]
async fn gcs_test_delete_nonexistent_bucket() -> Result<()> {
maybe_skip_integration!();
let bucket_name = NON_EXISTENT_NAME;
let integration = GoogleCloudStorage::new(bucket_name);
let mut config = maybe_skip_integration!();
config.bucket = NON_EXISTENT_NAME.into();
let integration = GoogleCloudStorage::new(config.service_account, &config.bucket);
let mut location = integration.new_path();
location.set_file_name(NON_EXISTENT_NAME);
@ -396,7 +422,7 @@ mod test {
} = err
{
assert!(matches!(source, cloud_storage::Error::Google(_)));
assert_eq!(bucket, bucket_name);
assert_eq!(bucket, config.bucket);
assert_eq!(location, NON_EXISTENT_NAME);
} else {
panic!("unexpected error type")
@ -407,9 +433,10 @@ mod test {
#[tokio::test]
async fn gcs_test_put_nonexistent_bucket() -> Result<()> {
maybe_skip_integration!();
let bucket_name = NON_EXISTENT_NAME;
let integration = GoogleCloudStorage::new(bucket_name);
let mut config = maybe_skip_integration!();
config.bucket = NON_EXISTENT_NAME.into();
let integration = GoogleCloudStorage::new(config.service_account, &config.bucket);
let mut location = integration.new_path();
location.set_file_name(NON_EXISTENT_NAME);
@ -432,7 +459,7 @@ mod test {
} = err
{
assert!(matches!(source, cloud_storage::Error::Other(_)));
assert_eq!(bucket, bucket_name);
assert_eq!(bucket, config.bucket);
assert_eq!(location, NON_EXISTENT_NAME);
} else {
panic!("unexpected error type");

View File

@ -11,6 +11,10 @@ pub const DEFAULT_API_BIND_ADDR: &str = "127.0.0.1:8080";
/// The default bind address for the gRPC.
pub const DEFAULT_GRPC_BIND_ADDR: &str = "127.0.0.1:8082";
/// The AWS region to use for Amazon S3 based object storage if none is
/// specified.
pub const FALLBACK_AWS_REGION: &str = "us-east-1";
#[derive(Debug, StructOpt)]
#[structopt(
name = "server",
@ -102,27 +106,94 @@ Possible values (case insensitive):
* memory (default): Effectively no object persistence.
* file: Stores objects in the local filesystem. Must also set `--data-dir`.
* s3: Amazon S3. Must also set `--bucket`, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, and
AWS_DEFAULT_REGION.
* google: Google Cloud Storage. Must also set `--bucket` and SERVICE_ACCOUNT.
* azure: Microsoft Azure blob storage. Must also set `--bucket`, AZURE_STORAGE_ACCOUNT,
and AZURE_STORAGE_MASTER_KEY.
* s3: Amazon S3. Must also set `--bucket`, `--aws-access-key-id`, `--aws-secret-access-key`, and
possibly `--aws-default-region`.
* google: Google Cloud Storage. Must also set `--bucket` and `--google-service-account`.
* azure: Microsoft Azure blob storage. Must also set `--bucket`, `--azure-storage-account`,
and `--azure-storage-access-key`.
"#,
)]
pub object_store: Option<ObjectStore>,
/// Name of the bucket to use for the object store. Must also set
/// `--object_store` to a cloud object storage to have any effect.
/// `--object-store` to a cloud object storage to have any effect.
///
/// If using Google Cloud Storage for the object store, this item, as well
/// as SERVICE_ACCOUNT must be set.
/// If using Google Cloud Storage for the object store, this item as well
/// as `--google-service-account` must be set.
///
/// If using S3 for the object store, this item, as well
/// as AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY and AWS_DEFAULT_REGION must
/// be set.
/// If using S3 for the object store, must set this item as well
/// as `--aws-access-key-id` and `--aws-secret-access-key`. Can also set
/// `--aws-default-region` if not using the fallback region.
///
/// If using Azure for the object store, set this item to the name of a
/// container you've created in the associated storage account, under
/// Blob Service > Containers. Must also set `--azure-storage-account` and
/// `--azure-storage-access-key`.
#[structopt(long = "--bucket", env = "INFLUXDB_IOX_BUCKET")]
pub bucket: Option<String>,
/// When using Amazon S3 as the object store, set this to an access key that
/// has permission to read from and write to the specified S3 bucket.
///
/// Must also set `--object-store=s3`, `--bucket`, and
/// `--aws-secret-access-key`. Can also set `--aws-default-region` if not
/// using the fallback region.
///
/// Prefer the environment variable over the command line flag in shared
/// environments.
#[structopt(long = "--aws-access-key-id", env = "AWS_ACCESS_KEY_ID")]
pub aws_access_key_id: Option<String>,
/// When using Amazon S3 as the object store, set this to the secret access
/// key that goes with the specified access key ID.
///
/// Must also set `--object-store=s3`, `--bucket`, `--aws-access-key-id`.
/// Can also set `--aws-default-region` if not using the fallback region.
///
/// Prefer the environment variable over the command line flag in shared
/// environments.
#[structopt(long = "--aws-secret-access-key", env = "AWS_SECRET_ACCESS_KEY")]
pub aws_secret_access_key: Option<String>,
/// When using Amazon S3 as the object store, set this to the region
/// that goes with the specified bucket if different from the fallback
/// value.
///
/// Must also set `--object-store=s3`, `--bucket`, `--aws-access-key-id`,
/// and `--aws-secret-access-key`.
#[structopt(
long = "--aws-default-region",
env = "AWS_DEFAULT_REGION",
default_value = FALLBACK_AWS_REGION,
)]
pub aws_default_region: String,
/// When using Google Cloud Storage as the object store, set this to the
/// path to the JSON file that contains the Google credentials.
///
/// Must also set `--object-store=google` and `--bucket`.
#[structopt(long = "--google-service-account", env = "GOOGLE_SERVICE_ACCOUNT")]
pub google_service_account: Option<String>,
/// When using Microsoft Azure as the object store, set this to the
/// name you see when going to All Services > Storage accounts > [name].
///
/// Must also set `--object-store=azure`, `--bucket`, and
/// `--azure-storage-access-key`.
#[structopt(long = "--azure-storage-account", env = "AZURE_STORAGE_ACCOUNT")]
pub azure_storage_account: Option<String>,
/// When using Microsoft Azure as the object store, set this to one of the
/// Key values in the Storage account's Settings > Access keys.
///
/// Must also set `--object-store=azure`, `--bucket`, and
/// `--azure-storage-account`.
///
/// Prefer the environment variable over the command line flag in shared
/// environments.
#[structopt(long = "--azure-storage-access-key", env = "AZURE_STORAGE_ACCESS_KEY")]
pub azure_storage_access_key: Option<String>,
/// If set, Jaeger traces are emitted to this host
/// using the OpenTelemetry tracer.
///
@ -151,13 +222,13 @@ Possible values (case insensitive):
/// - user set environment variables
/// - .env file contents
/// - pre-configured default values
pub fn load_config() -> Config {
pub fn load_config() -> Box<Config> {
// Load the Config struct - this pulls in any envs set by the user or
// sourced above, and applies any defaults.
//
//let args = std::env::args().filter(|arg| arg != "server");
Config::from_iter(strip_server(std::env::args()).iter())
Box::new(Config::from_iter(strip_server(std::env::args()).iter()))
}
fn parse_socket_addr(s: &str) -> std::io::Result<SocketAddr> {

View File

@ -3,7 +3,9 @@ use crate::commands::{
server::{load_config, Config, ObjectStore as ObjStoreOpt},
};
use hyper::Server;
use object_store::{self, aws::AmazonS3, gcp::GoogleCloudStorage, ObjectStore};
use object_store::{
self, aws::AmazonS3, azure::MicrosoftAzure, gcp::GoogleCloudStorage, ObjectStore,
};
use panic_logging::SendPanicsToTracing;
use server::{ConnectionManagerImpl as ConnectionManager, Server as AppServer};
use snafu::{ResultExt, Snafu};
@ -21,18 +23,6 @@ pub enum Error {
source: std::io::Error,
},
#[snafu(display("Unable to initialize database in directory {:?}: {}", db_dir, source))]
InitializingMutableBuffer {
db_dir: PathBuf,
source: Box<dyn std::error::Error + Send + Sync>,
},
#[snafu(display("Unable to restore WAL from directory {:?}: {}", dir, source))]
RestoringMutableBuffer {
dir: PathBuf,
source: Box<dyn std::error::Error + Send + Sync>,
},
#[snafu(display(
"Unable to bind to listen for HTTP requests on {}: {}",
bind_addr,
@ -59,11 +49,21 @@ pub enum Error {
#[snafu(display("Error serving RPC: {}", source))]
ServingRPC { source: self::rpc::Error },
#[snafu(display("Specifed {} for the object store, but not a bucket", object_store))]
InvalidCloudObjectStoreConfiguration { object_store: ObjStoreOpt },
#[snafu(display(
"Specified {} for the object store, required configuration missing for {}",
object_store,
missing
))]
MissingObjectStoreConfig {
object_store: ObjStoreOpt,
missing: String,
},
#[snafu(display("Specified file for the object store, but not a database directory"))]
InvalidFileObjectStoreConfiguration,
// Creating a new S3 object store can fail if the region is *specified* but
// not *parseable* as a rusoto `Region`. The other object store constructors
// don't return `Result`.
#[snafu(display("Amazon S3 configuration was invalid: {}", source))]
InvalidS3Config { source: object_store::aws::Error },
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@ -73,7 +73,7 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
///
/// The logging_level passed in is the global setting (e.g. if -v or
/// -vv was passed in before 'server')
pub async fn main(logging_level: LoggingLevel, config: Option<Config>) -> Result<()> {
pub async fn main(logging_level: LoggingLevel, config: Option<Box<Config>>) -> Result<()> {
// load config from environment if no command line
let config = config.unwrap_or_else(load_config);
@ -101,7 +101,7 @@ pub async fn main(logging_level: LoggingLevel, config: Option<Config>) -> Result
}
}
let object_store = ObjectStore::try_from(&config)?;
let object_store = ObjectStore::try_from(&*config)?;
let object_storage = Arc::new(object_store);
let connection_manager = ConnectionManager {};
@ -165,40 +165,101 @@ impl TryFrom<&Config> for ObjectStore {
Ok(Self::new_in_memory(object_store::memory::InMemory::new()))
}
Some(ObjStoreOpt::Google) => match config.bucket.as_ref() {
Some(bucket) => Ok(Self::new_google_cloud_storage(GoogleCloudStorage::new(
bucket,
))),
None => InvalidCloudObjectStoreConfiguration {
object_store: ObjStoreOpt::Google,
}
.fail(),
},
Some(ObjStoreOpt::Google) => {
match (
config.bucket.as_ref(),
config.google_service_account.as_ref(),
) {
(Some(bucket), Some(service_account)) => Ok(Self::new_google_cloud_storage(
GoogleCloudStorage::new(service_account, bucket),
)),
(bucket, service_account) => {
let mut missing_args = vec![];
Some(ObjStoreOpt::S3) => {
match config.bucket.as_ref() {
Some(bucket) => {
// rusoto::Region's default takes the value from the AWS_DEFAULT_REGION env
// var.
Ok(Self::new_amazon_s3(AmazonS3::new(
Default::default(),
bucket,
)))
if bucket.is_none() {
missing_args.push("bucket");
}
if service_account.is_none() {
missing_args.push("google-service-account");
}
MissingObjectStoreConfig {
object_store: ObjStoreOpt::Google,
missing: missing_args.join(", "),
}
.fail()
}
None => InvalidCloudObjectStoreConfiguration {
object_store: ObjStoreOpt::S3,
}
.fail(),
}
}
Some(ObjStoreOpt::Azure) => match config.bucket.as_ref() {
Some(_bucket) => unimplemented!(),
None => InvalidCloudObjectStoreConfiguration {
object_store: ObjStoreOpt::Azure,
Some(ObjStoreOpt::S3) => {
match (
config.bucket.as_ref(),
config.aws_access_key_id.as_ref(),
config.aws_secret_access_key.as_ref(),
config.aws_default_region.as_str(),
) {
(Some(bucket), Some(key_id), Some(secret_key), region) => {
Ok(Self::new_amazon_s3(
AmazonS3::new(key_id, secret_key, region, bucket)
.context(InvalidS3Config)?,
))
}
(bucket, key_id, secret_key, _) => {
let mut missing_args = vec![];
if bucket.is_none() {
missing_args.push("bucket");
}
if key_id.is_none() {
missing_args.push("aws-access-key-id");
}
if secret_key.is_none() {
missing_args.push("aws-secret-access-key");
}
MissingObjectStoreConfig {
object_store: ObjStoreOpt::S3,
missing: missing_args.join(", "),
}
.fail()
}
}
.fail(),
},
}
Some(ObjStoreOpt::Azure) => {
match (
config.bucket.as_ref(),
config.azure_storage_account.as_ref(),
config.azure_storage_access_key.as_ref(),
) {
(Some(bucket), Some(storage_account), Some(access_key)) => {
Ok(Self::new_microsoft_azure(MicrosoftAzure::new(
storage_account,
access_key,
bucket,
)))
}
(bucket, storage_account, access_key) => {
let mut missing_args = vec![];
if bucket.is_none() {
missing_args.push("bucket");
}
if storage_account.is_none() {
missing_args.push("azure-storage-account");
}
if access_key.is_none() {
missing_args.push("azure-storage-access-key");
}
MissingObjectStoreConfig {
object_store: ObjStoreOpt::Azure,
missing: missing_args.join(", "),
}
.fail()
}
}
}
Some(ObjStoreOpt::File) => match config.database_directory.as_ref() {
Some(db_dir) => {
@ -206,7 +267,11 @@ impl TryFrom<&Config> for ObjectStore {
.context(CreatingDatabaseDirectory { path: db_dir })?;
Ok(Self::new_file(object_store::disk::File::new(&db_dir)))
}
None => InvalidFileObjectStoreConfiguration.fail(),
None => MissingObjectStoreConfig {
object_store: ObjStoreOpt::File,
missing: "data-dir",
}
.fail(),
},
}
}
@ -217,6 +282,7 @@ mod tests {
use super::*;
use object_store::ObjectStoreIntegration;
use structopt::StructOpt;
use tempfile::TempDir;
#[test]
fn default_object_store_is_memory() {
@ -241,4 +307,144 @@ mod tests {
ObjectStore(ObjectStoreIntegration::InMemory(_))
));
}
#[test]
fn valid_s3_config() {
let config = Config::from_iter_safe(&[
"server",
"--object-store",
"s3",
"--bucket",
"mybucket",
"--aws-access-key-id",
"NotARealAWSAccessKey",
"--aws-secret-access-key",
"NotARealAWSSecretAccessKey",
])
.unwrap();
let object_store = ObjectStore::try_from(&config).unwrap();
assert!(matches!(
object_store,
ObjectStore(ObjectStoreIntegration::AmazonS3(_))
));
}
#[test]
fn s3_config_missing_params() {
let config = Config::from_iter_safe(&["server", "--object-store", "s3"]).unwrap();
let err = ObjectStore::try_from(&config).unwrap_err().to_string();
assert_eq!(
err,
"Specified S3 for the object store, required configuration missing for \
bucket, aws-access-key-id, aws-secret-access-key"
);
}
#[test]
fn valid_google_config() {
let config = Config::from_iter_safe(&[
"server",
"--object-store",
"google",
"--bucket",
"mybucket",
"--google-service-account",
"~/Not/A/Real/path.json",
])
.unwrap();
let object_store = ObjectStore::try_from(&config).unwrap();
assert!(matches!(
object_store,
ObjectStore(ObjectStoreIntegration::GoogleCloudStorage(_))
));
}
#[test]
fn google_config_missing_params() {
let config = Config::from_iter_safe(&["server", "--object-store", "google"]).unwrap();
let err = ObjectStore::try_from(&config).unwrap_err().to_string();
assert_eq!(
err,
"Specified Google for the object store, required configuration missing for \
bucket, google-service-account"
);
}
#[test]
fn valid_azure_config() {
let config = Config::from_iter_safe(&[
"server",
"--object-store",
"azure",
"--bucket",
"mybucket",
"--azure-storage-account",
"NotARealStorageAccount",
"--azure-storage-access-key",
"NotARealKey",
])
.unwrap();
let object_store = ObjectStore::try_from(&config).unwrap();
assert!(matches!(
object_store,
ObjectStore(ObjectStoreIntegration::MicrosoftAzure(_))
));
}
#[test]
fn azure_config_missing_params() {
let config = Config::from_iter_safe(&["server", "--object-store", "azure"]).unwrap();
let err = ObjectStore::try_from(&config).unwrap_err().to_string();
assert_eq!(
err,
"Specified Azure for the object store, required configuration missing for \
bucket, azure-storage-account, azure-storage-access-key"
);
}
#[test]
fn valid_file_config() {
let root = TempDir::new().unwrap();
let config = Config::from_iter_safe(&[
"server",
"--object-store",
"file",
"--data-dir",
root.path().to_str().unwrap(),
])
.unwrap();
let object_store = ObjectStore::try_from(&config).unwrap();
assert!(matches!(
object_store,
ObjectStore(ObjectStoreIntegration::File(_))
));
}
#[test]
fn file_config_missing_params() {
let config = Config::from_iter_safe(&["server", "--object-store", "file"]).unwrap();
let err = ObjectStore::try_from(&config).unwrap_err().to_string();
assert_eq!(
err,
"Specified File for the object store, required configuration missing for \
data-dir"
);
}
}

View File

@ -110,7 +110,8 @@ enum Command {
},
Database(commands::database::Config),
Stats(commands::stats::Config),
Server(commands::server::Config),
// Clippy recommended boxing this variant because it's much larger than the others
Server(Box<commands::server::Config>),
Writer(commands::writer::Config),
}