Merge branch 'main' into ntran/all_soft_deleted

pull/24376/head
kodiakhq[bot], 2021-09-29 13:39:53 +00:00, committed by GitHub
commit ac63f1c6cb
20 changed files with 397 additions and 662 deletions

Cargo.lock generated

@@ -2487,6 +2487,7 @@ dependencies = [
  "futures-test",
  "indexmap",
  "itertools",
+ "observability_deps",
  "percent-encoding",
  "reqwest",
  "rusoto_core",


@@ -3,11 +3,9 @@ package influxdata.iox.management.v1;
 option go_package = "github.com/influxdata/iox/management/v1";
 import "google/longrunning/operations.proto";
-import "google/protobuf/field_mask.proto";
 import "google/protobuf/timestamp.proto";
 import "influxdata/iox/management/v1/database_rules.proto";
 import "influxdata/iox/management/v1/chunk.proto";
-import "influxdata/iox/management/v1/parse_delete.proto";
 import "influxdata/iox/management/v1/partition.proto";
 service ManagementService {


@@ -22,17 +22,14 @@ use data_types::{
     server_id::ServerId,
     DatabaseName,
 };
-use futures::{
-    stream::{self, BoxStream},
-    Stream, StreamExt, TryStreamExt,
-};
+use futures::{stream::BoxStream, StreamExt, TryStreamExt};
 use object_store::{
     path::{parsed::DirsAndFileName, ObjectStorePath, Path},
     ObjectStore, ObjectStoreApi, Result,
 };
 use observability_deps::tracing::warn;
 use snafu::{ensure, OptionExt, ResultExt, Snafu};
-use std::{collections::BTreeMap, io, sync::Arc};
+use std::{collections::BTreeMap, sync::Arc};
 use tokio::sync::mpsc::channel;
 use tokio_stream::wrappers::ReceiverStream;
@@ -382,12 +379,7 @@ impl IoxObjectStore {
     /// Write the file in the database directory that indicates this database is marked as deleted,
     /// without yet actually deleting this directory or any files it contains in object storage.
     pub async fn write_tombstone(&self) -> Result<()> {
-        let stream = stream::once(async move { Ok(Bytes::new()) });
-        let len = 0;
-        self.inner
-            .put(&self.tombstone_path(), stream, Some(len))
-            .await
+        self.inner.put(&self.tombstone_path(), Bytes::new()).await
     }

     /// Remove the tombstone file to restore a database generation. Will return an error if this
@@ -472,18 +464,14 @@ impl IoxObjectStore {
     }

     /// Store the data for this parquet file in this database's object store.
-    pub async fn put_catalog_transaction_file<S>(
+    pub async fn put_catalog_transaction_file(
         &self,
         location: &TransactionFilePath,
-        bytes: S,
-        length: Option<usize>,
-    ) -> Result<()>
-    where
-        S: Stream<Item = io::Result<Bytes>> + Send + Sync + 'static,
-    {
+        bytes: Bytes,
+    ) -> Result<()> {
         let full_path = self.transactions_path.join(location);

-        self.inner.put(&full_path, bytes, length).await
+        self.inner.put(&full_path, bytes).await
     }

     /// Delete all catalog transaction files for this database.
@@ -538,18 +526,10 @@ impl IoxObjectStore {
     }

     /// Store the data for this parquet file in this database's object store.
-    pub async fn put_parquet_file<S>(
-        &self,
-        location: &ParquetFilePath,
-        bytes: S,
-        length: Option<usize>,
-    ) -> Result<()>
-    where
-        S: Stream<Item = io::Result<Bytes>> + Send + Sync + 'static,
-    {
+    pub async fn put_parquet_file(&self, location: &ParquetFilePath, bytes: Bytes) -> Result<()> {
         let full_path = self.data_path.join(location);

-        self.inner.put(&full_path, bytes, length).await
+        self.inner.put(&full_path, bytes).await
     }

     /// Remove the data for this parquet file from this database's object store
@@ -584,12 +564,7 @@ impl IoxObjectStore {
     /// Store the data for the database rules
     pub async fn put_database_rules_file(&self, bytes: Bytes) -> Result<()> {
-        let len = bytes.len();
-        let stream = stream::once(async move { Ok(bytes) });
-        self.inner
-            .put(&self.db_rules_path(), stream, Some(len))
-            .await
+        self.inner.put(&self.db_rules_path(), bytes).await
     }

     /// Delete the data for the database rules
@@ -648,16 +623,8 @@ mod tests {
     async fn add_file(object_store: &ObjectStore, location: &Path) {
         let data = Bytes::from("arbitrary data");
-        let stream_data = std::io::Result::Ok(data.clone());

-        object_store
-            .put(
-                location,
-                futures::stream::once(async move { stream_data }),
-                None,
-            )
-            .await
-            .unwrap();
+        object_store.put(location, data).await.unwrap();
     }

     async fn parquet_files(iox_object_store: &IoxObjectStore) -> Vec<ParquetFilePath> {
@@ -675,14 +642,9 @@ mod tests {
     async fn add_parquet_file(iox_object_store: &IoxObjectStore, location: &ParquetFilePath) {
         let data = Bytes::from("arbitrary data");
-        let stream_data = std::io::Result::Ok(data.clone());

         iox_object_store
-            .put_parquet_file(
-                location,
-                futures::stream::once(async move { stream_data }),
-                None,
-            )
+            .put_parquet_file(location, data)
             .await
             .unwrap();
     }
@@ -778,14 +740,9 @@ mod tests {
         location: &TransactionFilePath,
     ) {
         let data = Bytes::from("arbitrary data");
-        let stream_data = std::io::Result::Ok(data.clone());

         iox_object_store
-            .put_catalog_transaction_file(
-                location,
-                futures::stream::once(async move { stream_data }),
-                None,
-            )
+            .put_catalog_transaction_file(location, data)
             .await
             .unwrap();
     }
@@ -898,18 +855,16 @@ mod tests {
         // GET
         let updated_file_content = Bytes::from("goodbye moon");
-        let updated_file_stream = stream::once({
-            let bytes = updated_file_content.clone();
-            async move { Ok(bytes) }
-        });
+        let expected_content = updated_file_content.clone();

         object_store
-            .put(&rules_path, updated_file_stream, None)
+            .put(&rules_path, updated_file_content)
             .await
             .unwrap();

         let actual_content = iox_object_store.get_database_rules_file().await.unwrap();

-        assert_eq!(updated_file_content, actual_content);
+        assert_eq!(expected_content, actual_content);

         // DELETE
         iox_object_store.delete_database_rules_file().await.unwrap();
@@ -1066,11 +1021,7 @@ mod tests {
         not_rules_path.push_all_dirs(&[&server_id.to_string(), not_a_db.as_str(), "0"]);
         not_rules_path.set_file_name("not_rules.txt");
         object_store
-            .put(
-                &not_rules_path,
-                stream::once(async move { Ok(Bytes::new()) }),
-                None,
-            )
+            .put(&not_rules_path, Bytes::new())
             .await
             .unwrap();
@@ -1080,11 +1031,7 @@ mod tests {
         invalid_db_name_rules_path.push_all_dirs(&[&server_id.to_string(), &invalid_db_name, "0"]);
         invalid_db_name_rules_path.set_file_name("rules.pb");
         object_store
-            .put(
-                &invalid_db_name_rules_path,
-                stream::once(async move { Ok(Bytes::new()) }),
-                None,
-            )
+            .put(&invalid_db_name_rules_path, Bytes::new())
             .await
             .unwrap();
@@ -1124,11 +1071,7 @@ mod tests {
         not_rules_path.push_all_dirs(&[&server_id.to_string(), not_a_db.as_str(), "0"]);
         not_rules_path.set_file_name("not_rules.txt");
         object_store
-            .put(
-                &not_rules_path,
-                stream::once(async move { Ok(Bytes::new()) }),
-                None,
-            )
+            .put(&not_rules_path, Bytes::new())
             .await
             .unwrap();
@@ -1138,11 +1081,7 @@ mod tests {
         invalid_db_name_rules_path.push_all_dirs(&[&server_id.to_string(), &invalid_db_name, "0"]);
         invalid_db_name_rules_path.set_file_name("rules.pb");
         object_store
-            .put(
-                &invalid_db_name_rules_path,
-                stream::once(async move { Ok(Bytes::new()) }),
-                None,
-            )
+            .put(&invalid_db_name_rules_path, Bytes::new())
             .await
             .unwrap();
@@ -1158,11 +1097,7 @@ mod tests {
         ]);
         no_generations_path.set_file_name("not_rules.txt");
         object_store
-            .put(
-                &no_generations_path,
-                stream::once(async move { Ok(Bytes::new()) }),
-                None,
-            )
+            .put(&no_generations_path, Bytes::new())
             .await
             .unwrap();
@@ -1256,11 +1191,7 @@ mod tests {
         not_rules_path.push_all_dirs(&[&server_id.to_string(), not_a_db.as_str(), "0"]);
         not_rules_path.set_file_name("not_rules.txt");
         object_store
-            .put(
-                &not_rules_path,
-                stream::once(async move { Ok(Bytes::new()) }),
-                None,
-            )
+            .put(&not_rules_path, Bytes::new())
             .await
             .unwrap();
@@ -1270,11 +1201,7 @@ mod tests {
         invalid_db_name_rules_path.push_all_dirs(&[&server_id.to_string(), &invalid_db_name, "0"]);
         invalid_db_name_rules_path.set_file_name("rules.pb");
         object_store
-            .put(
-                &invalid_db_name_rules_path,
-                stream::once(async move { Ok(Bytes::new()) }),
-                None,
-            )
+            .put(&invalid_db_name_rules_path, Bytes::new())
             .await
             .unwrap();
@@ -1289,11 +1216,7 @@ mod tests {
         ]);
         no_generations_path.set_file_name("not_rules.txt");
         object_store
-            .put(
-                &no_generations_path,
-                stream::once(async move { Ok(Bytes::new()) }),
-                None,
-            )
+            .put(&no_generations_path, Bytes::new())
             .await
             .unwrap();

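The `IoxObjectStore` writers above no longer take a byte stream plus an optional length; callers hand over a fully materialized `Bytes` buffer. A minimal sketch of the new calling convention (illustrative only, not part of the diff; it assumes an already-constructed `IoxObjectStore`, the crate's `Result` alias, and a hypothetical helper name `rules_roundtrip`):

    use bytes::Bytes;

    async fn rules_roundtrip(iox_object_store: &IoxObjectStore) -> Result<()> {
        // Callers now pass a fully materialized `Bytes` buffer; no stream or
        // explicit length argument is needed.
        let rules = Bytes::from("encoded database rules");
        iox_object_store.put_database_rules_file(rules.clone()).await?;

        // Reading the file back yields the same bytes.
        let stored = iox_object_store.get_database_rules_file().await?;
        assert_eq!(rules, stored);
        Ok(())
    }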

@@ -18,6 +18,7 @@ futures = "0.3"
 # https://github.com/tkaitchuck/aHash/issues/95
 indexmap = { version = "~1.6.2", optional = true }
 itertools = "0.10.1"
+observability_deps = { path = "../observability_deps" }
 percent-encoding = "2.1"
 # rusoto crates are for Amazon S3 integration
 rusoto_core = { version = "0.47.0", optional = true}


@@ -1,7 +1,6 @@
 //! This module contains the IOx implementation for using S3 as the object
 //! store.
 use crate::{
-    buffer::slurp_stream_tempfile,
     path::{cloud::CloudPath, DELIMITER},
     ListResult, ObjectMeta, ObjectStoreApi,
 };
@@ -10,18 +9,21 @@ use bytes::Bytes;
 use chrono::{DateTime, Utc};
 use futures::{
     stream::{self, BoxStream},
-    Stream, StreamExt, TryStreamExt,
+    Future, StreamExt, TryStreamExt,
 };
+use observability_deps::tracing::{debug, warn};
 use rusoto_core::ByteStream;
 use rusoto_credential::{InstanceMetadataProvider, StaticProvider};
 use rusoto_s3::S3;
 use snafu::{OptionExt, ResultExt, Snafu};
-use std::convert::TryFrom;
-use std::{fmt, io};
+use std::{convert::TryFrom, fmt, time::Duration};

 /// A specialized `Result` for object store-related errors
 pub type Result<T, E = Error> = std::result::Result<T, E>;

+/// The maximum number of times a request will be retried in the case of an AWS server error
+pub const MAX_NUM_RETRIES: u32 = 3;
+
 /// A specialized `Error` for object store-related errors
 #[derive(Debug, Snafu)]
 #[allow(missing_docs)]
@@ -140,35 +142,38 @@ impl ObjectStoreApi for AmazonS3 {
         CloudPath::default()
     }

-    async fn put<S>(&self, location: &Self::Path, bytes: S, length: Option<usize>) -> Result<()>
-    where
-        S: Stream<Item = io::Result<Bytes>> + Send + Sync + 'static,
-    {
-        let bytes = match length {
-            Some(length) => ByteStream::new_with_size(bytes, length),
-            None => {
-                let bytes = slurp_stream_tempfile(bytes)
-                    .await
-                    .context(UnableToBufferStream)?;
-                let length = bytes.size();
-                ByteStream::new_with_size(bytes, length)
-            }
-        };
-
-        let put_request = rusoto_s3::PutObjectRequest {
-            bucket: self.bucket_name.clone(),
-            key: location.to_raw(),
-            body: Some(bytes),
-            ..Default::default()
-        };
-
-        self.client
-            .put_object(put_request)
-            .await
-            .context(UnableToPutData {
-                bucket: &self.bucket_name,
-                location: location.to_raw(),
-            })?;
+    async fn put(&self, location: &Self::Path, bytes: Bytes) -> Result<()> {
+        let bucket_name = self.bucket_name.clone();
+        let key = location.to_raw();
+        let request_factory = move || {
+            let bytes = bytes.clone();
+
+            let length = bytes.len();
+            let stream_data = std::io::Result::Ok(bytes);
+            let stream = futures::stream::once(async move { stream_data });
+            let byte_stream = ByteStream::new_with_size(stream, length);
+
+            rusoto_s3::PutObjectRequest {
+                bucket: bucket_name.clone(),
+                key: key.clone(),
+                body: Some(byte_stream),
+                ..Default::default()
+            }
+        };
+
+        let s3 = self.client.clone();
+
+        s3_request(move || {
+            let (s3, request_factory) = (s3.clone(), request_factory.clone());
+
+            async move { s3.put_object(request_factory()).await }
+        })
+        .await
+        .context(UnableToPutData {
+            bucket: &self.bucket_name,
+            location: location.to_raw(),
+        })?;

         Ok(())
     }
@@ -204,19 +209,27 @@ impl ObjectStoreApi for AmazonS3 {
     async fn delete(&self, location: &Self::Path) -> Result<()> {
         let key = location.to_raw();
+        let bucket_name = self.bucket_name.clone();

-        let delete_request = rusoto_s3::DeleteObjectRequest {
-            bucket: self.bucket_name.clone(),
+        let request_factory = move || rusoto_s3::DeleteObjectRequest {
+            bucket: bucket_name.clone(),
             key: key.clone(),
             ..Default::default()
         };

-        self.client
-            .delete_object(delete_request)
-            .await
-            .context(UnableToDeleteData {
-                bucket: self.bucket_name.to_owned(),
-                location: key,
-            })?;
+        let s3 = self.client.clone();
+
+        s3_request(move || {
+            let (s3, request_factory) = (s3.clone(), request_factory.clone());
+
+            async move { s3.delete_object(request_factory()).await }
+        })
+        .await
+        .context(UnableToDeleteData {
+            bucket: &self.bucket_name,
+            location: location.to_raw(),
+        })?;

         Ok(())
     }
@ -224,68 +237,78 @@ impl ObjectStoreApi for AmazonS3 {
&'a self, &'a self,
prefix: Option<&'a Self::Path>, prefix: Option<&'a Self::Path>,
) -> Result<BoxStream<'a, Result<Vec<Self::Path>>>> { ) -> Result<BoxStream<'a, Result<Vec<Self::Path>>>> {
#[derive(Clone)] Ok(self
enum ListState { .list_objects_v2(prefix, None)
Start, .await?
HasMore(String), .map_ok(|list_objects_v2_result| {
Done, let contents = list_objects_v2_result.contents.unwrap_or_default();
}
use ListState::*;
Ok(stream::unfold(ListState::Start, move |state| async move {
let mut list_request = rusoto_s3::ListObjectsV2Request {
bucket: self.bucket_name.clone(),
prefix: prefix.map(|p| p.to_raw()),
..Default::default()
};
match state.clone() {
HasMore(continuation_token) => {
list_request.continuation_token = Some(continuation_token);
}
Done => {
return None;
}
// If this is the first request we've made, we don't need to make any modifications
// to the request
Start => {}
}
let resp = self
.client
.list_objects_v2(list_request)
.await
.context(UnableToListData {
bucket: &self.bucket_name,
});
let resp = match resp {
Ok(resp) => resp,
Err(e) => return Some((Err(e), state)),
};
let contents = resp.contents.unwrap_or_default();
let names = contents let names = contents
.into_iter() .into_iter()
.flat_map(|object| object.key.map(CloudPath::raw)) .flat_map(|object| object.key.map(CloudPath::raw))
.collect(); .collect();
// The AWS response contains a field named `is_truncated` as well as names
// `next_continuation_token`, and we're assuming that `next_continuation_token`
// is only set when `is_truncated` is true (and therefore not
// checking `is_truncated`).
let next_state = if let Some(next_continuation_token) = resp.next_continuation_token {
ListState::HasMore(next_continuation_token)
} else {
ListState::Done
};
Some((Ok(names), next_state))
}) })
.boxed()) .boxed())
} }
async fn list_with_delimiter(&self, prefix: &Self::Path) -> Result<ListResult<Self::Path>> { async fn list_with_delimiter(&self, prefix: &Self::Path) -> Result<ListResult<Self::Path>> {
self.list_with_delimiter_and_token(prefix, &None).await Ok(self
.list_objects_v2(Some(prefix), Some(DELIMITER.to_string()))
.await?
.try_fold(
ListResult {
next_token: None,
common_prefixes: vec![],
objects: vec![],
},
|acc, list_objects_v2_result| async move {
let mut res = acc;
let contents = list_objects_v2_result.contents.unwrap_or_default();
let mut objects = contents
.into_iter()
.map(|object| {
let location = CloudPath::raw(
object.key.expect("object doesn't exist without a key"),
);
let last_modified = match object.last_modified {
Some(lm) => DateTime::parse_from_rfc3339(&lm)
.context(UnableToParseLastModified {
bucket: &self.bucket_name,
})?
.with_timezone(&Utc),
None => Utc::now(),
};
let size = usize::try_from(object.size.unwrap_or(0))
.expect("unsupported size on this platform");
Ok(ObjectMeta {
location,
last_modified,
size,
})
})
.collect::<Result<Vec<_>>>()?;
res.objects.append(&mut objects);
res.common_prefixes.extend(
list_objects_v2_result
.common_prefixes
.unwrap_or_default()
.into_iter()
.map(|p| {
CloudPath::raw(
p.prefix.expect("can't have a prefix without a value"),
)
}),
);
Ok(res)
},
)
.await?)
} }
} }
@ -355,75 +378,142 @@ pub(crate) fn new_failing_s3() -> Result<AmazonS3> {
} }
impl AmazonS3 { impl AmazonS3 {
/// List objects with the given prefix and a set delimiter of `/`. Returns async fn list_objects_v2(
/// common prefixes (directories) in addition to object metadata. Optionally &self,
/// takes a continuation token for paging. prefix: Option<&CloudPath>,
pub async fn list_with_delimiter_and_token<'a>( delimiter: Option<String>,
&'a self, ) -> Result<BoxStream<'_, Result<rusoto_s3::ListObjectsV2Output>>> {
prefix: &'a CloudPath, #[derive(Clone)]
next_token: &Option<String>, enum ListState {
) -> Result<ListResult<CloudPath>> { Start,
let converted_prefix = prefix.to_raw(); HasMore(String),
Done,
}
use ListState::*;
let mut list_request = rusoto_s3::ListObjectsV2Request { let raw_prefix = prefix.map(|p| p.to_raw());
bucket: self.bucket_name.clone(), let bucket = self.bucket_name.clone();
prefix: Some(converted_prefix),
delimiter: Some(DELIMITER.to_string()), let request_factory = move || rusoto_s3::ListObjectsV2Request {
bucket,
prefix: raw_prefix.clone(),
delimiter: delimiter.clone(),
..Default::default() ..Default::default()
}; };
if let Some(t) = next_token { Ok(stream::unfold(ListState::Start, move |state| {
list_request.continuation_token = Some(t.clone()); let request_factory = request_factory.clone();
let s3 = self.client.clone();
async move {
let continuation_token = match state.clone() {
HasMore(continuation_token) => Some(continuation_token),
Done => {
return None;
} }
// If this is the first request we've made, we don't need to make any
// modifications to the request
Start => None,
};
let resp = self let resp = s3_request(move || {
.client let (s3, request_factory, continuation_token) = (
.list_objects_v2(list_request) s3.clone(),
request_factory.clone(),
continuation_token.clone(),
);
async move {
s3.list_objects_v2(rusoto_s3::ListObjectsV2Request {
continuation_token,
..request_factory()
})
.await .await
.context(UnableToListData { }
bucket: &self.bucket_name,
})?;
let contents = resp.contents.unwrap_or_default();
let objects = contents
.into_iter()
.map(|object| {
let location =
CloudPath::raw(object.key.expect("object doesn't exist without a key"));
let last_modified = match object.last_modified {
Some(lm) => DateTime::parse_from_rfc3339(&lm)
.context(UnableToParseLastModified {
bucket: &self.bucket_name,
})?
.with_timezone(&Utc),
None => Utc::now(),
};
let size = usize::try_from(object.size.unwrap_or(0))
.expect("unsupported size on this platform");
Ok(ObjectMeta {
location,
last_modified,
size,
}) })
}) .await;
.collect::<Result<Vec<_>>>()?;
let common_prefixes = resp let resp = match resp {
.common_prefixes Ok(resp) => resp,
.unwrap_or_default() Err(e) => return Some((Err(e), state)),
.into_iter()
.map(|p| CloudPath::raw(p.prefix.expect("can't have a prefix without a value")))
.collect();
let result = ListResult {
objects,
common_prefixes,
next_token: resp.next_continuation_token,
}; };
Ok(result) // The AWS response contains a field named `is_truncated` as well as
// `next_continuation_token`, and we're assuming that `next_continuation_token`
// is only set when `is_truncated` is true (and therefore not
// checking `is_truncated`).
let next_state =
if let Some(next_continuation_token) = &resp.next_continuation_token {
ListState::HasMore(next_continuation_token.to_string())
} else {
ListState::Done
};
Some((Ok(resp), next_state))
}
})
.map_err(move |e| Error::UnableToListData {
source: e,
bucket: self.bucket_name.clone(),
})
.boxed())
}
}
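The listing code above pages through S3 results with `stream::unfold`, threading the `next_continuation_token` from each response into the next request via the `ListState` enum. A standalone sketch of that pagination pattern (illustrative only, not part of the diff; `fetch_page` and `list_all` are hypothetical stand-ins for a single `list_objects_v2` call and its driver):

    use futures::{stream, StreamExt};

    #[derive(Clone)]
    enum ListState {
        Start,
        HasMore(String),
        Done,
    }

    // Hypothetical stand-in for one `list_objects_v2` call: returns a page of
    // keys plus the continuation token for the next page, if any.
    async fn fetch_page(_token: Option<String>) -> (Vec<String>, Option<String>) {
        (vec!["object-key".to_string()], None)
    }

    // Issue one request per step, emit its page, and carry the continuation
    // token forward until the listing is exhausted.
    async fn list_all() -> Vec<String> {
        stream::unfold(ListState::Start, |state| async move {
            let token = match state {
                ListState::Start => None,
                ListState::HasMore(token) => Some(token),
                ListState::Done => return None,
            };
            let (page, next_token) = fetch_page(token).await;
            let next_state = match next_token {
                Some(token) => ListState::HasMore(token),
                None => ListState::Done,
            };
            Some((page, next_state))
        })
        .concat()
        .await
    }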
/// Handles retrying a request to S3 up to `MAX_NUM_RETRIES` times if S3 returns 5xx server errors.
///
/// The `future_factory` argument is a function `F` that takes no arguments and, when called, will
/// return a `Future` (type `G`) that, when `await`ed, will perform a request to S3 through
/// `rusoto` and return a `Result` that returns some type `R` on success and some
/// `rusoto_core::RusotoError<E>` on error.
///
/// If the executed `Future` returns success, this function will return that success.
/// If the executed `Future` returns a 5xx server error, this function will wait an amount of
/// time that increases exponentially with the number of times it has retried, get a new `Future` by
/// calling `future_factory` again, and retry the request by `await`ing the `Future` again.
/// The retries will continue until the maximum number of retries has been attempted. In that case,
/// this function will return the last encountered error.
///
/// Client errors (4xx) will never be retried by this function.
async fn s3_request<E, F, G, R>(future_factory: F) -> Result<R, rusoto_core::RusotoError<E>>
where
E: std::error::Error,
F: Fn() -> G,
G: Future<Output = Result<R, rusoto_core::RusotoError<E>>> + Send,
{
let mut attempts = 0;
loop {
let request = future_factory();
let result = request.await;
match result {
Ok(r) => return Ok(r),
Err(error) => {
attempts += 1;
let should_retry = matches!(
error,
rusoto_core::RusotoError::Unknown(ref response)
if response.status.is_server_error()
);
if attempts > MAX_NUM_RETRIES {
warn!(
?error,
attempts, "maximum number of retries exceeded for AWS S3 request"
);
return Err(error);
} else if !should_retry {
return Err(error);
} else {
debug!(?error, attempts, "retrying AWS S3 request");
let wait_time = Duration::from_millis(2u64.pow(attempts) * 50);
tokio::time::sleep(wait_time).await;
}
}
}
} }
} }
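With `MAX_NUM_RETRIES = 3`, a request that keeps failing with a 5xx server error is attempted four times in total, sleeping 100 ms, 200 ms, and 400 ms between attempts before the last error is returned; client (4xx) errors are never retried. A small sketch of the backoff schedule implied by `Duration::from_millis(2u64.pow(attempts) * 50)` (illustrative only, not part of the diff):

    use std::time::Duration;

    // Wait time before retry `attempt` (1-based), as computed in `s3_request` above.
    fn backoff(attempt: u32) -> Duration {
        Duration::from_millis(2u64.pow(attempt) * 50)
    }

    fn main() {
        for attempt in 1..=3u32 {
            // Prints 100ms, 200ms, 400ms.
            println!("retry {} after {:?}", attempt, backoff(attempt));
        }
    }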
@@ -694,16 +784,8 @@ mod tests {
         let mut location = integration.new_path();
         location.set_file_name(NON_EXISTENT_NAME);
         let data = Bytes::from("arbitrary data");
-        let stream_data = std::io::Result::Ok(data.clone());

-        let err = integration
-            .put(
-                &location,
-                futures::stream::once(async move { stream_data }),
-                Some(data.len()),
-            )
-            .await
-            .unwrap_err();
+        let err = integration.put(&location, data).await.unwrap_err();

         if let ObjectStoreError::AwsObjectStoreError {
             source:
@@ -740,16 +822,8 @@ mod tests {
         let mut location = integration.new_path();
         location.set_file_name(NON_EXISTENT_NAME);
         let data = Bytes::from("arbitrary data");
-        let stream_data = std::io::Result::Ok(data.clone());

-        let err = integration
-            .put(
-                &location,
-                futures::stream::once(async move { stream_data }),
-                Some(data.len()),
-            )
-            .await
-            .unwrap_err();
+        let err = integration.put(&location, data).await.unwrap_err();

         if let ObjectStoreError::AwsObjectStoreError {
             source:


@ -14,11 +14,10 @@ use azure_storage::{
use bytes::Bytes; use bytes::Bytes;
use futures::{ use futures::{
stream::{self, BoxStream}, stream::{self, BoxStream},
FutureExt, Stream, StreamExt, TryStreamExt, FutureExt, StreamExt,
}; };
use snafu::{ensure, ResultExt, Snafu}; use snafu::{ResultExt, Snafu};
use std::sync::Arc; use std::{convert::TryInto, sync::Arc};
use std::{convert::TryInto, io};
/// A specialized `Result` for Azure object store-related errors /// A specialized `Result` for Azure object store-related errors
pub type Result<T, E = Error> = std::result::Result<T, E>; pub type Result<T, E = Error> = std::result::Result<T, E>;
@ -27,9 +26,6 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
#[derive(Debug, Snafu)] #[derive(Debug, Snafu)]
#[allow(missing_docs)] #[allow(missing_docs)]
pub enum Error { pub enum Error {
#[snafu(display("Expected streamed data to have length {}, got {}", expected, actual))]
DataDoesNotMatchLength { expected: usize, actual: usize },
#[snafu(display("Unable to DELETE data. Location: {}, Error: {}", location, source,))] #[snafu(display("Unable to DELETE data. Location: {}, Error: {}", location, source,))]
UnableToDeleteData { UnableToDeleteData {
source: Box<dyn std::error::Error + Send + Sync>, source: Box<dyn std::error::Error + Send + Sync>,
@ -70,30 +66,14 @@ impl ObjectStoreApi for MicrosoftAzure {
CloudPath::default() CloudPath::default()
} }
async fn put<S>(&self, location: &Self::Path, bytes: S, length: Option<usize>) -> Result<()> async fn put(&self, location: &Self::Path, bytes: Bytes) -> Result<()> {
where
S: Stream<Item = io::Result<Bytes>> + Send + Sync + 'static,
{
let location = location.to_raw(); let location = location.to_raw();
let temporary_non_streaming = bytes
.map_ok(|b| bytes::BytesMut::from(&b[..]))
.try_concat()
.await
.expect("Should have been able to collect streaming data");
if let Some(length) = length { let bytes = bytes::BytesMut::from(&*bytes);
ensure!(
temporary_non_streaming.len() == length,
DataDoesNotMatchLength {
actual: temporary_non_streaming.len(),
expected: length,
}
);
}
self.container_client self.container_client
.as_blob_client(&location) .as_blob_client(&location)
.put_block_blob(temporary_non_streaming) .put_block_blob(bytes)
.execute() .execute()
.await .await
.context(UnableToPutData { .context(UnableToPutData {


@ -7,9 +7,9 @@ use async_trait::async_trait;
use bytes::Bytes; use bytes::Bytes;
use futures::{ use futures::{
stream::{self, BoxStream}, stream::{self, BoxStream},
Stream, StreamExt, TryStreamExt, StreamExt, TryStreamExt,
}; };
use snafu::{ensure, OptionExt, ResultExt, Snafu}; use snafu::{OptionExt, ResultExt, Snafu};
use std::sync::Arc; use std::sync::Arc;
use std::{collections::BTreeSet, convert::TryFrom, io, path::PathBuf}; use std::{collections::BTreeSet, convert::TryFrom, io, path::PathBuf};
use tokio::fs; use tokio::fs;
@ -23,9 +23,6 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
#[derive(Debug, Snafu)] #[derive(Debug, Snafu)]
#[allow(missing_docs)] #[allow(missing_docs)]
pub enum Error { pub enum Error {
#[snafu(display("Expected streamed data to have length {}, got {}", expected, actual))]
DataDoesNotMatchLength { expected: usize, actual: usize },
#[snafu(display("File size for {} did not fit in a usize: {}", path.display(), source))] #[snafu(display("File size for {} did not fit in a usize: {}", path.display(), source))]
FileSizeOverflowedUsize { FileSizeOverflowedUsize {
source: std::num::TryFromIntError, source: std::num::TryFromIntError,
@ -58,9 +55,6 @@ pub enum Error {
#[snafu(display("Unable to read data from file {}: {}", path.display(), source))] #[snafu(display("Unable to read data from file {}: {}", path.display(), source))]
UnableToReadBytes { source: io::Error, path: PathBuf }, UnableToReadBytes { source: io::Error, path: PathBuf },
#[snafu(display("Unable to stream data from the request into memory: {}", source))]
UnableToStreamDataIntoMemory { source: std::io::Error },
} }
/// Local filesystem storage suitable for testing or for opting out of using a /// Local filesystem storage suitable for testing or for opting out of using a
@ -79,25 +73,8 @@ impl ObjectStoreApi for File {
FilePath::default() FilePath::default()
} }
async fn put<S>(&self, location: &Self::Path, bytes: S, length: Option<usize>) -> Result<()> async fn put(&self, location: &Self::Path, bytes: Bytes) -> Result<()> {
where let content = bytes::BytesMut::from(&*bytes);
S: Stream<Item = io::Result<Bytes>> + Send + Sync + 'static,
{
let content = bytes
.map_ok(|b| bytes::BytesMut::from(&b[..]))
.try_concat()
.await
.context(UnableToStreamDataIntoMemory)?;
if let Some(length) = length {
ensure!(
content.len() == length,
DataDoesNotMatchLength {
actual: content.len(),
expected: length,
}
);
}
let path = self.path(location); let path = self.path(location);
@ -322,9 +299,8 @@ mod tests {
use crate::{ use crate::{
tests::{list_with_delimiter, put_get_delete_list}, tests::{list_with_delimiter, put_get_delete_list},
Error as ObjectStoreError, ObjectStore, ObjectStoreApi, ObjectStorePath, ObjectStore, ObjectStoreApi, ObjectStorePath,
}; };
use futures::stream;
use tempfile::TempDir; use tempfile::TempDir;
#[tokio::test] #[tokio::test]
@ -336,45 +312,18 @@ mod tests {
list_with_delimiter(&integration).await.unwrap(); list_with_delimiter(&integration).await.unwrap();
} }
#[tokio::test]
async fn length_mismatch_is_an_error() {
let root = TempDir::new().unwrap();
let integration = ObjectStore::new_file(root.path());
let bytes = stream::once(async { Ok(Bytes::from("hello world")) });
let mut location = integration.new_path();
location.set_file_name("junk");
let res = integration.put(&location, bytes, Some(0)).await;
assert!(matches!(
res.err().unwrap(),
ObjectStoreError::FileObjectStoreError {
source: Error::DataDoesNotMatchLength {
expected: 0,
actual: 11,
}
}
));
}
#[tokio::test] #[tokio::test]
async fn creates_dir_if_not_present() { async fn creates_dir_if_not_present() {
let root = TempDir::new().unwrap(); let root = TempDir::new().unwrap();
let integration = ObjectStore::new_file(root.path()); let integration = ObjectStore::new_file(root.path());
let data = Bytes::from("arbitrary data");
let mut location = integration.new_path(); let mut location = integration.new_path();
location.push_all_dirs(&["nested", "file", "test_file"]); location.push_all_dirs(&["nested", "file", "test_file"]);
let stream_data = std::io::Result::Ok(data.clone()); let data = Bytes::from("arbitrary data");
integration let expected_data = data.clone();
.put(
&location, integration.put(&location, data).await.unwrap();
futures::stream::once(async move { stream_data }),
Some(data.len()),
)
.await
.unwrap();
let read_data = integration let read_data = integration
.get(&location) .get(&location)
@ -384,7 +333,7 @@ mod tests {
.try_concat() .try_concat()
.await .await
.unwrap(); .unwrap();
assert_eq!(&*read_data, data); assert_eq!(&*read_data, expected_data);
} }
#[tokio::test] #[tokio::test]
@ -392,19 +341,13 @@ mod tests {
let root = TempDir::new().unwrap(); let root = TempDir::new().unwrap();
let integration = ObjectStore::new_file(root.path()); let integration = ObjectStore::new_file(root.path());
let data = Bytes::from("arbitrary data");
let stream_data = std::io::Result::Ok(data.clone());
let mut location = integration.new_path(); let mut location = integration.new_path();
location.set_file_name("some_file"); location.set_file_name("some_file");
integration
.put( let data = Bytes::from("arbitrary data");
&location, let expected_data = data.clone();
futures::stream::once(async move { stream_data }),
None, integration.put(&location, data).await.unwrap();
)
.await
.unwrap();
let read_data = integration let read_data = integration
.get(&location) .get(&location)
@ -414,7 +357,7 @@ mod tests {
.try_concat() .try_concat()
.await .await
.unwrap(); .unwrap();
assert_eq!(&*read_data, data); assert_eq!(&*read_data, expected_data);
} }
#[tokio::test] #[tokio::test]


@ -1,6 +1,7 @@
//! Crate that mimics the interface of the the various object stores //! Crate that mimics the interface of the the various object stores
//! but does nothing if they are not enabled. //! but does nothing if they are not enabled.
use async_trait::async_trait; use async_trait::async_trait;
use bytes::Bytes;
use snafu::Snafu; use snafu::Snafu;
use crate::{path::cloud::CloudPath, ObjectStoreApi}; use crate::{path::cloud::CloudPath, ObjectStoreApi};
@ -42,15 +43,7 @@ impl ObjectStoreApi for DummyObjectStore {
CloudPath::default() CloudPath::default()
} }
async fn put<S>( async fn put(&self, _location: &Self::Path, _bytes: Bytes) -> crate::Result<(), Self::Error> {
&self,
_location: &Self::Path,
_bytes: S,
_length: Option<usize>,
) -> crate::Result<(), Self::Error>
where
S: futures::Stream<Item = std::io::Result<bytes::Bytes>> + Send + Sync + 'static,
{
NotSupported { name: &self.name }.fail() NotSupported { name: &self.name }.fail()
} }


@ -7,9 +7,9 @@ use crate::{
use async_trait::async_trait; use async_trait::async_trait;
use bytes::Bytes; use bytes::Bytes;
use cloud_storage::Client; use cloud_storage::Client;
use futures::{stream::BoxStream, Stream, StreamExt, TryStreamExt}; use futures::{stream::BoxStream, StreamExt, TryStreamExt};
use snafu::{ensure, ResultExt, Snafu}; use snafu::{ResultExt, Snafu};
use std::{convert::TryFrom, env, io}; use std::{convert::TryFrom, env};
/// A specialized `Result` for Google Cloud Storage object store-related errors /// A specialized `Result` for Google Cloud Storage object store-related errors
pub type Result<T, E = Error> = std::result::Result<T, E>; pub type Result<T, E = Error> = std::result::Result<T, E>;
@ -86,27 +86,7 @@ impl ObjectStoreApi for GoogleCloudStorage {
CloudPath::default() CloudPath::default()
} }
async fn put<S>(&self, location: &Self::Path, bytes: S, length: Option<usize>) -> Result<()> async fn put(&self, location: &Self::Path, bytes: Bytes) -> Result<()> {
where
S: Stream<Item = io::Result<Bytes>> + Send + Sync + 'static,
{
let temporary_non_streaming = bytes
.map_ok(|b| bytes::BytesMut::from(&b[..]))
.try_concat()
.await
.expect("Should have been able to collect streaming data")
.to_vec();
if let Some(length) = length {
ensure!(
temporary_non_streaming.len() == length,
DataDoesNotMatchLength {
actual: temporary_non_streaming.len(),
expected: length,
}
);
}
let location = location.to_raw(); let location = location.to_raw();
let location_copy = location.clone(); let location_copy = location.clone();
let bucket_name = self.bucket_name.clone(); let bucket_name = self.bucket_name.clone();
@ -115,7 +95,7 @@ impl ObjectStoreApi for GoogleCloudStorage {
.object() .object()
.create( .create(
&bucket_name, &bucket_name,
temporary_non_streaming, bytes.to_vec(),
&location_copy, &location_copy,
"application/octet-stream", "application/octet-stream",
) )
@ -362,7 +342,11 @@ mod test {
}, },
}) = err.downcast_ref::<ObjectStoreError>() }) = err.downcast_ref::<ObjectStoreError>()
{ {
assert!(matches!(source, cloud_storage::Error::Reqwest(_))); assert!(
matches!(source, cloud_storage::Error::Other(_)),
"got: {:?}",
source
);
assert_eq!(bucket, &config.bucket); assert_eq!(bucket, &config.bucket);
assert_eq!(location, NON_EXISTENT_NAME); assert_eq!(location, NON_EXISTENT_NAME);
} else { } else {
@ -461,18 +445,9 @@ mod test {
let mut location = integration.new_path(); let mut location = integration.new_path();
location.set_file_name(NON_EXISTENT_NAME); location.set_file_name(NON_EXISTENT_NAME);
let data = Bytes::from("arbitrary data"); let data = Bytes::from("arbitrary data");
let stream_data = std::io::Result::Ok(data.clone());
let err = integration let err = integration.put(&location, data).await.unwrap_err();
.put(
&location,
futures::stream::once(async move { stream_data }),
Some(data.len()),
)
.await
.unwrap_err();
if let ObjectStoreError::GcsObjectStoreError { if let ObjectStoreError::GcsObjectStoreError {
source: source:


@@ -50,15 +50,16 @@ use throttle::ThrottledStore;
 /// Publically expose throttling configuration
 pub use throttle::ThrottleConfig;

-use crate::cache::{Cache, LocalFSCache};
-use crate::path::Path;
+use crate::{
+    cache::{Cache, LocalFSCache},
+    path::Path,
+};
 use async_trait::async_trait;
 use bytes::Bytes;
 use chrono::{DateTime, Utc};
-use futures::{stream::BoxStream, Stream, StreamExt, TryFutureExt, TryStreamExt};
+use futures::{stream::BoxStream, StreamExt, TryFutureExt, TryStreamExt};
 use snafu::{ResultExt, Snafu};
-use std::sync::Arc;
-use std::{io, path::PathBuf};
+use std::{path::PathBuf, sync::Arc};

 /// Universal API to multiple object store services.
 #[async_trait]
@@ -73,14 +74,7 @@ pub trait ObjectStoreApi: Send + Sync + 'static {
     fn new_path(&self) -> Self::Path;

     /// Save the provided bytes to the specified location.
-    async fn put<S>(
-        &self,
-        location: &Self::Path,
-        bytes: S,
-        length: Option<usize>,
-    ) -> Result<(), Self::Error>
-    where
-        S: Stream<Item = io::Result<Bytes>> + Send + Sync + 'static;
+    async fn put(&self, location: &Self::Path, bytes: Bytes) -> Result<(), Self::Error>;

     /// Return the bytes that are stored at the specified location.
     async fn get(
@@ -239,31 +233,26 @@ impl ObjectStoreApi for ObjectStore {
         }
     }

-    async fn put<S>(&self, location: &Self::Path, bytes: S, length: Option<usize>) -> Result<()>
-    where
-        S: Stream<Item = io::Result<Bytes>> + Send + Sync + 'static,
-    {
+    async fn put(&self, location: &Self::Path, bytes: Bytes) -> Result<()> {
         use ObjectStoreIntegration::*;
         match (&self.integration, location) {
-            (AmazonS3(s3), path::Path::AmazonS3(location)) => {
-                s3.put(location, bytes, length).await?
-            }
+            (AmazonS3(s3), path::Path::AmazonS3(location)) => s3.put(location, bytes).await?,
             (GoogleCloudStorage(gcs), path::Path::GoogleCloudStorage(location)) => gcs
-                .put(location, bytes, length)
+                .put(location, bytes)
                 .await
                 .context(GcsObjectStoreError)?,
             (InMemory(in_mem), path::Path::InMemory(location)) => {
-                in_mem.put(location, bytes, length).await?
+                in_mem.put(location, bytes).await?
             }
             (InMemoryThrottled(in_mem_throttled), path::Path::InMemory(location)) => {
-                in_mem_throttled.put(location, bytes, length).await?
+                in_mem_throttled.put(location, bytes).await?
             }
             (File(file), path::Path::File(location)) => file
-                .put(location, bytes, length)
+                .put(location, bytes)
                 .await
                 .context(FileObjectStoreError)?,
             (MicrosoftAzure(azure), path::Path::MicrosoftAzure(location)) => {
-                azure.put(location, bytes, length).await?
+                azure.put(location, bytes).await?
             }
             _ => unreachable!(),
         }
@ -677,19 +666,13 @@ mod tests {
content_list content_list
); );
let data = Bytes::from("arbitrary data");
let mut location = storage.new_path(); let mut location = storage.new_path();
location.push_dir("test_dir"); location.push_dir("test_dir");
location.set_file_name("test_file.json"); location.set_file_name("test_file.json");
let stream_data = std::io::Result::Ok(data.clone()); let data = Bytes::from("arbitrary data");
storage let expected_data = data.clone();
.put( storage.put(&location, data).await?;
&location,
futures::stream::once(async move { stream_data }),
Some(data.len()),
)
.await?;
// List everything // List everything
let content_list = flatten_list_stream(storage, None).await?; let content_list = flatten_list_stream(storage, None).await?;
@ -713,7 +696,7 @@ mod tests {
.map_ok(|b| bytes::BytesMut::from(&b[..])) .map_ok(|b| bytes::BytesMut::from(&b[..]))
.try_concat() .try_concat()
.await?; .await?;
assert_eq!(&*read_data, data); assert_eq!(&*read_data, expected_data);
storage.delete(&location).await?; storage.delete(&location).await?;
@ -748,15 +731,8 @@ mod tests {
.collect(); .collect();
for f in &files { for f in &files {
let stream_data = std::io::Result::Ok(data.clone()); let data = data.clone();
storage storage.put(f, data).await.unwrap();
.put(
f,
futures::stream::once(async move { stream_data }),
Some(data.len()),
)
.await
.unwrap();
} }
// ==================== check: prefix-list `mydb/wb` (directory) ==================== // ==================== check: prefix-list `mydb/wb` (directory) ====================

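At the trait level, `ObjectStoreApi::put` now takes `Bytes` directly for every backend. A minimal round-trip sketch mirroring the tests above (illustrative only, not part of the diff; it assumes the crate's `ObjectStore` and `Error` types are in scope):

    use bytes::Bytes;
    use futures::TryStreamExt;

    async fn roundtrip() -> Result<(), Error> {
        let storage = ObjectStore::new_in_memory();

        let mut location = storage.new_path();
        location.set_file_name("example.json");

        // `put` takes the buffer directly, for any backend.
        let data = Bytes::from("arbitrary data");
        storage.put(&location, data.clone()).await?;

        // `get` still returns a stream of chunks that can be concatenated.
        let read_data = storage
            .get(&location)
            .await?
            .map_ok(|b| bytes::BytesMut::from(&b[..]))
            .try_concat()
            .await?;
        assert_eq!(&*read_data, data);
        Ok(())
    }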

@ -4,10 +4,10 @@ use crate::{path::parsed::DirsAndFileName, ListResult, ObjectMeta, ObjectStoreAp
use async_trait::async_trait; use async_trait::async_trait;
use bytes::Bytes; use bytes::Bytes;
use chrono::Utc; use chrono::Utc;
use futures::{stream::BoxStream, Stream, StreamExt, TryStreamExt}; use futures::{stream::BoxStream, StreamExt};
use snafu::{ensure, OptionExt, ResultExt, Snafu}; use snafu::{OptionExt, Snafu};
use std::collections::BTreeMap;
use std::collections::BTreeSet; use std::collections::BTreeSet;
use std::{collections::BTreeMap, io};
use tokio::sync::RwLock; use tokio::sync::RwLock;
/// A specialized `Result` for in-memory object store-related errors /// A specialized `Result` for in-memory object store-related errors
@ -17,12 +17,6 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
#[derive(Debug, Snafu)] #[derive(Debug, Snafu)]
#[allow(missing_docs)] #[allow(missing_docs)]
pub enum Error { pub enum Error {
#[snafu(display("Expected streamed data to have length {}, got {}", expected, actual))]
DataDoesNotMatchLength { expected: usize, actual: usize },
#[snafu(display("Unable to stream data from the request into memory: {}", source))]
UnableToStreamDataIntoMemory { source: std::io::Error },
#[snafu(display("No data in memory found. Location: {}", location))] #[snafu(display("No data in memory found. Location: {}", location))]
NoDataInMemory { location: String }, NoDataInMemory { location: String },
} }
@ -43,32 +37,11 @@ impl ObjectStoreApi for InMemory {
DirsAndFileName::default() DirsAndFileName::default()
} }
async fn put<S>(&self, location: &Self::Path, bytes: S, length: Option<usize>) -> Result<()> async fn put(&self, location: &Self::Path, bytes: Bytes) -> Result<()> {
where
S: Stream<Item = io::Result<Bytes>> + Send + Sync + 'static,
{
let content = bytes
.map_ok(|b| bytes::BytesMut::from(&b[..]))
.try_concat()
.await
.context(UnableToStreamDataIntoMemory)?;
if let Some(length) = length {
ensure!(
content.len() == length,
DataDoesNotMatchLength {
actual: content.len(),
expected: length,
}
);
}
let content = content.freeze();
self.storage self.storage
.write() .write()
.await .await
.insert(location.to_owned(), content); .insert(location.to_owned(), bytes);
Ok(()) Ok(())
} }
@ -177,9 +150,9 @@ mod tests {
use crate::{ use crate::{
tests::{list_with_delimiter, put_get_delete_list}, tests::{list_with_delimiter, put_get_delete_list},
Error as ObjectStoreError, ObjectStore, ObjectStoreApi, ObjectStorePath, ObjectStore, ObjectStoreApi, ObjectStorePath,
}; };
use futures::stream; use futures::TryStreamExt;
#[tokio::test] #[tokio::test]
async fn in_memory_test() { async fn in_memory_test() {
@ -189,43 +162,17 @@ mod tests {
list_with_delimiter(&integration).await.unwrap(); list_with_delimiter(&integration).await.unwrap();
} }
#[tokio::test]
async fn length_mismatch_is_an_error() {
let integration = ObjectStore::new_in_memory();
let bytes = stream::once(async { Ok(Bytes::from("hello world")) });
let mut location = integration.new_path();
location.set_file_name("junk");
let res = integration.put(&location, bytes, Some(0)).await;
assert!(matches!(
res.err().unwrap(),
ObjectStoreError::InMemoryObjectStoreError {
source: Error::DataDoesNotMatchLength {
expected: 0,
actual: 11,
}
}
));
}
#[tokio::test] #[tokio::test]
async fn unknown_length() { async fn unknown_length() {
let integration = ObjectStore::new_in_memory(); let integration = ObjectStore::new_in_memory();
let data = Bytes::from("arbitrary data");
let stream_data = std::io::Result::Ok(data.clone());
let mut location = integration.new_path(); let mut location = integration.new_path();
location.set_file_name("some_file"); location.set_file_name("some_file");
integration
.put( let data = Bytes::from("arbitrary data");
&location, let expected_data = data.clone();
futures::stream::once(async move { stream_data }),
None, integration.put(&location, data).await.unwrap();
)
.await
.unwrap();
let read_data = integration let read_data = integration
.get(&location) .get(&location)
@ -235,6 +182,6 @@ mod tests {
.try_concat() .try_concat()
.await .await
.unwrap(); .unwrap();
assert_eq!(&*read_data, data); assert_eq!(&*read_data, expected_data);
} }
} }


@ -1,78 +1,75 @@
//! This module contains the IOx implementation for wrapping existing object store types into an artificial "sleep" wrapper. //! This module contains the IOx implementation for wrapping existing object store types into an artificial "sleep" wrapper.
use std::{convert::TryInto, io, sync::Arc}; use std::convert::TryInto;
use crate::{ListResult, ObjectStoreApi, Result}; use crate::{ListResult, ObjectStoreApi, Result};
use async_trait::async_trait; use async_trait::async_trait;
use bytes::Bytes; use bytes::Bytes;
use futures::{stream::BoxStream, Stream, StreamExt}; use futures::{stream::BoxStream, StreamExt};
use tokio::{ use tokio::time::{sleep, Duration};
sync::Mutex,
time::{sleep, Duration},
};
/// Configuration settings for throttled store /// Configuration settings for throttled store
#[derive(Debug, Default, Clone, Copy)] #[derive(Debug, Default, Clone, Copy)]
pub struct ThrottleConfig { pub struct ThrottleConfig {
/// Sleep duration for every call to [`delete`](ThrottledStore::delete). /// Sleep duration for every call to [`delete`](ThrottledStore::delete).
/// ///
/// Sleeping is done before the underlying store is called and independently of the success of the operation. /// Sleeping is done before the underlying store is called and independently of the success of
/// the operation.
pub wait_delete_per_call: Duration, pub wait_delete_per_call: Duration,
/// Sleep duration for every byte received during [`get`](ThrottledStore::get). /// Sleep duration for every byte received during [`get`](ThrottledStore::get).
/// ///
/// Sleeping is performed after the underlying store returned and only for successful gets. The sleep duration is /// Sleeping is performed after the underlying store returned and only for successful gets. The
/// additive to [`wait_get_per_call`](Self::wait_get_per_call). /// sleep duration is additive to [`wait_get_per_call`](Self::wait_get_per_call).
/// ///
/// Note that the per-byte sleep only happens as the user consumes the output bytes. Should there be an /// Note that the per-byte sleep only happens as the user consumes the output bytes. Should
/// intermediate failure (i.e. after partly consuming the output bytes), the resulting sleep time will be partial as well. /// there be an intermediate failure (i.e. after partly consuming the output bytes), the
/// resulting sleep time will be partial as well.
pub wait_get_per_byte: Duration, pub wait_get_per_byte: Duration,
/// Sleep duration for every call to [`get`](ThrottledStore::get). /// Sleep duration for every call to [`get`](ThrottledStore::get).
/// ///
/// Sleeping is done before the underlying store is called and independently of the success of the operation. The /// Sleeping is done before the underlying store is called and independently of the success of
/// sleep duration is additive to [`wait_get_per_byte`](Self::wait_get_per_byte). /// the operation. The sleep duration is additive to
/// [`wait_get_per_byte`](Self::wait_get_per_byte).
pub wait_get_per_call: Duration, pub wait_get_per_call: Duration,
/// Sleep duration for every call to [`list`](ThrottledStore::list). /// Sleep duration for every call to [`list`](ThrottledStore::list).
/// ///
/// Sleeping is done before the underlying store is called and independently of the success of the operation. The /// Sleeping is done before the underlying store is called and independently of the success of
/// sleep duration is additive to [`wait_list_per_entry`](Self::wait_list_per_entry). /// the operation. The sleep duration is additive to
/// [`wait_list_per_entry`](Self::wait_list_per_entry).
pub wait_list_per_call: Duration, pub wait_list_per_call: Duration,
/// Sleep duration for every entry received during [`list`](ThrottledStore::list). /// Sleep duration for every entry received during [`list`](ThrottledStore::list).
/// ///
/// Sleeping is performed after the underlying store returned and only for successful lists. The sleep duration is /// Sleeping is performed after the underlying store returned and only for successful lists.
/// additive to [`wait_list_per_call`](Self::wait_list_per_call). /// The sleep duration is additive to [`wait_list_per_call`](Self::wait_list_per_call).
/// ///
/// Note that the per-entry sleep only happens as the user consumes the output entries. Should there be an /// Note that the per-entry sleep only happens as the user consumes the output entries. Should
/// intermediate failure (i.e. after partly consuming the output entries), the resulting sleep time will be partial as well. /// there be an intermediate failure (i.e. after partly consuming the output entries), the
/// resulting sleep time will be partial as well.
pub wait_list_per_entry: Duration, pub wait_list_per_entry: Duration,
/// Sleep duration for every call to [`list_with_delimiter`](ThrottledStore::list_with_delimiter). /// Sleep duration for every call to
/// [`list_with_delimiter`](ThrottledStore::list_with_delimiter).
/// ///
/// Sleeping is done before the underlying store is called and independently of the success of the operation. The /// Sleeping is done before the underlying store is called and independently of the success of
/// sleep duration is additive to [`wait_list_with_delimiter_per_entry`](Self::wait_list_with_delimiter_per_entry). /// the operation. The sleep duration is additive to
/// [`wait_list_with_delimiter_per_entry`](Self::wait_list_with_delimiter_per_entry).
pub wait_list_with_delimiter_per_call: Duration, pub wait_list_with_delimiter_per_call: Duration,
/// Sleep duration for every entry received during [`list_with_delimiter`](ThrottledStore::list_with_delimiter). /// Sleep duration for every entry received during
/// [`list_with_delimiter`](ThrottledStore::list_with_delimiter).
/// ///
/// Sleeping is performed after the underlying store returned and only for successful gets. The sleep duration is /// Sleeping is performed after the underlying store returned and only for successful gets. The
/// additive to [`wait_list_with_delimiter_per_call`](Self::wait_list_with_delimiter_per_call). /// sleep duration is additive to
/// [`wait_list_with_delimiter_per_call`](Self::wait_list_with_delimiter_per_call).
pub wait_list_with_delimiter_per_entry: Duration, pub wait_list_with_delimiter_per_entry: Duration,
/// Sleep duration for every byte send during [`put`](ThrottledStore::put).
///
/// Sleeping is done before the underlying store is called and independently of the complete success of the operation. The
/// sleep duration is additive to [`wait_put_per_call`](Self::wait_put_per_call).
///
/// Note that the per-byte sleep only happens as the underlying store consumes the bytes. Should there be an
/// intermediate failure (i.e. after partly consuming the input bytes), the resulting sleep time will be partial as well.
pub wait_put_per_byte: Duration,
/// Sleep duration for every call to [`put`](ThrottledStore::put). /// Sleep duration for every call to [`put`](ThrottledStore::put).
/// ///
/// Sleeping is done before the underlying store is called and independently of the success of the operation. The /// Sleeping is done before the underlying store is called and independently of the success of
/// sleep duration is additive to [`wait_put_per_byte`](Self::wait_put_per_byte). /// the operation.
pub wait_put_per_call: Duration, pub wait_put_per_call: Duration,
} }
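With `wait_put_per_byte` removed, throttled `put` calls are now delayed only by the flat `wait_put_per_call` duration. A small configuration sketch (illustrative only, not part of the diff; the field values are made up):

    use std::time::Duration;

    fn example_config() -> ThrottleConfig {
        ThrottleConfig {
            // Flat delay applied before every `put` call.
            wait_put_per_call: Duration::from_millis(10),
            // `get` is delayed per call and additionally per byte returned.
            wait_get_per_call: Duration::from_millis(10),
            wait_get_per_byte: Duration::from_micros(1),
            ..Default::default()
        }
    }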
@ -80,7 +77,8 @@ pub struct ThrottleConfig {
/// ///
/// This can be used for performance testing. /// This can be used for performance testing.
/// ///
/// **Note that the behavior of the wrapper is deterministic and might not reflect real-world conditions!** /// **Note that the behavior of the wrapper is deterministic and might not reflect real-world
/// conditions!**
#[derive(Debug)] #[derive(Debug)]
pub struct ThrottledStore<T: ObjectStoreApi> { pub struct ThrottledStore<T: ObjectStoreApi> {
inner: T, inner: T,
@ -114,43 +112,10 @@ impl<T: ObjectStoreApi> ObjectStoreApi for ThrottledStore<T> {
self.inner.new_path() self.inner.new_path()
} }
async fn put<S>( async fn put(&self, location: &Self::Path, bytes: Bytes) -> Result<(), Self::Error> {
&self,
location: &Self::Path,
bytes: S,
length: Option<usize>,
) -> Result<(), Self::Error>
where
S: Stream<Item = io::Result<Bytes>> + Send + Sync + 'static,
{
sleep(self.config.wait_put_per_call).await; sleep(self.config.wait_put_per_call).await;
// need to copy to avoid moving / referencing `self` self.inner.put(location, bytes).await
let wait_put_per_byte = self.config.wait_put_per_byte;
let length_remaining = Arc::new(Mutex::new(length));
let bytes = bytes.then(move |bytes_result| {
let length_remaining = Arc::clone(&length_remaining);
async move {
match bytes_result {
Ok(bytes) => {
let mut bytes_len = bytes.len();
let mut length_remaining_inner2 = length_remaining.lock().await;
if let Some(length) = length_remaining_inner2.as_mut() {
let length_new = length.saturating_sub(bytes_len);
bytes_len = bytes_len.min(*length);
*length = length_new;
};
sleep(wait_put_per_byte * usize_to_u32_saturate(bytes_len)).await;
Ok(bytes)
}
Err(err) => Err(err),
}
}
});
self.inner.put(location, bytes, length).await
} }
async fn get( async fn get(
@@ -363,12 +328,7 @@ mod tests {
         assert_bounds!(measure_put(&store, 10).await, 1);

         store.config_mut().wait_put_per_call = ZERO;
-        store.config_mut().wait_put_per_byte = WAIT_TIME;
-        assert_bounds!(measure_put(&store, 2).await, 2);
-
-        store.config_mut().wait_put_per_call = WAIT_TIME;
-        store.config_mut().wait_put_per_byte = WAIT_TIME;
-        assert_bounds!(measure_put(&store, 2).await, 3);
+        assert_bounds!(measure_put(&store, 0).await, 0);
     }

     async fn place_test_object(
@@ -379,10 +339,9 @@ mod tests {
         path.set_file_name("foo");

         if let Some(n_bytes) = n_bytes {
-            let data = std::iter::repeat(1u8).take(n_bytes).collect();
-            let stream_data = std::io::Result::Ok(data);
-            let stream = futures::stream::once(async move { stream_data });
-            store.put(&path, stream, None).await.unwrap();
+            let data: Vec<_> = std::iter::repeat(1u8).take(n_bytes).collect();
+            let bytes = Bytes::from(data);
+            store.put(&path, bytes).await.unwrap();
         } else {
             // ensure object is absent
             store.delete(&path).await.unwrap();
@@ -416,9 +375,7 @@ mod tests {
             path.set_file_name(&i.to_string());

             let data = Bytes::from("bar");
-            let stream_data = std::io::Result::Ok(data);
-            let stream = futures::stream::once(async move { stream_data });
-            store.put(&path, stream, None).await.unwrap();
+            store.put(&path, data).await.unwrap();
         }

         prefix
@@ -483,12 +440,11 @@ mod tests {
         let mut path = store.new_path();
         path.set_file_name("foo");

-        let data = std::iter::repeat(1u8).take(n_bytes).collect();
-        let stream_data = std::io::Result::Ok(data);
-        let stream = futures::stream::once(async move { stream_data });
+        let data: Vec<_> = std::iter::repeat(1u8).take(n_bytes).collect();
+        let bytes = Bytes::from(data);

         let t0 = Instant::now();
-        store.put(&path, stream, None).await.unwrap();
+        store.put(&path, bytes).await.unwrap();

         t0.elapsed()
     }

View File

@@ -1325,13 +1325,9 @@ mod tests {
         let tkey = trace.tkeys[0];
         let path = TransactionFilePath::new_transaction(tkey.revision_counter, tkey.uuid);
         let data = Bytes::from("foo");
-        let len = data.len();

         iox_object_store
-            .put_catalog_transaction_file(
-                &path,
-                futures::stream::once(async move { Ok(data) }),
-                Some(len),
-            )
+            .put_catalog_transaction_file(&path, data)
             .await
             .unwrap();
@@ -2186,13 +2182,9 @@ mod tests {
         let tkey = trace.tkeys[0];
         let path = TransactionFilePath::new_transaction(tkey.revision_counter, tkey.uuid);
         let data = Bytes::from("foo");
-        let len = data.len();

         iox_object_store
-            .put_catalog_transaction_file(
-                &path,
-                futures::stream::once(async move { Ok(data) }),
-                Some(len),
-            )
+            .put_catalog_transaction_file(&path, data)
             .await
             .unwrap();

View File

@@ -36,14 +36,9 @@ pub async fn store_transaction_proto(
     let mut data = Vec::new();
     proto.encode(&mut data).context(Serialization {})?;
     let data = Bytes::from(data);
-    let len = data.len();

     iox_object_store
-        .put_catalog_transaction_file(
-            path,
-            futures::stream::once(async move { Ok(data) }),
-            Some(len),
-        )
+        .put_catalog_transaction_file(path, data)
         .await
         .context(Write {})?;
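The catalog path follows the same pattern: the transaction protobuf is encoded into a `Vec<u8>`, wrapped in `Bytes`, and handed to `put_catalog_transaction_file`; the separate `len` disappears because `Bytes` carries its own length. A small standalone sketch of the encode step using the `prost` and `bytes` crates (the helper name is made up):

    use bytes::Bytes;
    use prost::Message;

    // Encode any prost-generated message into an owned Bytes buffer.
    fn encode_to_bytes<M: Message>(proto: &M) -> Result<Bytes, prost::EncodeError> {
        let mut buf = Vec::new();
        proto.encode(&mut buf)?; // prost appends the wire format to the Vec
        Ok(Bytes::from(buf))     // no explicit length needed; Bytes knows its len()
    }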

View File

@@ -210,16 +210,10 @@ impl Storage {
     /// Put the given vector of bytes to the specified location
     pub async fn to_object_store(&self, data: Vec<u8>, path: &ParquetFilePath) -> Result<()> {
-        let len = data.len();
         let data = Bytes::from(data);
-        let stream_data = Result::Ok(data);

         self.iox_object_store
-            .put_parquet_file(
-                path,
-                futures::stream::once(async move { stream_data }),
-                Some(len),
-            )
+            .put_parquet_file(path, data)
             .await
             .context(WritingToObjectStore)
     }
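Worth noting for this path: converting the serialized parquet buffer with `Bytes::from(Vec<u8>)` moves the allocation rather than copying it (when the Vec's capacity equals its length), so dropping the stream wrapper does not introduce an extra copy. A tiny standalone illustration, not part of the PR:

    use bytes::Bytes;

    fn main() {
        // vec![0u8; 4096] allocates exactly 4096 bytes (capacity == length),
        // so the conversion below reuses that allocation instead of copying.
        let buffer: Vec<u8> = vec![0u8; 4096];
        let bytes = Bytes::from(buffer);
        assert_eq!(bytes.len(), 4096);
    }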

View File

@@ -19,7 +19,7 @@ use datafusion::{
     },
 };
 use internal_types::schema::{merge::SchemaMerger, sort::SortKey, Schema};
-use observability_deps::tracing::{debug, info, trace};
+use observability_deps::tracing::{debug, trace};
 use predicate::predicate::{Predicate, PredicateBuilder};

 use crate::{
@@ -823,14 +823,14 @@ impl<C: QueryChunk + 'static> Deduplicater<C> {
             } else {
                 // The chunk is sorted but not on different order with super sort key.
                 // Log it for investigating data set to improve performance further
-                info!(chunk_type=?chunk.chunk_type(),
+                debug!(chunk_type=?chunk.chunk_type(),
                     chunk_ID=?chunk.id(),
                     chunk_current_sort_order=?chunk_sort_key,
                     chunk_super_sort_key=?output_sort_key,
                     "Chunk will get resorted in build_sort_plan due to new cardinality rate between key columns");
             }
         } else {
-            info!(chunk_type=?chunk.chunk_type(),
+            debug!(chunk_type=?chunk.chunk_type(),
                 chunk_ID=?chunk.id(),
                 "Chunk is not yet sorted and will get sorted in build_sort_plan");
         }
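Demoting these per-chunk messages (here and in the next hunk) from `info!` to `debug!` means they vanish under the usual info-level filter and only reappear when debug logging is enabled. A minimal, assumed setup using the `tracing` and `tracing-subscriber` crates directly, not how IOx actually wires its logging:

    use tracing::debug;
    use tracing_subscriber::EnvFilter;

    fn main() {
        // "debug" enables debug, info, warn and error events; trace stays filtered out.
        tracing_subscriber::fmt()
            .with_env_filter(EnvFilter::new("debug"))
            .init();

        // Structured fields mirror the style used in the diff above.
        debug!(chunk_id = 42, "Chunk is not yet sorted and will get sorted in build_sort_plan");
    }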
@@ -844,7 +844,7 @@ impl<C: QueryChunk + 'static> Deduplicater<C> {
         // Now get the key subset of the super key that includes the chunk's pk columns
         let chunk_sort_key = output_sort_key.selected_sort_key(key_columns.clone());

-        info!(chunk_type=?chunk.chunk_type(),
+        debug!(chunk_type=?chunk.chunk_type(),
             chunk_ID=?chunk.id(),
             pk_columns=?key_columns,
             sort_key=?chunk_sort_key,

View File

@@ -4087,15 +4087,8 @@ mod tests {
     }

     async fn create_empty_file(iox_object_store: &IoxObjectStore, path: &ParquetFilePath) {
-        let data = Bytes::default();
-        let len = data.len();
-
         iox_object_store
-            .put_parquet_file(
-                path,
-                futures::stream::once(async move { Ok(data) }),
-                Some(len),
-            )
+            .put_parquet_file(path, Bytes::new())
             .await
             .unwrap();
     }

View File

@@ -1947,11 +1947,7 @@ mod tests {
         fake_db_path.set_file_name("not-a-generation");

         application
             .object_store()
-            .put(
-                &fake_db_path,
-                futures::stream::once(async move { Ok(Bytes::new()) }),
-                None,
-            )
+            .put(&fake_db_path, Bytes::new())
             .await
             .unwrap();

View File

@@ -160,7 +160,6 @@ fn create_throttled_store() -> Arc<ObjectStore> {
         // for upload/download: assume 1GByte/s
         wait_get_per_byte: Duration::from_secs(1) / 1_000_000_000,
-        wait_put_per_byte: Duration::from_secs(1) / 1_000_000_000,
     };

     Arc::new(ObjectStore::new_in_memory_throttled(config))
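The comment's arithmetic: dividing one second by 10^9 gives a 1 ns sleep per byte, which corresponds to roughly 1 GByte/s of simulated throughput for reads. A standalone check of that math (illustrative, not from the PR):

    use std::time::Duration;

    fn main() {
        // 1 s spread over 10^9 bytes = 1 ns per byte, i.e. ~1 GByte/s.
        let wait_get_per_byte = Duration::from_secs(1) / 1_000_000_000;
        assert_eq!(wait_get_per_byte, Duration::from_nanos(1));

        // A hypothetical 16 MiB read would be slowed by about 16.8 ms.
        let delay = wait_get_per_byte * (16 * 1024 * 1024);
        assert_eq!(delay, Duration::from_nanos(16 * 1024 * 1024));
    }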

View File

@@ -218,7 +218,6 @@ impl TryFrom<&ObjectStoreConfig> for ObjectStore {
                 // for upload/download: assume 1GByte/s
                 wait_get_per_byte: Duration::from_secs(1) / 1_000_000_000,
-                wait_put_per_byte: Duration::from_secs(1) / 1_000_000_000,
             };

             Ok(Self::new_in_memory_throttled(config))