feat: add throttled store wrapper

pull/24376/head
Marco Neumann 2021-04-26 12:02:03 +02:00
parent 289332132c
commit 53e83f8385
2 changed files with 532 additions and 0 deletions

View File

@ -23,6 +23,7 @@ pub mod disk;
pub mod gcp;
pub mod memory;
pub mod path;
pub mod throttle;
use aws::AmazonS3;
use azure::MicrosoftAzure;
@ -30,6 +31,7 @@ use disk::File;
use gcp::GoogleCloudStorage;
use memory::InMemory;
use path::ObjectStorePath;
use throttle::ThrottledStore;
use async_trait::async_trait;
use bytes::Bytes;
@ -104,6 +106,11 @@ impl ObjectStore {
Self(ObjectStoreIntegration::InMemory(in_mem))
}
/// Configure throttled in-memory storage.
pub fn new_in_memory_throttled(in_mem_throttled: ThrottledStore<InMemory>) -> Self {
Self(ObjectStoreIntegration::InMemoryThrottled(in_mem_throttled))
}
/// Configure local file storage.
pub fn new_file(file: File) -> Self {
Self(ObjectStoreIntegration::File(file))
@ -126,6 +133,9 @@ impl ObjectStoreApi for ObjectStore {
AmazonS3(s3) => path::Path::AmazonS3(s3.new_path()),
GoogleCloudStorage(gcs) => path::Path::GoogleCloudStorage(gcs.new_path()),
InMemory(in_mem) => path::Path::InMemory(in_mem.new_path()),
InMemoryThrottled(in_mem_throttled) => {
path::Path::InMemory(in_mem_throttled.new_path())
}
File(file) => path::Path::File(file.new_path()),
MicrosoftAzure(azure) => path::Path::MicrosoftAzure(azure.new_path()),
}
@ -147,6 +157,9 @@ impl ObjectStoreApi for ObjectStore {
(InMemory(in_mem), path::Path::InMemory(location)) => {
in_mem.put(location, bytes, length).await?
}
(InMemoryThrottled(in_mem_throttled), path::Path::InMemory(location)) => {
in_mem_throttled.put(location, bytes, length).await?
}
(File(file), path::Path::File(location)) => file
.put(location, bytes, length)
.await
@ -172,6 +185,9 @@ impl ObjectStoreApi for ObjectStore {
(InMemory(in_mem), path::Path::InMemory(location)) => {
in_mem.get(location).await?.err_into().boxed()
}
(InMemoryThrottled(in_mem_throttled), path::Path::InMemory(location)) => {
in_mem_throttled.get(location).await?.err_into().boxed()
}
(File(file), path::Path::File(location)) => file
.get(location)
.await
@ -193,6 +209,9 @@ impl ObjectStoreApi for ObjectStore {
gcs.delete(location).await?
}
(InMemory(in_mem), path::Path::InMemory(location)) => in_mem.delete(location).await?,
(InMemoryThrottled(in_mem_throttled), path::Path::InMemory(location)) => {
in_mem_throttled.delete(location).await?
}
(File(file), path::Path::File(location)) => file.delete(location).await?,
(MicrosoftAzure(azure), path::Path::MicrosoftAzure(location)) => {
azure.delete(location).await?
@ -248,6 +267,21 @@ impl ObjectStoreApi for ObjectStore {
.err_into()
.boxed(),
(InMemoryThrottled(in_mem_throttled), Some(path::Path::InMemory(prefix))) => {
in_mem_throttled
.list(Some(prefix))
.await?
.map_ok(|s| s.into_iter().map(path::Path::InMemory).collect())
.err_into()
.boxed()
}
(InMemoryThrottled(in_mem_throttled), None) => in_mem_throttled
.list(None)
.await?
.map_ok(|s| s.into_iter().map(path::Path::InMemory).collect())
.err_into()
.boxed(),
(File(file), Some(path::Path::File(prefix))) => file
.list(Some(prefix))
.await?
@ -295,6 +329,11 @@ impl ObjectStoreApi for ObjectStore {
.map_ok(|list_result| list_result.map_paths(path::Path::InMemory))
.await
.context(InMemoryObjectStoreError),
(InMemoryThrottled(in_mem_throttled), path::Path::InMemory(prefix)) => in_mem_throttled
.list_with_delimiter(prefix)
.map_ok(|list_result| list_result.map_paths(path::Path::InMemory))
.await
.context(InMemoryObjectStoreError),
(File(file), path::Path::File(prefix)) => file
.list_with_delimiter(prefix)
.map_ok(|list_result| list_result.map_paths(path::Path::File))
@ -319,6 +358,8 @@ pub enum ObjectStoreIntegration {
AmazonS3(AmazonS3),
/// In memory storage for testing
InMemory(InMemory),
/// Throttled in memory storage for testing
InMemoryThrottled(ThrottledStore<InMemory>),
/// Local file system storage
File(File),
/// Microsoft Azure Blob storage

View File

@ -0,0 +1,491 @@
//! This module contains the IOx implementation for wrapping existing object store types into an artificial "sleep" wrapper.
use std::{convert::TryInto, io, sync::Arc};
use crate::{ListResult, ObjectStoreApi, Result};
use async_trait::async_trait;
use bytes::Bytes;
use futures::{stream::BoxStream, Stream, StreamExt};
use tokio::{
sync::Mutex,
time::{sleep, Duration},
};
/// Store wrapper that wraps an inner store with some `sleep` calls.
///
/// This can be used for performance testing.
///
/// **Note that the behavior of the wrapper is deterministic and might not reflect real-world conditions!**
#[derive(Debug)]
pub struct ThrottledStore<T: ObjectStoreApi> {
inner: T,
/// Sleep duration for every call to [`delete`](Self::delete).
///
/// Sleeping is done before the underlying store is called and independently of the success of the operation.
pub wait_delete_per_call: Duration,
/// Sleep duration for every byte received during [`get`](Self::get).
///
/// Sleeping is performed after the underlying store returned and only for successful gets. The sleep duration is
/// additive to [`wait_get_per_call`](Self::wait_get_per_call).
///
/// Note that the per-byte sleep only happens as the user consumes the output bytes. Should there be an
/// intermediate failure (i.e. after partly consuming the output bytes), the resulting sleep time will be partial as well.
pub wait_get_per_byte: Duration,
/// Sleep duration for every call to [`get`](Self::get).
///
/// Sleeping is done before the underlying store is called and independently of the success of the operation. The
/// sleep duration is additive to [`wait_get_per_byte`](Self::wait_get_per_byte).
pub wait_get_per_call: Duration,
/// Sleep duration for every call to [`list`](Self::list).
///
/// Sleeping is done before the underlying store is called and independently of the success of the operation. The
/// sleep duration is additive to [`wait_list_per_entry`](Self::wait_list_per_entry).
pub wait_list_per_call: Duration,
/// Sleep duration for every entry received during [`list`](Self::list).
///
/// Sleeping is performed after the underlying store returned and only for successful lists. The sleep duration is
/// additive to [`wait_list_per_call`](Self::wait_list_per_call).
///
/// Note that the per-entry sleep only happens as the user consumes the output entries. Should there be an
/// intermediate failure (i.e. after partly consuming the output entries), the resulting sleep time will be partial as well.
pub wait_list_per_entry: Duration,
/// Sleep duration for every call to [`list_with_delimiter`](Self::list_with_delimiter).
///
/// Sleeping is done before the underlying store is called and independently of the success of the operation. The
/// sleep duration is additive to [`wait_list_with_delimiter_per_entry`](Self::wait_list_with_delimiter_per_entry).
pub wait_list_with_delimiter_per_call: Duration,
/// Sleep duration for every entry received during [`list_with_delimiter`](Self::list_with_delimiter).
///
/// Sleeping is performed after the underlying store returned and only for successful gets. The sleep duration is
/// additive to [`wait_list_with_delimiter_per_call`](Self::wait_list_with_delimiter_per_call).
pub wait_list_with_delimiter_per_entry: Duration,
/// Sleep duration for every byte send during [`put`](Self::put).
///
/// Sleeping is done before the underlying store is called and independently of the complete success of the operation. The
/// sleep duration is additive to [`wait_put_per_call`](Self::wait_put_per_call).
///
/// Note that the per-byte sleep only happens as the underlying store consumes the bytes. Should there be an
/// intermediate failure (i.e. after partly consuming the input bytes), the resulting sleep time will be partial as well.
pub wait_put_per_byte: Duration,
/// Sleep duration for every call to [`put`](Self::put).
///
/// Sleeping is done before the underlying store is called and independently of the success of the operation. The
/// sleep duration is additive to [`wait_put_per_byte`](Self::wait_put_per_byte).
pub wait_put_per_call: Duration,
}
impl<T: ObjectStoreApi> ThrottledStore<T> {
/// Create new wrapper with zero waiting times.
pub fn new(inner: T) -> Self {
Self {
inner,
wait_delete_per_call: Duration::default(),
wait_get_per_byte: Duration::default(),
wait_get_per_call: Duration::default(),
wait_list_per_call: Duration::default(),
wait_list_per_entry: Duration::default(),
wait_list_with_delimiter_per_call: Duration::default(),
wait_list_with_delimiter_per_entry: Duration::default(),
wait_put_per_byte: Duration::default(),
wait_put_per_call: Duration::default(),
}
}
}
#[async_trait]
impl<T: ObjectStoreApi> ObjectStoreApi for ThrottledStore<T> {
type Path = T::Path;
type Error = T::Error;
fn new_path(&self) -> Self::Path {
self.inner.new_path()
}
async fn put<S>(
&self,
location: &Self::Path,
bytes: S,
length: Option<usize>,
) -> Result<(), Self::Error>
where
S: Stream<Item = io::Result<Bytes>> + Send + Sync + 'static,
{
sleep(self.wait_put_per_call).await;
// need to copy to avoid moving / referencing `self`
let wait_put_per_byte = self.wait_put_per_byte;
let length_remaining = Arc::new(Mutex::new(length));
let bytes = bytes.then(move |bytes_result| {
let length_remaining = Arc::clone(&length_remaining);
async move {
match bytes_result {
Ok(bytes) => {
let mut bytes_len = bytes.len();
let mut length_remaining_inner2 = length_remaining.lock().await;
if let Some(length) = length_remaining_inner2.as_mut() {
let length_new = length.saturating_sub(bytes_len);
bytes_len = bytes_len.min(*length);
*length = length_new;
};
sleep(wait_put_per_byte * usize_to_u32_saturate(bytes_len)).await;
Ok(bytes)
}
Err(err) => Err(err),
}
}
});
self.inner.put(location, bytes, length).await
}
async fn get(
&self,
location: &Self::Path,
) -> Result<BoxStream<'static, Result<Bytes, Self::Error>>, Self::Error> {
sleep(self.wait_get_per_call).await;
// need to copy to avoid moving / referencing `self`
let wait_get_per_byte = self.wait_get_per_byte;
self.inner.get(location).await.map(|stream| {
stream
.then(move |bytes_result| async move {
match bytes_result {
Ok(bytes) => {
let bytes_len: u32 = usize_to_u32_saturate(bytes.len());
sleep(wait_get_per_byte * bytes_len).await;
Ok(bytes)
}
Err(err) => Err(err),
}
})
.boxed()
})
}
async fn delete(&self, location: &Self::Path) -> Result<(), Self::Error> {
sleep(self.wait_delete_per_call).await;
self.inner.delete(location).await
}
async fn list<'a>(
&'a self,
prefix: Option<&'a Self::Path>,
) -> Result<BoxStream<'a, Result<Vec<Self::Path>, Self::Error>>, Self::Error> {
sleep(self.wait_list_per_call).await;
// need to copy to avoid moving / referencing `self`
let wait_list_per_entry = self.wait_list_per_entry;
self.inner.list(prefix).await.map(|stream| {
stream
.then(move |entries_result| async move {
match entries_result {
Ok(entries) => {
let entries_len = usize_to_u32_saturate(entries.len());
sleep(wait_list_per_entry * entries_len).await;
Ok(entries)
}
Err(err) => Err(err),
}
})
.boxed()
})
}
async fn list_with_delimiter(
&self,
prefix: &Self::Path,
) -> Result<ListResult<Self::Path>, Self::Error> {
sleep(self.wait_list_with_delimiter_per_call).await;
match self.inner.list_with_delimiter(prefix).await {
Ok(list_result) => {
let entries_len = usize_to_u32_saturate(list_result.objects.len());
sleep(self.wait_list_with_delimiter_per_entry * entries_len).await;
Ok(list_result)
}
Err(err) => Err(err),
}
}
}
/// Saturated `usize` to `u32` cast.
fn usize_to_u32_saturate(x: usize) -> u32 {
x.try_into().unwrap_or(u32::MAX)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{
memory::InMemory,
path::ObjectStorePath,
tests::{list_with_delimiter, put_get_delete_list},
ObjectStore,
};
use bytes::Bytes;
use futures::TryStreamExt;
use tokio::time::Duration;
use tokio::time::Instant;
const WAIT_TIME: Duration = Duration::from_millis(100);
const ZERO: Duration = Duration::from_millis(0); // Duration::default isn't constant
macro_rules! assert_bounds {
($d:expr, $lower:expr) => {
assert_bounds!($d, $lower, $lower + 1);
};
($d:expr, $lower:expr, $upper:expr) => {
let d = $d;
let lower = $lower * WAIT_TIME;
let upper = $upper * WAIT_TIME;
assert!(d >= lower, "{:?} must be >= than {:?}", d, lower);
assert!(d < upper, "{:?} must be < than {:?}", d, upper);
};
}
#[tokio::test]
async fn throttle_test() {
let inner = InMemory::new();
let integration = ObjectStore::new_in_memory_throttled(ThrottledStore::new(inner));
put_get_delete_list(&integration).await.unwrap();
list_with_delimiter(&integration).await.unwrap();
}
#[tokio::test]
async fn delete_test() {
let inner = InMemory::new();
let mut store = ThrottledStore::new(inner);
assert_bounds!(measure_delete(&store, None).await, 0);
assert_bounds!(measure_delete(&store, Some(0)).await, 0);
assert_bounds!(measure_delete(&store, Some(10)).await, 0);
store.wait_delete_per_call = WAIT_TIME;
assert_bounds!(measure_delete(&store, None).await, 1);
assert_bounds!(measure_delete(&store, Some(0)).await, 1);
assert_bounds!(measure_delete(&store, Some(10)).await, 1);
}
#[tokio::test]
async fn get_test() {
let inner = InMemory::new();
let mut store = ThrottledStore::new(inner);
assert_bounds!(measure_get(&store, None).await, 0);
assert_bounds!(measure_get(&store, Some(0)).await, 0);
assert_bounds!(measure_get(&store, Some(10)).await, 0);
store.wait_get_per_call = WAIT_TIME;
assert_bounds!(measure_get(&store, None).await, 1);
assert_bounds!(measure_get(&store, Some(0)).await, 1);
assert_bounds!(measure_get(&store, Some(10)).await, 1);
store.wait_get_per_call = ZERO;
store.wait_get_per_byte = WAIT_TIME;
assert_bounds!(measure_get(&store, Some(2)).await, 2);
store.wait_get_per_call = WAIT_TIME;
store.wait_get_per_byte = WAIT_TIME;
assert_bounds!(measure_get(&store, Some(2)).await, 3);
}
#[tokio::test]
async fn list_test() {
let inner = InMemory::new();
let mut store = ThrottledStore::new(inner);
assert_bounds!(measure_list(&store, 0).await, 0);
assert_bounds!(measure_list(&store, 10).await, 0);
store.wait_list_per_call = WAIT_TIME;
assert_bounds!(measure_list(&store, 0).await, 1);
assert_bounds!(measure_list(&store, 10).await, 1);
store.wait_list_per_call = ZERO;
store.wait_list_per_entry = WAIT_TIME;
assert_bounds!(measure_list(&store, 2).await, 2);
store.wait_list_per_call = WAIT_TIME;
store.wait_list_per_entry = WAIT_TIME;
assert_bounds!(measure_list(&store, 2).await, 3);
}
#[tokio::test]
async fn list_with_delimiter_test() {
let inner = InMemory::new();
let mut store = ThrottledStore::new(inner);
assert_bounds!(measure_list_with_delimiter(&store, 0).await, 0);
assert_bounds!(measure_list_with_delimiter(&store, 10).await, 0);
store.wait_list_with_delimiter_per_call = WAIT_TIME;
assert_bounds!(measure_list_with_delimiter(&store, 0).await, 1);
assert_bounds!(measure_list_with_delimiter(&store, 10).await, 1);
store.wait_list_with_delimiter_per_call = ZERO;
store.wait_list_with_delimiter_per_entry = WAIT_TIME;
assert_bounds!(measure_list_with_delimiter(&store, 2).await, 2);
store.wait_list_with_delimiter_per_call = WAIT_TIME;
store.wait_list_with_delimiter_per_entry = WAIT_TIME;
assert_bounds!(measure_list_with_delimiter(&store, 2).await, 3);
}
#[tokio::test]
async fn put_test() {
let inner = InMemory::new();
let mut store = ThrottledStore::new(inner);
assert_bounds!(measure_put(&store, 0).await, 0);
assert_bounds!(measure_put(&store, 10).await, 0);
store.wait_put_per_call = WAIT_TIME;
assert_bounds!(measure_put(&store, 0).await, 1);
assert_bounds!(measure_put(&store, 10).await, 1);
store.wait_put_per_call = ZERO;
store.wait_put_per_byte = WAIT_TIME;
assert_bounds!(measure_put(&store, 2).await, 2);
store.wait_put_per_call = WAIT_TIME;
store.wait_put_per_byte = WAIT_TIME;
assert_bounds!(measure_put(&store, 2).await, 3);
}
async fn place_test_object(
store: &ThrottledStore<InMemory>,
n_bytes: Option<usize>,
) -> <ThrottledStore<InMemory> as ObjectStoreApi>::Path {
let mut path = store.new_path();
path.set_file_name("foo");
if let Some(n_bytes) = n_bytes {
let data = std::iter::repeat(1u8).take(n_bytes).collect();
let stream_data = std::io::Result::Ok(data);
let stream = futures::stream::once(async move { stream_data });
store.put(&path, stream, None).await.unwrap();
} else {
// ensure object is absent
store.delete(&path).await.unwrap();
}
path
}
async fn place_test_objects(
store: &ThrottledStore<InMemory>,
n_entries: usize,
) -> <ThrottledStore<InMemory> as ObjectStoreApi>::Path {
let mut prefix = store.new_path();
prefix.push_dir("foo");
// clean up store
for entry in store
.list(Some(&prefix))
.await
.unwrap()
.try_concat()
.await
.unwrap()
{
store.delete(&entry).await.unwrap();
}
// create new entries
for i in 0..n_entries {
let mut path = prefix.clone();
path.set_file_name(&i.to_string());
let data = Bytes::from("bar");
let stream_data = std::io::Result::Ok(data);
let stream = futures::stream::once(async move { stream_data });
store.put(&path, stream, None).await.unwrap();
}
prefix
}
async fn measure_delete(store: &ThrottledStore<InMemory>, n_bytes: Option<usize>) -> Duration {
let path = place_test_object(store, n_bytes).await;
let t0 = Instant::now();
store.delete(&path).await.unwrap();
t0.elapsed()
}
async fn measure_get(store: &ThrottledStore<InMemory>, n_bytes: Option<usize>) -> Duration {
let path = place_test_object(store, n_bytes).await;
let t0 = Instant::now();
let res = store.get(&path).await;
if n_bytes.is_some() {
// need to consume bytes to provoke sleep times
res.unwrap()
.map_ok(|b| bytes::BytesMut::from(&b[..]))
.try_concat()
.await
.unwrap();
} else {
assert!(res.is_err());
}
t0.elapsed()
}
async fn measure_list(store: &ThrottledStore<InMemory>, n_entries: usize) -> Duration {
let prefix = place_test_objects(&store, n_entries).await;
let t0 = Instant::now();
store
.list(Some(&prefix))
.await
.unwrap()
.try_concat()
.await
.unwrap();
t0.elapsed()
}
async fn measure_list_with_delimiter(
store: &ThrottledStore<InMemory>,
n_entries: usize,
) -> Duration {
let prefix = place_test_objects(&store, n_entries).await;
let t0 = Instant::now();
store.list_with_delimiter(&prefix).await.unwrap();
t0.elapsed()
}
async fn measure_put(store: &ThrottledStore<InMemory>, n_bytes: usize) -> Duration {
let mut path = store.new_path();
path.set_file_name("foo");
let data = std::iter::repeat(1u8).take(n_bytes).collect();
let stream_data = std::io::Result::Ok(data);
let stream = futures::stream::once(async move { stream_data });
let t0 = Instant::now();
store.put(&path, stream, None).await.unwrap();
t0.elapsed()
}
}