Merge pull request #144 from influxdata/rusoto-stream

pull/24376/head
Jake Goulding 2020-06-19 09:25:30 -04:00 committed by GitHub
commit 6a97995a19
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 79 additions and 29 deletions

16
Cargo.lock generated
View File

@ -2289,9 +2289,9 @@ checksum = "cabe4fa914dec5870285fa7f71f602645da47c486e68486d2b4ceb4a343e90ac"
[[package]]
name = "rusoto_core"
version = "0.43.0"
version = "0.44.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3a8d624cb48fcaca612329e4dd544380aa329ef338e83d3a90f5b7897e631971"
checksum = "841ca8f73e7498ba39146ab43acea906bbbb807d92ec0b7ea4b6293d2621f80d"
dependencies = [
"async-trait",
"base64 0.12.1",
@ -2318,9 +2318,9 @@ dependencies = [
[[package]]
name = "rusoto_credential"
version = "0.43.0"
version = "0.44.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba3e7cdf483d7198d9bca7414746d3ba656239e89e467b715d0571912f0b492f"
checksum = "60669ddc1bdbb83ce225593649d36b4c5f6bf9db47cc1ab3e81281abffc853f4"
dependencies = [
"async-trait",
"chrono",
@ -2338,9 +2338,9 @@ dependencies = [
[[package]]
name = "rusoto_s3"
version = "0.43.0"
version = "0.44.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2b6bc3221ae5a2c036d5757eee68a2ffb6b7f87b8a83adbf4271c8133fdee01c"
checksum = "eebe039c71a8ab54ce6614e2967ef034bb3202f306e4025901f620cdfa5680c8"
dependencies = [
"async-trait",
"bytes",
@ -2351,9 +2351,9 @@ dependencies = [
[[package]]
name = "rusoto_signature"
version = "0.43.0"
version = "0.44.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "62940a2bd479900a1bf8935b8f254d3e19368ac3ac4570eb4bd48eb46551a1b7"
checksum = "9eddff187ac18c5a91d9ccda9353f30cf531620dce437c4db661dfe2e23b2029"
dependencies = [
"base64 0.12.1",
"bytes",

View File

@ -11,9 +11,9 @@ futures = "0.3.5"
snafu = { version = "0.6.6", features = ["futures"] }
# Amazon S3 integration
rusoto_core = "0.43.0"
rusoto_credential = "0.43.0"
rusoto_s3 = "0.43.0"
rusoto_core = "0.44.0"
rusoto_credential = "0.44.0"
rusoto_s3 = "0.44.0"
# Google Cloud Storage integration
cloud-storage = { version = "0.4.0" }

View File

@ -14,9 +14,10 @@
use bytes::Bytes;
use futures::{stream, Stream, StreamExt, TryStreamExt};
use rusoto_core::ByteStream;
use rusoto_credential::ChainProvider;
use rusoto_s3::S3;
use snafu::{futures::TryStreamExt as _, OptionExt, ResultExt, Snafu};
use snafu::{ensure, futures::TryStreamExt as _, OptionExt, ResultExt, Snafu};
use tokio::sync::RwLock;
use std::{collections::BTreeMap, fmt};
@ -42,14 +43,16 @@ impl ObjectStore {
}
/// Save the provided bytes to the specified location.
pub async fn put<S>(&self, location: &str, bytes: S) -> Result<()>
pub async fn put<S>(&self, location: &str, bytes: S, length: usize) -> Result<()>
where
S: Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
{
match &self.0 {
ObjectStoreIntegration::AmazonS3(s3) => s3.put(location, bytes).await?,
ObjectStoreIntegration::GoogleCloudStorage(gcs) => gcs.put(location, bytes).await?,
ObjectStoreIntegration::InMemory(in_mem) => in_mem.put(location, bytes).await?,
ObjectStoreIntegration::AmazonS3(s3) => s3.put(location, bytes, length).await?,
ObjectStoreIntegration::GoogleCloudStorage(gcs) => {
gcs.put(location, bytes, length).await?
}
ObjectStoreIntegration::InMemory(in_mem) => in_mem.put(location, bytes, length).await?,
}
Ok(())
@ -115,7 +118,7 @@ impl GoogleCloudStorage {
}
/// Save the provided bytes to the specified location.
async fn put<S>(&self, location: &str, bytes: S) -> InternalResult<()>
async fn put<S>(&self, location: &str, bytes: S, length: usize) -> InternalResult<()>
where
S: Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
{
@ -126,6 +129,14 @@ impl GoogleCloudStorage {
.expect("Should have been able to collect streaming data")
.to_vec();
ensure!(
temporary_non_streaming.len() == length,
DataDoesNotMatchLength {
actual: temporary_non_streaming.len(),
expected: length,
}
);
let location = location.to_string();
let bucket_name = self.bucket_name.clone();
@ -229,23 +240,16 @@ impl AmazonS3 {
}
/// Save the provided bytes to the specified location.
async fn put<S>(&self, location: &str, bytes: S) -> InternalResult<()>
async fn put<S>(&self, location: &str, bytes: S, length: usize) -> InternalResult<()>
where
S: Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
{
// Rusoto theoretically supports streaming, but won't actually until this is fixed:
// https://github.com/rusoto/rusoto/issues/1752
let temporary_non_streaming = bytes
.map_ok(|b| bytes::BytesMut::from(&b[..]))
.try_concat()
.await
.expect("Should have been able to collect streaming data")
.to_vec();
let bytes = ByteStream::new_with_size(bytes, length);
let put_request = rusoto_s3::PutObjectRequest {
bucket: self.bucket_name.clone(),
key: location.to_string(),
body: Some(temporary_non_streaming.into()),
body: Some(bytes),
..Default::default()
};
@ -349,7 +353,7 @@ impl InMemory {
}
/// Save the provided bytes to the specified location.
async fn put<S>(&self, location: &str, bytes: S) -> InternalResult<()>
async fn put<S>(&self, location: &str, bytes: S, length: usize) -> InternalResult<()>
where
S: Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
{
@ -358,6 +362,15 @@ impl InMemory {
.try_concat()
.await
.context(UnableToPutDataInMemory)?;
ensure!(
content.len() == length,
DataDoesNotMatchLength {
actual: content.len(),
expected: length,
}
);
let content = content.freeze();
self.storage
@ -442,6 +455,11 @@ impl Error {
#[derive(Debug, Snafu)]
enum InternalError {
DataDoesNotMatchLength {
expected: usize,
actual: usize,
},
UnableToPutDataToGcs {
source: tokio::task::JoinError,
},
@ -502,6 +520,16 @@ mod tests {
type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
type Result<T, E = Error> = std::result::Result<T, E>;
macro_rules! assert_error {
($res:expr, $error_pat:pat$(,)?) => {
assert!(
matches!($res, Err(super::Error($error_pat))),
"was: {:?}",
$res,
)
};
}
async fn flatten_list_stream(
storage: &ObjectStore,
prefix: Option<&str>,
@ -524,7 +552,11 @@ mod tests {
let stream_data = std::io::Result::Ok(data.clone());
storage
.put(location, futures::stream::once(async move { stream_data }))
.put(
location,
futures::stream::once(async move { stream_data }),
data.len(),
)
.await?;
// List everything
@ -636,5 +668,23 @@ mod tests {
put_get_delete_list(&integration).await?;
Ok(())
}
#[tokio::test]
async fn length_mismatch_is_an_error() -> Result<()> {
let integration = ObjectStore::new_in_memory(InMemory::new());
let bytes = stream::once(async { Ok(Bytes::from("hello world")) });
let res = integration.put("junk", bytes, 0).await;
assert_error!(
res,
InternalError::DataDoesNotMatchLength {
expected: 0,
actual: 11,
},
);
Ok(())
}
}
}