feat: Azure support for object store
Closes #528 This patch adds support for Microsfot Azure Blob storage. The implementations requires an account, a key and container name. They can be configured via the environment variables `AZURE_STORAGE_ACCOUNT`, `AZURE_STORAGE_MASTER_KEY` and `AZURE_STORAGE_CONTAINER`.pull/24376/head
parent
6c626e8279
commit
2cd383af6f
|
@ -1,5 +1,20 @@
|
|||
# This file is automatically @generated by Cargo.
|
||||
# It is not intended for manual editing.
|
||||
[[package]]
|
||||
name = "RustyXML"
|
||||
version = "0.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8b5ace29ee3216de37c0546865ad08edef58b0f9e76838ed8959a84a990e58c5"
|
||||
|
||||
[[package]]
|
||||
name = "addr2line"
|
||||
version = "0.14.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7c0929d69e78dd9bf5408269919fcbcaeb2e35e5d43e5815517cdc6a8e11a423"
|
||||
dependencies = [
|
||||
"gimli",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "adler"
|
||||
version = "0.2.3"
|
||||
|
@ -188,12 +203,108 @@ version = "1.0.1"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a"
|
||||
|
||||
[[package]]
|
||||
name = "azure_sdk_core"
|
||||
version = "0.43.7"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "57d8dd27eee0644b886305eca21c48425403a8bb87ec57f52f516093504fa3a5"
|
||||
dependencies = [
|
||||
"RustyXML",
|
||||
"base64 0.12.3",
|
||||
"bytes",
|
||||
"chrono",
|
||||
"failure",
|
||||
"futures",
|
||||
"http",
|
||||
"hyper",
|
||||
"hyper-rustls",
|
||||
"log",
|
||||
"quick-error",
|
||||
"serde",
|
||||
"serde-xml-rs",
|
||||
"serde_derive",
|
||||
"serde_json",
|
||||
"url",
|
||||
"uuid",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "azure_sdk_storage_blob"
|
||||
version = "0.45.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "eaf0e2fa1a14ea8c99f081aa2fd9098b663b5faaa5377d63aab380c5a62afeca"
|
||||
dependencies = [
|
||||
"RustyXML",
|
||||
"azure_sdk_core",
|
||||
"azure_sdk_storage_core",
|
||||
"base64 0.12.3",
|
||||
"chrono",
|
||||
"futures",
|
||||
"http",
|
||||
"hyper",
|
||||
"log",
|
||||
"md5",
|
||||
"percent-encoding",
|
||||
"serde",
|
||||
"serde-xml-rs",
|
||||
"serde_derive",
|
||||
"uuid",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "azure_sdk_storage_core"
|
||||
version = "0.44.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "93730b33cf379bacf5e56b5765f98a1f1ee27b2d530a2f8f027f4e806c53b4fd"
|
||||
dependencies = [
|
||||
"azure_sdk_core",
|
||||
"base64 0.12.3",
|
||||
"bytes",
|
||||
"chrono",
|
||||
"futures",
|
||||
"http",
|
||||
"hyper",
|
||||
"hyper-rustls",
|
||||
"log",
|
||||
"mime",
|
||||
"quick-error",
|
||||
"ring",
|
||||
"serde",
|
||||
"serde-xml-rs",
|
||||
"serde_derive",
|
||||
"serde_json",
|
||||
"smallvec",
|
||||
"time 0.2.23",
|
||||
"url",
|
||||
"uuid",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "backtrace"
|
||||
version = "0.3.55"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ef5140344c85b01f9bbb4d4b7288a8aa4b3287ccef913a14bcc78a1063623598"
|
||||
dependencies = [
|
||||
"addr2line",
|
||||
"cfg-if 1.0.0",
|
||||
"libc",
|
||||
"miniz_oxide",
|
||||
"object",
|
||||
"rustc-demangle",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "base-x"
|
||||
version = "0.2.8"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a4521f3e3d031370679b3b140beb36dfe4801b09ac77e30c61941f97df3ef28b"
|
||||
|
||||
[[package]]
|
||||
name = "base64"
|
||||
version = "0.11.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b41b7ea54a0c9d92199de89e20e58d49f02f8e699814ef3fdf266f6f748d15c7"
|
||||
|
||||
[[package]]
|
||||
name = "base64"
|
||||
version = "0.12.3"
|
||||
|
@ -462,16 +573,32 @@ version = "0.1.5"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "245097e9a4535ee1e3e3931fcfcd55a796a44c643e8596ff6566d68f09b87bbc"
|
||||
|
||||
[[package]]
|
||||
name = "core-foundation"
|
||||
version = "0.7.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "57d24c7a13c43e870e37c1556b74555437870a04514f7685f5b354e090567171"
|
||||
dependencies = [
|
||||
"core-foundation-sys 0.7.0",
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "core-foundation"
|
||||
version = "0.9.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0a89e2ae426ea83155dccf10c0fa6b1463ef6d5fcb44cee0b224a408fa640a62"
|
||||
dependencies = [
|
||||
"core-foundation-sys",
|
||||
"core-foundation-sys 0.8.2",
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "core-foundation-sys"
|
||||
version = "0.7.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b3a71ab494c0b5b860bdc8407ae08978052417070c2ced38573a9157ad75b8ac"
|
||||
|
||||
[[package]]
|
||||
name = "core-foundation-sys"
|
||||
version = "0.8.2"
|
||||
|
@ -719,6 +846,15 @@ dependencies = [
|
|||
"memchr",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "ct-logs"
|
||||
version = "0.6.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4d3686f5fa27dbc1d76c751300376e167c5a43387f44bb451fd1c24776e49113"
|
||||
dependencies = [
|
||||
"sct",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "data_types"
|
||||
version = "0.1.0"
|
||||
|
@ -904,6 +1040,28 @@ dependencies = [
|
|||
"syn",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "failure"
|
||||
version = "0.1.8"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d32e9bd16cc02eae7db7ef620b392808b89f6a5e16bb3497d159c6b92a0f4f86"
|
||||
dependencies = [
|
||||
"backtrace",
|
||||
"failure_derive",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "failure_derive"
|
||||
version = "0.1.8"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "aa4da3c766cd7a0db8242e326e9e4e081edd567072893ed320008189715366a4"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn",
|
||||
"synstructure",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "fake-simd"
|
||||
version = "0.1.2"
|
||||
|
@ -1155,6 +1313,12 @@ dependencies = [
|
|||
"wasi 0.9.0+wasi-snapshot-preview1",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "gimli"
|
||||
version = "0.23.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f6503fe142514ca4799d4c26297c4248239fe8838d827db6bd6065c6ed29a6ce"
|
||||
|
||||
[[package]]
|
||||
name = "glob"
|
||||
version = "0.3.0"
|
||||
|
@ -1302,6 +1466,24 @@ dependencies = [
|
|||
"want",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "hyper-rustls"
|
||||
version = "0.20.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ac965ea399ec3a25ac7d13b8affd4b8f39325cca00858ddf5eb29b79e6b14b08"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"ct-logs",
|
||||
"futures-util",
|
||||
"hyper",
|
||||
"log",
|
||||
"rustls",
|
||||
"rustls-native-certs",
|
||||
"tokio",
|
||||
"tokio-rustls",
|
||||
"webpki",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "hyper-tls"
|
||||
version = "0.4.3"
|
||||
|
@ -1841,8 +2023,8 @@ dependencies = [
|
|||
"openssl-probe",
|
||||
"openssl-sys",
|
||||
"schannel",
|
||||
"security-framework",
|
||||
"security-framework-sys",
|
||||
"security-framework 2.0.0",
|
||||
"security-framework-sys 2.0.0",
|
||||
"tempfile",
|
||||
]
|
||||
|
||||
|
@ -1984,10 +2166,19 @@ dependencies = [
|
|||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "object"
|
||||
version = "0.22.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8d3b63360ec3cb337817c2dbd47ab4a0f170d285d8e5a2064600f3def1402397"
|
||||
|
||||
[[package]]
|
||||
name = "object_store"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"azure_sdk_core",
|
||||
"azure_sdk_storage_blob",
|
||||
"azure_sdk_storage_core",
|
||||
"bytes",
|
||||
"chrono",
|
||||
"cloud-storage",
|
||||
|
@ -2764,6 +2955,12 @@ dependencies = [
|
|||
"crossbeam-utils 0.8.1",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rustc-demangle"
|
||||
version = "0.1.18"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6e3bad0ee36814ca07d7968269dd4b7ec89ec2da10c4bb613928d3077083c232"
|
||||
|
||||
[[package]]
|
||||
name = "rustc-hash"
|
||||
version = "1.1.0"
|
||||
|
@ -2779,6 +2976,31 @@ dependencies = [
|
|||
"semver",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rustls"
|
||||
version = "0.17.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c0d4a31f5d68413404705d6982529b0e11a9aacd4839d1d6222ee3b8cb4015e1"
|
||||
dependencies = [
|
||||
"base64 0.11.0",
|
||||
"log",
|
||||
"ring",
|
||||
"sct",
|
||||
"webpki",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rustls-native-certs"
|
||||
version = "0.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a75ffeb84a6bd9d014713119542ce415db3a3e4748f0bfce1e1416cd224a23a5"
|
||||
dependencies = [
|
||||
"openssl-probe",
|
||||
"rustls",
|
||||
"schannel",
|
||||
"security-framework 0.4.4",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rustyline"
|
||||
version = "6.3.0"
|
||||
|
@ -2835,6 +3057,29 @@ version = "1.1.0"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd"
|
||||
|
||||
[[package]]
|
||||
name = "sct"
|
||||
version = "0.6.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e3042af939fca8c3453b7af0f1c66e533a15a86169e39de2657310ade8f98d3c"
|
||||
dependencies = [
|
||||
"ring",
|
||||
"untrusted",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "security-framework"
|
||||
version = "0.4.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "64808902d7d99f78eaddd2b4e2509713babc3dc3c85ad6f4c447680f3c01e535"
|
||||
dependencies = [
|
||||
"bitflags",
|
||||
"core-foundation 0.7.0",
|
||||
"core-foundation-sys 0.7.0",
|
||||
"libc",
|
||||
"security-framework-sys 0.4.3",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "security-framework"
|
||||
version = "2.0.0"
|
||||
|
@ -2842,10 +3087,20 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
|||
checksum = "c1759c2e3c8580017a484a7ac56d3abc5a6c1feadf88db2f3633f12ae4268c69"
|
||||
dependencies = [
|
||||
"bitflags",
|
||||
"core-foundation",
|
||||
"core-foundation-sys",
|
||||
"core-foundation 0.9.1",
|
||||
"core-foundation-sys 0.8.2",
|
||||
"libc",
|
||||
"security-framework-sys 2.0.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "security-framework-sys"
|
||||
version = "0.4.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "17bf11d99252f512695eb468de5516e5cf75455521e69dfe343f3b74e4748405"
|
||||
dependencies = [
|
||||
"core-foundation-sys 0.7.0",
|
||||
"libc",
|
||||
"security-framework-sys",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
@ -2854,7 +3109,7 @@ version = "2.0.0"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f99b9d5e26d2a71633cc4f2ebae7cc9f874044e0c351a27e17892d76dce5678b"
|
||||
dependencies = [
|
||||
"core-foundation-sys",
|
||||
"core-foundation-sys 0.8.2",
|
||||
"libc",
|
||||
]
|
||||
|
||||
|
@ -2882,6 +3137,18 @@ dependencies = [
|
|||
"serde_derive",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "serde-xml-rs"
|
||||
version = "0.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "efe415925cf3d0bbb2fc47d09b56ce03eef51c5d56846468a39bcc293c7a846c"
|
||||
dependencies = [
|
||||
"log",
|
||||
"serde",
|
||||
"thiserror",
|
||||
"xml-rs",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "serde_cbor"
|
||||
version = "0.11.1"
|
||||
|
@ -3028,6 +3295,9 @@ name = "smallvec"
|
|||
version = "1.5.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ae524f056d7d770e174287294f562e95044c68e88dec909a00d2094805db9d75"
|
||||
dependencies = [
|
||||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "snafu"
|
||||
|
@ -3182,6 +3452,18 @@ dependencies = [
|
|||
"unicode-xid",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "synstructure"
|
||||
version = "0.12.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b834f2d66f734cb897113e34aaff2f1ab4719ca946f9a7358dba8f8064148701"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn",
|
||||
"unicode-xid",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tempfile"
|
||||
version = "3.1.0"
|
||||
|
@ -3237,18 +3519,18 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "thiserror"
|
||||
version = "1.0.23"
|
||||
version = "1.0.22"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "76cc616c6abf8c8928e2fdcc0dbfab37175edd8fb49a4641066ad1364fdab146"
|
||||
checksum = "0e9ae34b84616eedaaf1e9dd6026dbe00dcafa92aa0c8077cb69df1fcfe5e53e"
|
||||
dependencies = [
|
||||
"thiserror-impl",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "thiserror-impl"
|
||||
version = "1.0.23"
|
||||
version = "1.0.22"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9be73a2caec27583d0046ef3796c3794f868a5bc813db689eed00c7631275cd1"
|
||||
checksum = "9ba20f23e85b10754cd195504aebf6a27e2e6cbe28c17778a0c930724628dd56"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
|
@ -3395,6 +3677,18 @@ dependencies = [
|
|||
"syn",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tokio-rustls"
|
||||
version = "0.13.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "15cb62a0d2770787abc96e99c1cd98fcf17f94959f3af63ca85bdfb203f051b4"
|
||||
dependencies = [
|
||||
"futures-core",
|
||||
"rustls",
|
||||
"tokio",
|
||||
"webpki",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tokio-tls"
|
||||
version = "0.3.1"
|
||||
|
@ -3992,6 +4286,16 @@ dependencies = [
|
|||
"wasm-bindgen",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "webpki"
|
||||
version = "0.21.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b8e38c0608262c46d4a56202ebabdeb094cef7e560ca7a226c6bf055188aa4ea"
|
||||
dependencies = [
|
||||
"ring",
|
||||
"untrusted",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "which"
|
||||
version = "3.1.1"
|
||||
|
|
|
@ -24,6 +24,11 @@ tokio = { version = "0.2", features = ["full"] }
|
|||
# Filesystem integration
|
||||
tokio-util = "0.3.1"
|
||||
|
||||
# Microsoft Azure Blob storage integration
|
||||
azure_sdk_core = "0.43.7"
|
||||
azure_sdk_storage_blob = "0.45.3"
|
||||
azure_sdk_storage_core = "0.44.4"
|
||||
|
||||
[dev-dependencies]
|
||||
tempfile = "3.1.0"
|
||||
dotenv = "0.15.0"
|
||||
|
|
|
@ -0,0 +1,249 @@
|
|||
//! This module contains the IOx implementation for using Azure Blob storage as
|
||||
//! the object store.
|
||||
use crate::{
|
||||
path::{CloudConverter, ObjectStorePath},
|
||||
DataDoesNotMatchLength, Result, UnableToDeleteDataFromAzure, UnableToGetDataFromAzure,
|
||||
UnableToListDataFromAzure, UnableToPutDataToAzure,
|
||||
};
|
||||
use azure_sdk_core::prelude::*;
|
||||
use azure_sdk_storage_blob::prelude::*;
|
||||
use bytes::Bytes;
|
||||
use futures::{stream, FutureExt, Stream, TryStreamExt};
|
||||
use snafu::{ensure, ResultExt};
|
||||
use std::io;
|
||||
use std::sync::Arc;
|
||||
|
||||
/// Configuration for connecting to [Microsoft Azure Blob Storage](https://azure.microsoft.com/en-us/services/storage/blobs/).
|
||||
#[derive(Debug)]
|
||||
pub struct MicrosoftAzure {
|
||||
client: Arc<azure_sdk_storage_core::key_client::KeyClient>,
|
||||
container_name: String,
|
||||
}
|
||||
|
||||
impl MicrosoftAzure {
|
||||
/// Configure a connection to container with given name on Microsoft Azure
|
||||
/// Blob store.
|
||||
///
|
||||
/// The credentials `account` and `master_key` must provide access to the
|
||||
/// store.
|
||||
pub fn new(account: String, master_key: String, container_name: impl Into<String>) -> Self {
|
||||
Self {
|
||||
client: Arc::new(azure_sdk_storage_core::client::with_access_key(
|
||||
&account,
|
||||
&master_key,
|
||||
)),
|
||||
container_name: container_name.into(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Configure a connection to container with given name on Microsoft Azure
|
||||
/// Blob store.
|
||||
///
|
||||
/// The credentials `account` and `master_key` must be set via the
|
||||
/// environment variables `AZURE_STORAGE_ACCOUNT` and
|
||||
/// `AZURE_STORAGE_MASTER_KEY` respectively.
|
||||
pub fn new_from_env(container_name: impl Into<String>) -> Self {
|
||||
let account = std::env::var("AZURE_STORAGE_ACCOUNT")
|
||||
.expect("Set env variable AZURE_STORAGE_ACCOUNT first!");
|
||||
let master_key = std::env::var("AZURE_STORAGE_MASTER_KEY")
|
||||
.expect("Set env variable AZURE_STORAGE_MASTER_KEY first!");
|
||||
|
||||
Self::new(account, master_key, container_name)
|
||||
}
|
||||
|
||||
/// Save the provided bytes to the specified location.
|
||||
pub async fn put<S>(&self, location: &ObjectStorePath, bytes: S, length: usize) -> Result<()>
|
||||
where
|
||||
S: Stream<Item = io::Result<Bytes>> + Send + Sync + 'static,
|
||||
{
|
||||
let location = CloudConverter::convert(&location);
|
||||
let temporary_non_streaming = bytes
|
||||
.map_ok(|b| bytes::BytesMut::from(&b[..]))
|
||||
.try_concat()
|
||||
.await
|
||||
.expect("Should have been able to collect streaming data");
|
||||
|
||||
ensure!(
|
||||
temporary_non_streaming.len() == length,
|
||||
DataDoesNotMatchLength {
|
||||
actual: temporary_non_streaming.len(),
|
||||
expected: length,
|
||||
}
|
||||
);
|
||||
|
||||
self.client
|
||||
.put_block_blob()
|
||||
.with_container_name(&self.container_name)
|
||||
.with_blob_name(&location)
|
||||
.with_body(&temporary_non_streaming)
|
||||
.finalize()
|
||||
.await
|
||||
.context(UnableToPutDataToAzure {
|
||||
location: location.to_owned(),
|
||||
})?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Return the bytes that are stored at the specified location.
|
||||
pub async fn get(
|
||||
&self,
|
||||
location: &ObjectStorePath,
|
||||
) -> Result<impl Stream<Item = Result<Bytes>>> {
|
||||
let client = self.client.clone();
|
||||
let container_name = self.container_name.clone();
|
||||
let location = CloudConverter::convert(&location);
|
||||
Ok(async move {
|
||||
client
|
||||
.get_blob()
|
||||
.with_container_name(&container_name)
|
||||
.with_blob_name(&location)
|
||||
.finalize()
|
||||
.await
|
||||
.map(|blob| blob.data.into())
|
||||
.context(UnableToGetDataFromAzure {
|
||||
location: location.to_owned(),
|
||||
})
|
||||
}
|
||||
.into_stream())
|
||||
}
|
||||
|
||||
/// Delete the object at the specified location.
|
||||
pub async fn delete(&self, location: &ObjectStorePath) -> Result<()> {
|
||||
let location = CloudConverter::convert(&location);
|
||||
self.client
|
||||
.delete_blob()
|
||||
.with_container_name(&self.container_name)
|
||||
.with_blob_name(&location)
|
||||
.with_delete_snapshots_method(DeleteSnapshotsMethod::Include)
|
||||
.finalize()
|
||||
.await
|
||||
.context(UnableToDeleteDataFromAzure {
|
||||
location: location.to_owned(),
|
||||
})?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// List all the objects with the given prefix.
|
||||
pub async fn list<'a>(
|
||||
&'a self,
|
||||
prefix: Option<&'a ObjectStorePath>,
|
||||
) -> Result<impl Stream<Item = Result<Vec<ObjectStorePath>>> + 'a> {
|
||||
#[derive(Clone)]
|
||||
enum ListState {
|
||||
Start,
|
||||
HasMore(String),
|
||||
Done,
|
||||
}
|
||||
|
||||
Ok(stream::unfold(ListState::Start, move |state| async move {
|
||||
let mut request = self
|
||||
.client
|
||||
.list_blobs()
|
||||
.with_container_name(&self.container_name);
|
||||
|
||||
let prefix = prefix.map(CloudConverter::convert);
|
||||
if let Some(ref p) = prefix {
|
||||
request = request.with_prefix(p);
|
||||
}
|
||||
|
||||
match state {
|
||||
ListState::HasMore(ref token) => {
|
||||
request = request.with_next_marker(token);
|
||||
}
|
||||
ListState::Done => {
|
||||
return None;
|
||||
}
|
||||
ListState::Start => {}
|
||||
}
|
||||
|
||||
let resp = match request.finalize().await.context(UnableToListDataFromAzure) {
|
||||
Ok(resp) => resp,
|
||||
Err(err) => return Some((Err(err), state)),
|
||||
};
|
||||
|
||||
let next_state = if let Some(token) = resp.incomplete_vector.token() {
|
||||
ListState::HasMore(token.to_string())
|
||||
} else {
|
||||
ListState::Done
|
||||
};
|
||||
|
||||
let names = resp
|
||||
.incomplete_vector
|
||||
.vector
|
||||
.into_iter()
|
||||
.map(|blob| ObjectStorePath::from_cloud_unchecked(blob.name))
|
||||
.collect();
|
||||
|
||||
Some((Ok(names), next_state))
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::{tests::put_get_delete_list, ObjectStore};
|
||||
use std::env;
|
||||
|
||||
type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
|
||||
type Result<T, E = Error> = std::result::Result<T, E>;
|
||||
|
||||
// Helper macro to skip tests if the GCP environment variables are not set.
|
||||
// Skips become hard errors if TEST_INTEGRATION is set.
|
||||
macro_rules! maybe_skip_integration {
|
||||
() => {
|
||||
dotenv::dotenv().ok();
|
||||
|
||||
let account = env::var("AZURE_STORAGE_ACCOUNT");
|
||||
let container = env::var("AZURE_STORAGE_CONTAINER");
|
||||
let force = std::env::var("TEST_INTEGRATION");
|
||||
|
||||
match (account.is_ok(), container.is_ok(), force.is_ok()) {
|
||||
(false, false, true) => {
|
||||
panic!(
|
||||
"TEST_INTEGRATION is set, \
|
||||
but AZURE_STROAGE_ACCOUNT and AZURE_STORAGE_CONTAINER are not"
|
||||
)
|
||||
}
|
||||
(false, true, true) => {
|
||||
panic!("TEST_INTEGRATION is set, but AZURE_STORAGE_ACCOUNT is not")
|
||||
}
|
||||
(true, false, true) => {
|
||||
panic!("TEST_INTEGRATION is set, but AZURE_STORAGE_CONTAINER is not")
|
||||
}
|
||||
(false, false, false) => {
|
||||
eprintln!(
|
||||
"skipping integration test - set \
|
||||
AZURE_STROAGE_ACCOUNT and AZURE_STORAGE_CONTAINER to run"
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
(false, true, false) => {
|
||||
eprintln!("skipping integration test - set AZURE_STORAGE_ACCOUNT to run");
|
||||
return Ok(());
|
||||
}
|
||||
(true, false, false) => {
|
||||
eprintln!("skipping integration test - set AZURE_STROAGE_CONTAINER to run");
|
||||
return Ok(());
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn azure_blob_test() -> Result<()> {
|
||||
maybe_skip_integration!();
|
||||
|
||||
let container_name = env::var("AZURE_STORAGE_CONTAINER")
|
||||
.map_err(|_| "The environment variable AZURE_STORAGE_CONTAINER must be set")?;
|
||||
let azure = MicrosoftAzure::new_from_env(container_name);
|
||||
|
||||
let integration = ObjectStore::new_microsoft_azure(azure);
|
||||
put_get_delete_list(&integration).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
|
@ -157,7 +157,7 @@ mod test {
|
|||
|
||||
const NON_EXISTENT_NAME: &str = "nonexistentname";
|
||||
|
||||
// Helper macro to skip tests if the AWS environment variables are not set.
|
||||
// Helper macro to skip tests if the GCP environment variables are not set.
|
||||
// Skips become hard errors if TEST_INTEGRATION is set.
|
||||
macro_rules! maybe_skip_integration {
|
||||
() => {
|
||||
|
|
|
@ -16,12 +16,14 @@
|
|||
//! Future compatibility will include Azure Blob Storage, Minio, and Ceph.
|
||||
|
||||
pub mod aws;
|
||||
pub mod azure;
|
||||
pub mod disk;
|
||||
pub mod gcp;
|
||||
pub mod memory;
|
||||
pub mod path;
|
||||
|
||||
use aws::AmazonS3;
|
||||
use azure::MicrosoftAzure;
|
||||
use disk::File;
|
||||
use gcp::GoogleCloudStorage;
|
||||
use memory::InMemory;
|
||||
|
@ -31,7 +33,7 @@ use bytes::Bytes;
|
|||
use chrono::{DateTime, Utc};
|
||||
use futures::{Stream, StreamExt, TryStreamExt};
|
||||
use snafu::Snafu;
|
||||
use std::{io, path::PathBuf};
|
||||
use std::{io, path::PathBuf, unimplemented};
|
||||
|
||||
/// Universal interface to multiple object store services.
|
||||
#[derive(Debug)]
|
||||
|
@ -58,6 +60,11 @@ impl ObjectStore {
|
|||
Self(ObjectStoreIntegration::File(file))
|
||||
}
|
||||
|
||||
/// Configure a connection to Microsoft Azure Blob store.
|
||||
pub fn new_microsoft_azure(azure: MicrosoftAzure) -> Self {
|
||||
Self(ObjectStoreIntegration::MicrosoftAzure(Box::new(azure)))
|
||||
}
|
||||
|
||||
/// Save the provided bytes to the specified location.
|
||||
pub async fn put<S>(&self, location: &ObjectStorePath, bytes: S, length: usize) -> Result<()>
|
||||
where
|
||||
|
@ -69,6 +76,7 @@ impl ObjectStore {
|
|||
GoogleCloudStorage(gcs) => gcs.put(location, bytes, length).await?,
|
||||
InMemory(in_mem) => in_mem.put(location, bytes, length).await?,
|
||||
File(file) => file.put(location, bytes, length).await?,
|
||||
MicrosoftAzure(azure) => azure.put(location, bytes, length).await?,
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
@ -85,6 +93,7 @@ impl ObjectStore {
|
|||
GoogleCloudStorage(gcs) => gcs.get(location).await?.boxed(),
|
||||
InMemory(in_mem) => in_mem.get(location).await?.boxed(),
|
||||
File(file) => file.get(location).await?.boxed(),
|
||||
MicrosoftAzure(azure) => azure.get(location).await?.boxed(),
|
||||
}
|
||||
.err_into())
|
||||
}
|
||||
|
@ -97,6 +106,7 @@ impl ObjectStore {
|
|||
GoogleCloudStorage(gcs) => gcs.delete(location).await?,
|
||||
InMemory(in_mem) => in_mem.delete(location).await?,
|
||||
File(file) => file.delete(location).await?,
|
||||
MicrosoftAzure(azure) => azure.delete(location).await?,
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
@ -113,6 +123,7 @@ impl ObjectStore {
|
|||
GoogleCloudStorage(gcs) => gcs.list(prefix).await?.boxed(),
|
||||
InMemory(in_mem) => in_mem.list(prefix).await?.boxed(),
|
||||
File(file) => file.list(prefix).await?.boxed(),
|
||||
MicrosoftAzure(azure) => azure.list(prefix).await?.boxed(),
|
||||
}
|
||||
.err_into())
|
||||
}
|
||||
|
@ -130,6 +141,7 @@ impl ObjectStore {
|
|||
GoogleCloudStorage(_gcs) => unimplemented!(),
|
||||
InMemory(in_mem) => in_mem.list_with_delimiter(prefix, &None).await,
|
||||
File(_file) => unimplemented!(),
|
||||
MicrosoftAzure(_azure) => unimplemented!(),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -139,7 +151,7 @@ impl ObjectStore {
|
|||
pub fn convert_path(&self, path: &ObjectStorePath) -> String {
|
||||
use ObjectStoreIntegration::*;
|
||||
match &self.0 {
|
||||
AmazonS3(_) | GoogleCloudStorage(_) | InMemory(_) => {
|
||||
AmazonS3(_) | GoogleCloudStorage(_) | InMemory(_) | MicrosoftAzure(_) => {
|
||||
path::CloudConverter::convert(path)
|
||||
}
|
||||
File(_) => path::FileConverter::convert(path).display().to_string(),
|
||||
|
@ -158,6 +170,8 @@ pub enum ObjectStoreIntegration {
|
|||
InMemory(InMemory),
|
||||
/// Local file system storage
|
||||
File(File),
|
||||
/// Microsoft Azure Blob storage
|
||||
MicrosoftAzure(Box<MicrosoftAzure>),
|
||||
}
|
||||
|
||||
/// Result of a list call that includes objects, prefixes (directories) and a
|
||||
|
@ -274,6 +288,22 @@ pub enum Error {
|
|||
},
|
||||
NoDataInMemory,
|
||||
|
||||
UnableToPutDataToAzure {
|
||||
source: azure_sdk_core::errors::AzureError,
|
||||
location: String,
|
||||
},
|
||||
UnableToGetDataFromAzure {
|
||||
source: azure_sdk_core::errors::AzureError,
|
||||
location: String,
|
||||
},
|
||||
UnableToDeleteDataFromAzure {
|
||||
source: azure_sdk_core::errors::AzureError,
|
||||
location: String,
|
||||
},
|
||||
UnableToListDataFromAzure {
|
||||
source: azure_sdk_core::errors::AzureError,
|
||||
},
|
||||
|
||||
#[snafu(display("Unable to create file {}: {}", path.display(), err))]
|
||||
UnableToCreateFile {
|
||||
err: io::Error,
|
||||
|
|
Loading…
Reference in New Issue