refactor: Make aws/azure/gcs optional features and stop compiling 100 dependencies during dev (#1933)
* feat: make aws, gcp, azure dependencies optional * fix: only run object store tests if the features are enabled * fix: clean up testing * fix: rename step * fix: add to list of jobs * fix: remove test with object store * fix: review commentspull/24376/head
parent
dac1e6f5ea
commit
1a79bf7e99
|
@ -110,6 +110,7 @@ jobs:
|
|||
- store_artifacts:
|
||||
path: target/doc/
|
||||
destination: rustdoc
|
||||
|
||||
test:
|
||||
docker:
|
||||
- image: quay.io/influxdb/rust:ci
|
||||
|
@ -190,6 +191,9 @@ jobs:
|
|||
- run:
|
||||
name: Build benches
|
||||
command: cargo test --workspace --benches --no-run
|
||||
- run:
|
||||
name: Build with object store support
|
||||
command: cargo build --features="aws,gcp,azure"
|
||||
- cache_save
|
||||
|
||||
# Lint protobufs.
|
||||
|
@ -243,7 +247,7 @@ jobs:
|
|||
- cache_restore
|
||||
- run:
|
||||
name: Cargo release build with target arch set for CRoaring
|
||||
command: ROARING_ARCH=x86-64 cargo build --release
|
||||
command: ROARING_ARCH=x86-64 cargo build --release --features="aws,gcp,azure"
|
||||
- run: |
|
||||
echo sha256sum after build is
|
||||
sha256sum target/release/influxdb_iox
|
||||
|
|
|
@ -131,3 +131,8 @@ rand = "0.8.3"
|
|||
rdkafka = "0.26.0"
|
||||
reqwest = "0.11"
|
||||
tempfile = "3.1.0"
|
||||
|
||||
[features]
|
||||
azure = ["object_store/azure"]
|
||||
gcp = ["object_store/gcp"]
|
||||
aws = ["object_store/aws"]
|
||||
|
|
|
@ -8,19 +8,19 @@ edition = "2018"
|
|||
async-trait = "0.1.42"
|
||||
# Microsoft Azure Blob storage integration
|
||||
# In order to support tokio 1.0 and delimiters, needed to pull in unreleased azure sdk
|
||||
azure_core = { git = "https://github.com/Azure/azure-sdk-for-rust.git", rev = "14ff9326bb1ba07f98733a548988eccd4532b945" }
|
||||
azure_storage = { git = "https://github.com/Azure/azure-sdk-for-rust.git", rev = "14ff9326bb1ba07f98733a548988eccd4532b945", default-features = false, features = ["table", "blob", "queue"] }
|
||||
azure_core = { git = "https://github.com/Azure/azure-sdk-for-rust.git", rev = "14ff9326bb1ba07f98733a548988eccd4532b945", optional = true }
|
||||
azure_storage = { git = "https://github.com/Azure/azure-sdk-for-rust.git", rev = "14ff9326bb1ba07f98733a548988eccd4532b945", optional = true, default-features = false, features = ["table", "blob", "queue"] }
|
||||
bytes = "1.0"
|
||||
chrono = "0.4"
|
||||
# Google Cloud Storage integration
|
||||
cloud-storage = "0.9.0"
|
||||
cloud-storage = {version = "0.9.0", optional = true}
|
||||
futures = "0.3"
|
||||
itertools = "0.10.1"
|
||||
percent-encoding = "2.1"
|
||||
# rusoto crates are for Amazon S3 integration
|
||||
rusoto_core = "0.46.0"
|
||||
rusoto_credential = "0.46.0"
|
||||
rusoto_s3 = "0.46.0"
|
||||
rusoto_core = { version = "0.46.0", optional = true}
|
||||
rusoto_credential = { version = "0.46.0", optional = true}
|
||||
rusoto_s3 = { version = "0.46.0", optional = true}
|
||||
snafu = { version = "0.6.10", features = ["futures"] }
|
||||
tokio = { version = "1.0", features = ["macros", "fs"] }
|
||||
# Filesystem integration
|
||||
|
@ -30,6 +30,11 @@ reqwest = "0.11"
|
|||
walkdir = "2"
|
||||
tempfile = "3.1.0"
|
||||
|
||||
[features]
|
||||
azure = ["azure_core", "azure_storage"]
|
||||
gcp = ["cloud-storage"]
|
||||
aws = ["rusoto_core", "rusoto_credential", "rusoto_s3"]
|
||||
|
||||
[dev-dependencies] # In alphabetical order
|
||||
dotenv = "0.15.0"
|
||||
tempfile = "3.1.0"
|
||||
|
|
|
@ -341,6 +341,17 @@ pub(crate) fn new_s3(
|
|||
})
|
||||
}
|
||||
|
||||
pub(crate) fn new_failing_s3() -> Result<AmazonS3> {
|
||||
new_s3(
|
||||
Some("foo"),
|
||||
Some("bar"),
|
||||
"us-east-1",
|
||||
"bucket",
|
||||
None as Option<&str>,
|
||||
None as Option<&str>,
|
||||
)
|
||||
}
|
||||
|
||||
impl AmazonS3 {
|
||||
/// List objects with the given prefix and a set delimiter of `/`. Returns
|
||||
/// common prefixes (directories) in addition to object metadata. Optionally
|
||||
|
|
|
@ -14,6 +14,7 @@ use tokio_util::io::{ReaderStream, StreamReader};
|
|||
///
|
||||
/// The temporary file will be deleted when the result stream
|
||||
/// is dropped.
|
||||
#[allow(dead_code)]
|
||||
pub async fn slurp_stream_tempfile<S>(bytes: S) -> Result<BufferedStream<File>>
|
||||
where
|
||||
S: Stream<Item = Result<Bytes>> + Send + Sync,
|
||||
|
@ -122,14 +123,4 @@ mod tests {
|
|||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_slurp_stream_tempfile() {
|
||||
check_stream(slurp_stream_tempfile(test_data()).await.unwrap()).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_slurp_stream_memory() {
|
||||
check_stream(slurp_stream_memory(test_data()).await.unwrap()).await;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,124 @@
|
|||
//! Crate that mimics the interface of the the various object stores
|
||||
//! but does nothing if they are not enabled.
|
||||
use async_trait::async_trait;
|
||||
use snafu::Snafu;
|
||||
|
||||
use crate::{path::cloud::CloudPath, ObjectStoreApi};
|
||||
|
||||
/// A specialized `Error` for Azure object store-related errors
|
||||
#[derive(Debug, Snafu, Clone)]
|
||||
#[allow(missing_docs)]
|
||||
pub enum Error {
|
||||
#[snafu(display(
|
||||
"'{}' not supported with this build. Hint: recompile with appropriate features",
|
||||
name
|
||||
))]
|
||||
NotSupported { name: String },
|
||||
}
|
||||
/// Result for the dummy object store
|
||||
pub type Result<T, E = Error> = std::result::Result<T, E>;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
/// An object store that always generates an error
|
||||
pub struct DummyObjectStore {
|
||||
name: String,
|
||||
}
|
||||
|
||||
/// If aws feature not available, use DummyObjectStore
|
||||
pub type AmazonS3 = DummyObjectStore;
|
||||
|
||||
/// If azure feature not available, use DummyObjectStore
|
||||
pub type MicrosoftAzure = DummyObjectStore;
|
||||
|
||||
/// If gcp feature not available, use DummyObjectStore
|
||||
pub type GoogleCloudStorage = DummyObjectStore;
|
||||
|
||||
#[async_trait]
|
||||
impl ObjectStoreApi for DummyObjectStore {
|
||||
type Path = CloudPath;
|
||||
type Error = Error;
|
||||
|
||||
fn new_path(&self) -> Self::Path {
|
||||
CloudPath::default()
|
||||
}
|
||||
|
||||
async fn put<S>(
|
||||
&self,
|
||||
_location: &Self::Path,
|
||||
_bytes: S,
|
||||
_length: Option<usize>,
|
||||
) -> crate::Result<(), Self::Error>
|
||||
where
|
||||
S: futures::Stream<Item = std::io::Result<bytes::Bytes>> + Send + Sync + 'static,
|
||||
{
|
||||
NotSupported { name: &self.name }.fail()
|
||||
}
|
||||
|
||||
async fn get(
|
||||
&self,
|
||||
_location: &Self::Path,
|
||||
) -> crate::Result<
|
||||
futures::stream::BoxStream<'static, crate::Result<bytes::Bytes, Self::Error>>,
|
||||
Self::Error,
|
||||
> {
|
||||
NotSupported { name: &self.name }.fail()
|
||||
}
|
||||
|
||||
async fn delete(&self, _location: &Self::Path) -> crate::Result<(), Self::Error> {
|
||||
NotSupported { name: &self.name }.fail()
|
||||
}
|
||||
|
||||
async fn list<'a>(
|
||||
&'a self,
|
||||
_prefix: Option<&'a Self::Path>,
|
||||
) -> crate::Result<
|
||||
futures::stream::BoxStream<'a, crate::Result<Vec<Self::Path>, Self::Error>>,
|
||||
Self::Error,
|
||||
> {
|
||||
NotSupported { name: &self.name }.fail()
|
||||
}
|
||||
|
||||
async fn list_with_delimiter(
|
||||
&self,
|
||||
_prefix: &Self::Path,
|
||||
) -> crate::Result<crate::ListResult<Self::Path>, Self::Error> {
|
||||
NotSupported { name: &self.name }.fail()
|
||||
}
|
||||
}
|
||||
|
||||
/// Stub when s3 is not configured
|
||||
#[allow(dead_code)]
|
||||
pub(crate) fn new_s3(
|
||||
_access_key_id: Option<impl Into<String>>,
|
||||
_secret_access_key: Option<impl Into<String>>,
|
||||
_region: impl Into<String>,
|
||||
_bucket_name: impl Into<String>,
|
||||
_endpoint: Option<impl Into<String>>,
|
||||
_session_token: Option<impl Into<String>>,
|
||||
) -> Result<DummyObjectStore> {
|
||||
NotSupported { name: "aws" }.fail()
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub(crate) fn new_failing_s3() -> Result<AmazonS3> {
|
||||
Ok(DummyObjectStore { name: "aws".into() })
|
||||
}
|
||||
|
||||
/// Stub when gcs is not configured
|
||||
#[allow(dead_code)]
|
||||
pub(crate) fn new_gcs(
|
||||
_service_account_path: impl AsRef<std::ffi::OsStr>,
|
||||
_bucket_name: impl Into<String>,
|
||||
) -> Result<DummyObjectStore> {
|
||||
NotSupported { name: "gcs" }.fail()
|
||||
}
|
||||
|
||||
/// Stub when azure is not configured
|
||||
#[allow(dead_code)]
|
||||
pub(crate) fn new_azure(
|
||||
_account: impl Into<String>,
|
||||
_access_key: impl Into<String>,
|
||||
_container_name: impl Into<String>,
|
||||
) -> Result<DummyObjectStore> {
|
||||
NotSupported { name: "azure" }.fail()
|
||||
}
|
|
@ -17,15 +17,27 @@
|
|||
//!
|
||||
//! Future compatibility will include Azure Blob Storage, Minio, and Ceph.
|
||||
|
||||
#[cfg(feature = "aws")]
|
||||
mod aws;
|
||||
#[cfg(feature = "azure")]
|
||||
mod azure;
|
||||
mod buffer;
|
||||
pub mod disk;
|
||||
#[cfg(feature = "gcp")]
|
||||
mod gcp;
|
||||
pub mod memory;
|
||||
pub mod path;
|
||||
pub mod throttle;
|
||||
|
||||
pub mod dummy;
|
||||
|
||||
#[cfg(not(feature = "aws"))]
|
||||
use dummy as aws;
|
||||
#[cfg(not(feature = "azure"))]
|
||||
use dummy as azure;
|
||||
#[cfg(not(feature = "gcp"))]
|
||||
use dummy as gcp;
|
||||
|
||||
use aws::AmazonS3;
|
||||
use azure::MicrosoftAzure;
|
||||
use disk::File;
|
||||
|
@ -126,11 +138,18 @@ impl ObjectStore {
|
|||
Self(ObjectStoreIntegration::InMemory(in_mem))
|
||||
}
|
||||
|
||||
/// Configure throttled in-memory storage.
|
||||
/// For Testing: Configure throttled in-memory storage.
|
||||
pub fn new_in_memory_throttled(in_mem_throttled: ThrottledStore<InMemory>) -> Self {
|
||||
Self(ObjectStoreIntegration::InMemoryThrottled(in_mem_throttled))
|
||||
}
|
||||
|
||||
/// For Testing: Configure a object store with invalid credentials
|
||||
/// that will always fail on operations (hopefully)
|
||||
pub fn new_failing_store() -> Result<Self> {
|
||||
let s3 = aws::new_failing_s3()?;
|
||||
Ok(Self(ObjectStoreIntegration::AmazonS3(s3)))
|
||||
}
|
||||
|
||||
/// Configure local file storage.
|
||||
pub fn new_file(file: File) -> Self {
|
||||
Self(ObjectStoreIntegration::File(file))
|
||||
|
@ -497,6 +516,9 @@ pub enum Error {
|
|||
|
||||
#[snafu(display("In-memory-based Object Store error: {}", source))]
|
||||
InMemoryObjectStoreError { source: memory::Error },
|
||||
|
||||
#[snafu(display("{}", source))]
|
||||
DummyObjectStoreError { source: dummy::Error },
|
||||
}
|
||||
|
||||
impl From<disk::Error> for Error {
|
||||
|
@ -505,18 +527,21 @@ impl From<disk::Error> for Error {
|
|||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "gcp")]
|
||||
impl From<gcp::Error> for Error {
|
||||
fn from(source: gcp::Error) -> Self {
|
||||
Self::GcsObjectStoreError { source }
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "aws")]
|
||||
impl From<aws::Error> for Error {
|
||||
fn from(source: aws::Error) -> Self {
|
||||
Self::AwsObjectStoreError { source }
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "azure")]
|
||||
impl From<azure::Error> for Error {
|
||||
fn from(source: azure::Error) -> Self {
|
||||
Self::AzureObjectStoreError { source }
|
||||
|
@ -529,6 +554,12 @@ impl From<memory::Error> for Error {
|
|||
}
|
||||
}
|
||||
|
||||
impl From<dummy::Error> for Error {
|
||||
fn from(source: dummy::Error) -> Self {
|
||||
Self::DummyObjectStoreError { source }
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
@ -701,6 +732,7 @@ mod tests {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub(crate) async fn get_nonexistent_object(
|
||||
storage: &ObjectStore,
|
||||
location: Option<<ObjectStore as ObjectStoreApi>::Path>,
|
||||
|
|
|
@ -1874,15 +1874,7 @@ mod tests {
|
|||
#[tokio::test]
|
||||
async fn init_error_generic() {
|
||||
// use an object store that will hopefully fail to read
|
||||
let store = ObjectStore::new_amazon_s3(
|
||||
Some("foo".to_string()),
|
||||
Some("bar".to_string()),
|
||||
"us-east-1".to_string(),
|
||||
"bucket".to_string(),
|
||||
None as Option<String>,
|
||||
None as Option<String>,
|
||||
)
|
||||
.unwrap();
|
||||
let store = ObjectStore::new_failing_store().unwrap();
|
||||
|
||||
let manager = TestConnectionManager::new();
|
||||
let config = config_with_store(store);
|
||||
|
|
|
@ -65,13 +65,13 @@ pub enum Error {
|
|||
// Creating a new S3 object store can fail if the region is *specified* but
|
||||
// not *parseable* as a rusoto `Region`. The other object store constructors
|
||||
// don't return `Result`.
|
||||
#[snafu(display("Amazon S3 configuration was invalid: {}", source))]
|
||||
#[snafu(display("Error configuring Amazon S3: {}", source))]
|
||||
InvalidS3Config { source: object_store::Error },
|
||||
|
||||
#[snafu(display("GCS configuration was invalid: {}", source))]
|
||||
#[snafu(display("Error configuring GCS: {}", source))]
|
||||
InvalidGCSConfig { source: object_store::Error },
|
||||
|
||||
#[snafu(display("Microsoft Azure configuration was invalid: {}", source))]
|
||||
#[snafu(display("Error configuring Microsoft Azure: {}", source))]
|
||||
InvalidAzureConfig { source: object_store::Error },
|
||||
|
||||
#[snafu(display("Cannot read from object store: {}", source))]
|
||||
|
@ -478,6 +478,7 @@ mod tests {
|
|||
}
|
||||
|
||||
#[test]
|
||||
#[cfg(feature = "aws")]
|
||||
fn valid_s3_config() {
|
||||
let config = Config::from_iter_safe(&[
|
||||
"server",
|
||||
|
@ -513,6 +514,7 @@ mod tests {
|
|||
}
|
||||
|
||||
#[test]
|
||||
#[cfg(feature = "gcp")]
|
||||
fn valid_google_config() {
|
||||
let config = Config::from_iter_safe(&[
|
||||
"server",
|
||||
|
@ -547,6 +549,7 @@ mod tests {
|
|||
}
|
||||
|
||||
#[test]
|
||||
#[cfg(feature = "azure")]
|
||||
fn valid_azure_config() {
|
||||
let config = Config::from_iter_safe(&[
|
||||
"server",
|
||||
|
|
Loading…
Reference in New Issue