fix: Remove streaming API since we're not streaming anyway

pull/24376/head
Carol (Nichols || Goulding) 2021-09-27 12:49:20 -04:00
parent edd6c12e93
commit 92583aee82
16 changed files with 132 additions and 537 deletions
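At every call site this change collapses a stream-factory-plus-length pair into a single `Bytes` argument. A minimal before/after sketch (the store setup and file name are placeholders; imports follow the crate layout visible in the hunks below):

use bytes::Bytes;
use object_store::{ObjectStore, ObjectStoreApi, ObjectStorePath, Result};

async fn save_example(store: &ObjectStore) -> Result<()> {
    let mut location = store.new_path();
    location.set_file_name("example.txt");

    // Before this commit: wrap the payload in a closure returning a
    // one-element stream, and pass the length alongside it.
    //
    //     let data = Bytes::from("arbitrary data");
    //     let stream_fn = move || {
    //         let stream_data = std::io::Result::Ok(data.clone());
    //         futures::stream::once(async move { stream_data })
    //     };
    //     store.put(&location, stream_fn, Some(14)).await?;

    // After: hand the payload over directly; the length is implied.
    store.put(&location, Bytes::from("arbitrary data")).await?;
    Ok(())
}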

View File

@ -22,17 +22,14 @@ use data_types::{
server_id::ServerId,
DatabaseName,
};
use futures::{
stream::{self, BoxStream},
Stream, StreamExt, TryStreamExt,
};
use futures::{stream::BoxStream, StreamExt, TryStreamExt};
use object_store::{
path::{parsed::DirsAndFileName, ObjectStorePath, Path},
ObjectStore, ObjectStoreApi, Result,
};
use observability_deps::tracing::warn;
use snafu::{ensure, OptionExt, ResultExt, Snafu};
use std::{collections::BTreeMap, io, sync::Arc};
use std::{collections::BTreeMap, sync::Arc};
use tokio::sync::mpsc::channel;
use tokio_stream::wrappers::ReceiverStream;
@ -382,12 +379,7 @@ impl IoxObjectStore {
/// Write the file in the database directory that indicates this database is marked as deleted,
/// without yet actually deleting this directory or any files it contains in object storage.
pub async fn write_tombstone(&self) -> Result<()> {
let stream = move || stream::once(async move { Ok(Bytes::new()) });
let len = 0;
self.inner
.put(&self.tombstone_path(), stream, Some(len))
.await
self.inner.put(&self.tombstone_path(), Bytes::new()).await
}
/// Remove the tombstone file to restore a database generation. Will return an error if this
@ -472,19 +464,14 @@ impl IoxObjectStore {
}
/// Store the data for this catalog transaction file in this database's object store.
pub async fn put_catalog_transaction_file<F, S>(
pub async fn put_catalog_transaction_file(
&self,
location: &TransactionFilePath,
bytes: F,
length: Option<usize>,
) -> Result<()>
where
F: Fn() -> S + Clone + Send + Sync + Unpin + 'static,
S: Stream<Item = io::Result<Bytes>> + Send + Sync + 'static,
{
bytes: Bytes,
) -> Result<()> {
let full_path = self.transactions_path.join(location);
self.inner.put(&full_path, bytes, length).await
self.inner.put(&full_path, bytes).await
}
/// Delete all catalog transaction files for this database.
@ -539,19 +526,10 @@ impl IoxObjectStore {
}
/// Store the data for this parquet file in this database's object store.
pub async fn put_parquet_file<F, S>(
&self,
location: &ParquetFilePath,
bytes: F,
length: Option<usize>,
) -> Result<()>
where
F: Fn() -> S + Clone + Send + Sync + Unpin + 'static,
S: Stream<Item = io::Result<Bytes>> + Send + Sync + 'static,
{
pub async fn put_parquet_file(&self, location: &ParquetFilePath, bytes: Bytes) -> Result<()> {
let full_path = self.data_path.join(location);
self.inner.put(&full_path, bytes, length).await
self.inner.put(&full_path, bytes).await
}
/// Remove the data for this parquet file from this database's object store
@ -586,15 +564,7 @@ impl IoxObjectStore {
/// Store the data for the database rules
pub async fn put_database_rules_file(&self, bytes: Bytes) -> Result<()> {
let len = bytes.len();
let stream = move || {
let bytes = bytes.clone();
stream::once(async move { Ok(bytes) })
};
self.inner
.put(&self.db_rules_path(), stream, Some(len))
.await
self.inner.put(&self.db_rules_path(), bytes).await
}
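Part of why the plain-`Bytes` signature suffices: the removed bounds (`F: Fn() -> S + Clone`) let a backend re-create the payload stream per request attempt (the S3 path below clones its request factory for exactly that), while `Bytes` is a reference-counted buffer whose clone shares the underlying allocation. A standalone illustration, not code from the commit:

use bytes::Bytes;

fn cheap_clone_demo() {
    let payload = Bytes::from(vec![0u8; 1024 * 1024]);
    let retry_copy = payload.clone(); // bumps a refcount; no 1 MiB memcpy
    assert_eq!(payload.as_ptr(), retry_copy.as_ptr()); // same backing buffer
}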
/// Delete the data for the database rules
@ -653,12 +623,8 @@ mod tests {
async fn add_file(object_store: &ObjectStore, location: &Path) {
let data = Bytes::from("arbitrary data");
let stream_fn = move || {
let stream_data = std::io::Result::Ok(data.clone());
futures::stream::once(async move { stream_data })
};
object_store.put(location, stream_fn, None).await.unwrap();
object_store.put(location, data).await.unwrap();
}
async fn parquet_files(iox_object_store: &IoxObjectStore) -> Vec<ParquetFilePath> {
@ -676,13 +642,9 @@ mod tests {
async fn add_parquet_file(iox_object_store: &IoxObjectStore, location: &ParquetFilePath) {
let data = Bytes::from("arbitrary data");
let stream_fn = move || {
let stream_data = std::io::Result::Ok(data.clone());
futures::stream::once(async move { stream_data })
};
iox_object_store
.put_parquet_file(location, stream_fn, None)
.put_parquet_file(location, data)
.await
.unwrap();
}
@ -778,13 +740,9 @@ mod tests {
location: &TransactionFilePath,
) {
let data = Bytes::from("arbitrary data");
let stream_fn = move || {
let stream_data = std::io::Result::Ok(data.clone());
futures::stream::once(async move { stream_data })
};
iox_object_store
.put_catalog_transaction_file(location, stream_fn, None)
.put_catalog_transaction_file(location, data)
.await
.unwrap();
}
@ -898,14 +856,9 @@ mod tests {
// GET
let updated_file_content = Bytes::from("goodbye moon");
let expected_content = updated_file_content.clone();
let updated_file_stream = move || {
stream::once({
let bytes = updated_file_content.clone();
async move { Ok(bytes) }
})
};
object_store
.put(&rules_path, updated_file_stream, None)
.put(&rules_path, updated_file_content)
.await
.unwrap();
@ -1068,11 +1021,7 @@ mod tests {
not_rules_path.push_all_dirs(&[&server_id.to_string(), not_a_db.as_str(), "0"]);
not_rules_path.set_file_name("not_rules.txt");
object_store
.put(
&not_rules_path,
move || stream::once(async move { Ok(Bytes::new()) }),
None,
)
.put(&not_rules_path, Bytes::new())
.await
.unwrap();
@ -1082,11 +1031,7 @@ mod tests {
invalid_db_name_rules_path.push_all_dirs(&[&server_id.to_string(), &invalid_db_name, "0"]);
invalid_db_name_rules_path.set_file_name("rules.pb");
object_store
.put(
&invalid_db_name_rules_path,
move || stream::once(async move { Ok(Bytes::new()) }),
None,
)
.put(&invalid_db_name_rules_path, Bytes::new())
.await
.unwrap();
@ -1126,11 +1071,7 @@ mod tests {
not_rules_path.push_all_dirs(&[&server_id.to_string(), not_a_db.as_str(), "0"]);
not_rules_path.set_file_name("not_rules.txt");
object_store
.put(
&not_rules_path,
move || stream::once(async move { Ok(Bytes::new()) }),
None,
)
.put(&not_rules_path, Bytes::new())
.await
.unwrap();
@ -1140,11 +1081,7 @@ mod tests {
invalid_db_name_rules_path.push_all_dirs(&[&server_id.to_string(), &invalid_db_name, "0"]);
invalid_db_name_rules_path.set_file_name("rules.pb");
object_store
.put(
&invalid_db_name_rules_path,
move || stream::once(async move { Ok(Bytes::new()) }),
None,
)
.put(&invalid_db_name_rules_path, Bytes::new())
.await
.unwrap();
@ -1160,11 +1097,7 @@ mod tests {
]);
no_generations_path.set_file_name("not_rules.txt");
object_store
.put(
&no_generations_path,
move || stream::once(async move { Ok(Bytes::new()) }),
None,
)
.put(&no_generations_path, Bytes::new())
.await
.unwrap();
@ -1258,11 +1191,7 @@ mod tests {
not_rules_path.push_all_dirs(&[&server_id.to_string(), not_a_db.as_str(), "0"]);
not_rules_path.set_file_name("not_rules.txt");
object_store
.put(
&not_rules_path,
move || stream::once(async move { Ok(Bytes::new()) }),
None,
)
.put(&not_rules_path, Bytes::new())
.await
.unwrap();
@ -1272,11 +1201,7 @@ mod tests {
invalid_db_name_rules_path.push_all_dirs(&[&server_id.to_string(), &invalid_db_name, "0"]);
invalid_db_name_rules_path.set_file_name("rules.pb");
object_store
.put(
&invalid_db_name_rules_path,
move || stream::once(async move { Ok(Bytes::new()) }),
None,
)
.put(&invalid_db_name_rules_path, Bytes::new())
.await
.unwrap();
@ -1291,11 +1216,7 @@ mod tests {
]);
no_generations_path.set_file_name("not_rules.txt");
object_store
.put(
&no_generations_path,
move || stream::once(async move { Ok(Bytes::new()) }),
None,
)
.put(&no_generations_path, Bytes::new())
.await
.unwrap();

View File

@ -1,7 +1,6 @@
//! This module contains the IOx implementation for using S3 as the object
//! store.
use crate::{
buffer::slurp_stream_tempfile,
path::{cloud::CloudPath, DELIMITER},
ListResult, ObjectMeta, ObjectStoreApi,
};
@ -10,14 +9,14 @@ use bytes::Bytes;
use chrono::{DateTime, Utc};
use futures::{
stream::{self, BoxStream},
Future, Stream, StreamExt, TryStreamExt,
Future, StreamExt, TryStreamExt,
};
use observability_deps::tracing::{debug, warn};
use rusoto_core::ByteStream;
use rusoto_credential::{InstanceMetadataProvider, StaticProvider};
use rusoto_s3::S3;
use snafu::{OptionExt, ResultExt, Snafu};
use std::{convert::TryFrom, fmt, io, time::Duration};
use std::{convert::TryFrom, fmt, time::Duration};
/// A specialized `Result` for object store-related errors
pub type Result<T, E = Error> = std::result::Result<T, E>;
@ -143,30 +142,21 @@ impl ObjectStoreApi for AmazonS3 {
CloudPath::default()
}
async fn put<F, S>(&self, location: &Self::Path, bytes: F, length: Option<usize>) -> Result<()>
where
F: Fn() -> S + Clone + Send + Sync + Unpin + 'static,
S: Stream<Item = io::Result<Bytes>> + Send + Sync + 'static,
{
async fn put(&self, location: &Self::Path, bytes: Bytes) -> Result<()> {
let bucket_name = self.bucket_name.clone();
let key = location.to_raw();
let request_factory = move || {
let bytes = bytes.clone();
async move {
let bytes = bytes();
let bytes = match length {
Some(length) => ByteStream::new_with_size(bytes, length),
None => {
let bytes = slurp_stream_tempfile(bytes).await.unwrap();
let length = bytes.size();
ByteStream::new_with_size(bytes, length)
}
};
let length = bytes.len();
let stream_data = std::io::Result::Ok(bytes);
let stream = futures::stream::once(async move { stream_data });
let byte_stream = ByteStream::new_with_size(stream, length);
rusoto_s3::PutObjectRequest {
bucket: bucket_name.clone(),
key: key.clone(),
body: Some(bytes),
body: Some(byte_stream),
..Default::default()
}
}
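rusoto still consumes request bodies as streams, so the new S3 code above rebuilds a one-element stream around the `Bytes` value and attaches its now always-known length, letting the SDK set Content-Length without first buffering to a temp file. The same wrapping, extracted as a standalone sketch:

use bytes::Bytes;
use rusoto_core::ByteStream;

fn to_byte_stream(bytes: Bytes) -> ByteStream {
    let length = bytes.len();
    let stream_data = std::io::Result::Ok(bytes);
    let stream = futures::stream::once(async move { stream_data });
    ByteStream::new_with_size(stream, length)
}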
@ -783,18 +773,9 @@ mod tests {
let mut location = integration.new_path();
location.set_file_name(NON_EXISTENT_NAME);
let data = Bytes::from("arbitrary data");
let data_len = data.len();
let stream_fn = move || {
let stream_data = std::io::Result::Ok(data.clone());
futures::stream::once(async move { stream_data })
};
let err = integration
.put(&location, stream_fn, Some(data_len))
.await
.unwrap_err();
let err = integration.put(&location, data).await.unwrap_err();
if let ObjectStoreError::AwsObjectStoreError {
source:
@ -830,19 +811,9 @@ mod tests {
let mut location = integration.new_path();
location.set_file_name(NON_EXISTENT_NAME);
let data = Bytes::from("arbitrary data");
let data_len = data.len();
let stream_fn = move || {
let stream_data = std::io::Result::Ok(data.clone());
futures::stream::once(async move { stream_data })
};
let err = integration
.put(&location, stream_fn, Some(data_len))
.await
.unwrap_err();
let err = integration.put(&location, data).await.unwrap_err();
if let ObjectStoreError::AwsObjectStoreError {
source:

View File

@ -14,11 +14,10 @@ use azure_storage::{
use bytes::Bytes;
use futures::{
stream::{self, BoxStream},
FutureExt, Stream, StreamExt, TryStreamExt,
FutureExt, StreamExt,
};
use snafu::{ensure, ResultExt, Snafu};
use std::sync::Arc;
use std::{convert::TryInto, io};
use snafu::{ResultExt, Snafu};
use std::{convert::TryInto, sync::Arc};
/// A specialized `Result` for Azure object store-related errors
pub type Result<T, E = Error> = std::result::Result<T, E>;
@ -27,9 +26,6 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
#[derive(Debug, Snafu)]
#[allow(missing_docs)]
pub enum Error {
#[snafu(display("Expected streamed data to have length {}, got {}", expected, actual))]
DataDoesNotMatchLength { expected: usize, actual: usize },
#[snafu(display("Unable to DELETE data. Location: {}, Error: {}", location, source,))]
UnableToDeleteData {
source: Box<dyn std::error::Error + Send + Sync>,
@ -70,31 +66,14 @@ impl ObjectStoreApi for MicrosoftAzure {
CloudPath::default()
}
async fn put<F, S>(&self, location: &Self::Path, bytes: F, length: Option<usize>) -> Result<()>
where
F: Fn() -> S + Clone + Send + Sync + Unpin + 'static,
S: Stream<Item = io::Result<Bytes>> + Send + Sync + 'static,
{
async fn put(&self, location: &Self::Path, bytes: Bytes) -> Result<()> {
let location = location.to_raw();
let temporary_non_streaming = bytes()
.map_ok(|b| bytes::BytesMut::from(&b[..]))
.try_concat()
.await
.expect("Should have been able to collect streaming data");
if let Some(length) = length {
ensure!(
temporary_non_streaming.len() == length,
DataDoesNotMatchLength {
actual: temporary_non_streaming.len(),
expected: length,
}
);
}
let bytes = bytes::BytesMut::from(&*bytes);
self.container_client
.as_blob_client(&location)
.put_block_blob(temporary_non_streaming)
.put_block_blob(bytes)
.execute()
.await
.context(UnableToPutData {

View File

@ -7,9 +7,9 @@ use async_trait::async_trait;
use bytes::Bytes;
use futures::{
stream::{self, BoxStream},
Stream, StreamExt, TryStreamExt,
StreamExt, TryStreamExt,
};
use snafu::{ensure, OptionExt, ResultExt, Snafu};
use snafu::{OptionExt, ResultExt, Snafu};
use std::sync::Arc;
use std::{collections::BTreeSet, convert::TryFrom, io, path::PathBuf};
use tokio::fs;
@ -23,9 +23,6 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
#[derive(Debug, Snafu)]
#[allow(missing_docs)]
pub enum Error {
#[snafu(display("Expected streamed data to have length {}, got {}", expected, actual))]
DataDoesNotMatchLength { expected: usize, actual: usize },
#[snafu(display("File size for {} did not fit in a usize: {}", path.display(), source))]
FileSizeOverflowedUsize {
source: std::num::TryFromIntError,
@ -58,9 +55,6 @@ pub enum Error {
#[snafu(display("Unable to read data from file {}: {}", path.display(), source))]
UnableToReadBytes { source: io::Error, path: PathBuf },
#[snafu(display("Unable to stream data from the request into memory: {}", source))]
UnableToStreamDataIntoMemory { source: std::io::Error },
}
/// Local filesystem storage suitable for testing or for opting out of using a
@ -79,26 +73,8 @@ impl ObjectStoreApi for File {
FilePath::default()
}
async fn put<F, S>(&self, location: &Self::Path, bytes: F, length: Option<usize>) -> Result<()>
where
F: Fn() -> S + Clone + Send + Sync + Unpin + 'static,
S: Stream<Item = io::Result<Bytes>> + Send + Sync + 'static,
{
let content = bytes()
.map_ok(|b| bytes::BytesMut::from(&b[..]))
.try_concat()
.await
.context(UnableToStreamDataIntoMemory)?;
if let Some(length) = length {
ensure!(
content.len() == length,
DataDoesNotMatchLength {
actual: content.len(),
expected: length,
}
);
}
async fn put(&self, location: &Self::Path, bytes: Bytes) -> Result<()> {
let content = bytes::BytesMut::from(&*bytes);
let path = self.path(location);
@ -323,9 +299,8 @@ mod tests {
use crate::{
tests::{list_with_delimiter, put_get_delete_list},
Error as ObjectStoreError, ObjectStore, ObjectStoreApi, ObjectStorePath,
ObjectStore, ObjectStoreApi, ObjectStorePath,
};
use futures::stream;
use tempfile::TempDir;
#[tokio::test]
@ -337,29 +312,6 @@ mod tests {
list_with_delimiter(&integration).await.unwrap();
}
#[tokio::test]
async fn length_mismatch_is_an_error() {
let root = TempDir::new().unwrap();
let integration = ObjectStore::new_file(root.path());
let bytes = move || stream::once(async { Ok(Bytes::from("hello world")) });
let mut location = integration.new_path();
location.set_file_name("junk");
let res = integration.put(&location, bytes, Some(0)).await;
assert!(matches!(
res.err().unwrap(),
ObjectStoreError::FileObjectStoreError {
source: Error::DataDoesNotMatchLength {
expected: 0,
actual: 11,
}
}
));
}
#[tokio::test]
async fn creates_dir_if_not_present() {
let root = TempDir::new().unwrap();
@ -370,16 +322,8 @@ mod tests {
let data = Bytes::from("arbitrary data");
let expected_data = data.clone();
let data_len = data.len();
let stream_fn = move || {
let stream_data = std::io::Result::Ok(data.clone());
futures::stream::once(async move { stream_data })
};
integration
.put(&location, stream_fn, Some(data_len))
.await
.unwrap();
integration.put(&location, data).await.unwrap();
let read_data = integration
.get(&location)
@ -402,12 +346,8 @@ mod tests {
let data = Bytes::from("arbitrary data");
let expected_data = data.clone();
let stream_fn = move || {
let stream_data = std::io::Result::Ok(data.clone());
futures::stream::once(async move { stream_data })
};
integration.put(&location, stream_fn, None).await.unwrap();
integration.put(&location, data).await.unwrap();
let read_data = integration
.get(&location)

View File

@ -1,6 +1,7 @@
//! Crate that mimics the interface of the various object stores
//! but does nothing if they are not enabled.
use async_trait::async_trait;
use bytes::Bytes;
use snafu::Snafu;
use crate::{path::cloud::CloudPath, ObjectStoreApi};
@ -42,16 +43,7 @@ impl ObjectStoreApi for DummyObjectStore {
CloudPath::default()
}
async fn put<F, S>(
&self,
_location: &Self::Path,
_bytes: F,
_length: Option<usize>,
) -> crate::Result<(), Self::Error>
where
F: Fn() -> S + Clone + Send + Sync + Unpin + 'static,
S: futures::Stream<Item = std::io::Result<bytes::Bytes>> + Send + Sync + 'static,
{
async fn put(&self, _location: &Self::Path, _bytes: Bytes) -> crate::Result<(), Self::Error> {
NotSupported { name: &self.name }.fail()
}

View File

@ -7,9 +7,9 @@ use crate::{
use async_trait::async_trait;
use bytes::Bytes;
use cloud_storage::Client;
use futures::{stream::BoxStream, Stream, StreamExt, TryStreamExt};
use snafu::{ensure, ResultExt, Snafu};
use std::{convert::TryFrom, env, io};
use futures::{stream::BoxStream, StreamExt, TryStreamExt};
use snafu::{ResultExt, Snafu};
use std::{convert::TryFrom, env};
/// A specialized `Result` for Google Cloud Storage object store-related errors
pub type Result<T, E = Error> = std::result::Result<T, E>;
@ -86,28 +86,7 @@ impl ObjectStoreApi for GoogleCloudStorage {
CloudPath::default()
}
async fn put<F, S>(&self, location: &Self::Path, bytes: F, length: Option<usize>) -> Result<()>
where
F: Fn() -> S + Clone + Send + Sync + Unpin + 'static,
S: Stream<Item = io::Result<Bytes>> + Send + Sync + 'static,
{
let temporary_non_streaming = bytes()
.map_ok(|b| bytes::BytesMut::from(&b[..]))
.try_concat()
.await
.expect("Should have been able to collect streaming data")
.to_vec();
if let Some(length) = length {
ensure!(
temporary_non_streaming.len() == length,
DataDoesNotMatchLength {
actual: temporary_non_streaming.len(),
expected: length,
}
);
}
async fn put(&self, location: &Self::Path, bytes: Bytes) -> Result<()> {
let location = location.to_raw();
let location_copy = location.clone();
let bucket_name = self.bucket_name.clone();
@ -116,7 +95,7 @@ impl ObjectStoreApi for GoogleCloudStorage {
.object()
.create(
&bucket_name,
temporary_non_streaming,
bytes.to_vec(),
&location_copy,
"application/octet-stream",
)
@ -462,18 +441,9 @@ mod test {
let mut location = integration.new_path();
location.set_file_name(NON_EXISTENT_NAME);
let data = Bytes::from("arbitrary data");
let stream_data = std::io::Result::Ok(data.clone());
let err = integration
.put(
&location,
futures::stream::once(async move { stream_data }),
Some(data.len()),
)
.await
.unwrap_err();
let err = integration.put(&location, data).await.unwrap_err();
if let ObjectStoreError::GcsObjectStoreError {
source:

View File

@ -50,15 +50,16 @@ use throttle::ThrottledStore;
/// Publicly expose throttling configuration
pub use throttle::ThrottleConfig;
use crate::cache::{Cache, LocalFSCache};
use crate::path::Path;
use crate::{
cache::{Cache, LocalFSCache},
path::Path,
};
use async_trait::async_trait;
use bytes::Bytes;
use chrono::{DateTime, Utc};
use futures::{stream::BoxStream, Stream, StreamExt, TryFutureExt, TryStreamExt};
use futures::{stream::BoxStream, StreamExt, TryFutureExt, TryStreamExt};
use snafu::{ResultExt, Snafu};
use std::sync::Arc;
use std::{io, path::PathBuf};
use std::{path::PathBuf, sync::Arc};
/// Universal API to multiple object store services.
#[async_trait]
@ -73,15 +74,7 @@ pub trait ObjectStoreApi: Send + Sync + 'static {
fn new_path(&self) -> Self::Path;
/// Save the provided bytes to the specified location.
async fn put<F, S>(
&self,
location: &Self::Path,
bytes: F,
length: Option<usize>,
) -> Result<(), Self::Error>
where
F: Fn() -> S + Clone + Send + Sync + Unpin + 'static,
S: Stream<Item = io::Result<Bytes>> + Send + Sync + 'static;
async fn put(&self, location: &Self::Path, bytes: Bytes) -> Result<(), Self::Error>;
/// Return the bytes that are stored at the specified location.
async fn get(
@ -240,32 +233,26 @@ impl ObjectStoreApi for ObjectStore {
}
}
async fn put<F, S>(&self, location: &Self::Path, bytes: F, length: Option<usize>) -> Result<()>
where
F: Fn() -> S + Clone + Send + Sync + Unpin + 'static,
S: Stream<Item = io::Result<Bytes>> + Send + Sync + 'static,
{
async fn put(&self, location: &Self::Path, bytes: Bytes) -> Result<()> {
use ObjectStoreIntegration::*;
match (&self.integration, location) {
(AmazonS3(s3), path::Path::AmazonS3(location)) => {
s3.put(location, bytes, length).await?
}
(AmazonS3(s3), path::Path::AmazonS3(location)) => s3.put(location, bytes).await?,
(GoogleCloudStorage(gcs), path::Path::GoogleCloudStorage(location)) => gcs
.put(location, bytes, length)
.put(location, bytes)
.await
.context(GcsObjectStoreError)?,
(InMemory(in_mem), path::Path::InMemory(location)) => {
in_mem.put(location, bytes, length).await?
in_mem.put(location, bytes).await?
}
(InMemoryThrottled(in_mem_throttled), path::Path::InMemory(location)) => {
in_mem_throttled.put(location, bytes, length).await?
in_mem_throttled.put(location, bytes).await?
}
(File(file), path::Path::File(location)) => file
.put(location, bytes, length)
.put(location, bytes)
.await
.context(FileObjectStoreError)?,
(MicrosoftAzure(azure), path::Path::MicrosoftAzure(location)) => {
azure.put(location, bytes, length).await?
azure.put(location, bytes).await?
}
_ => unreachable!(),
}
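An incidental benefit (an observation, not something the commit states): with `put` no longer generic over a stream factory, helpers generic over any `ObjectStoreApi` become much easier to write. A hypothetical example; `serde_json` is an assumed extra dependency:

use bytes::Bytes;
use object_store::ObjectStoreApi;

async fn put_json<S: ObjectStoreApi>(
    store: &S,
    location: &S::Path,
    value: &serde_json::Value,
) -> Result<(), S::Error> {
    let bytes = Bytes::from(serde_json::to_vec(value).expect("value serializes"));
    store.put(location, bytes).await
}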
@ -685,12 +672,7 @@ mod tests {
let data = Bytes::from("arbitrary data");
let expected_data = data.clone();
let data_len = data.len();
let stream_fn = move || {
let stream_data = std::io::Result::Ok(data.clone());
futures::stream::once(async move { stream_data })
};
storage.put(&location, stream_fn, Some(data_len)).await?;
storage.put(&location, data).await?;
// List everything
let content_list = flatten_list_stream(storage, None).await?;
@ -733,7 +715,6 @@ mod tests {
// ==================== do: create files ====================
let data = Bytes::from("arbitrary data");
let data_len = data.len();
let files: Vec<_> = [
"test_file",
@ -751,11 +732,7 @@ mod tests {
for f in &files {
let data = data.clone();
let stream_fn = move || {
let stream_data = std::io::Result::Ok(data.clone());
futures::stream::once(async move { stream_data })
};
storage.put(f, stream_fn, Some(data_len)).await.unwrap();
storage.put(f, data).await.unwrap();
}
// ==================== check: prefix-list `mydb/wb` (directory) ====================

View File

@ -4,10 +4,10 @@ use crate::{path::parsed::DirsAndFileName, ListResult, ObjectMeta, ObjectStoreAp
use async_trait::async_trait;
use bytes::Bytes;
use chrono::Utc;
use futures::{stream::BoxStream, Stream, StreamExt, TryStreamExt};
use snafu::{ensure, OptionExt, ResultExt, Snafu};
use futures::{stream::BoxStream, StreamExt};
use snafu::{OptionExt, Snafu};
use std::collections::BTreeMap;
use std::collections::BTreeSet;
use std::{collections::BTreeMap, io};
use tokio::sync::RwLock;
/// A specialized `Result` for in-memory object store-related errors
@ -17,12 +17,6 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
#[derive(Debug, Snafu)]
#[allow(missing_docs)]
pub enum Error {
#[snafu(display("Expected streamed data to have length {}, got {}", expected, actual))]
DataDoesNotMatchLength { expected: usize, actual: usize },
#[snafu(display("Unable to stream data from the request into memory: {}", source))]
UnableToStreamDataIntoMemory { source: std::io::Error },
#[snafu(display("No data in memory found. Location: {}", location))]
NoDataInMemory { location: String },
}
@ -43,33 +37,11 @@ impl ObjectStoreApi for InMemory {
DirsAndFileName::default()
}
async fn put<F, S>(&self, location: &Self::Path, bytes: F, length: Option<usize>) -> Result<()>
where
F: Fn() -> S + Clone + Send + Sync + Unpin + 'static,
S: Stream<Item = io::Result<Bytes>> + Send + Sync + 'static,
{
let content = bytes()
.map_ok(|b| bytes::BytesMut::from(&b[..]))
.try_concat()
.await
.context(UnableToStreamDataIntoMemory)?;
if let Some(length) = length {
ensure!(
content.len() == length,
DataDoesNotMatchLength {
actual: content.len(),
expected: length,
}
);
}
let content = content.freeze();
async fn put(&self, location: &Self::Path, bytes: Bytes) -> Result<()> {
self.storage
.write()
.await
.insert(location.to_owned(), content);
.insert(location.to_owned(), bytes);
Ok(())
}
@ -178,9 +150,9 @@ mod tests {
use crate::{
tests::{list_with_delimiter, put_get_delete_list},
Error as ObjectStoreError, ObjectStore, ObjectStoreApi, ObjectStorePath,
ObjectStore, ObjectStoreApi, ObjectStorePath,
};
use futures::stream;
use futures::TryStreamExt;
#[tokio::test]
async fn in_memory_test() {
@ -190,28 +162,6 @@ mod tests {
list_with_delimiter(&integration).await.unwrap();
}
#[tokio::test]
async fn length_mismatch_is_an_error() {
let integration = ObjectStore::new_in_memory();
let mut location = integration.new_path();
location.set_file_name("junk");
let bytes = move || stream::once(async { Ok(Bytes::from("hello world")) });
let res = integration.put(&location, bytes, Some(0)).await;
assert!(matches!(
res.err().unwrap(),
ObjectStoreError::InMemoryObjectStoreError {
source: Error::DataDoesNotMatchLength {
expected: 0,
actual: 11,
}
}
));
}
#[tokio::test]
async fn unknown_length() {
let integration = ObjectStore::new_in_memory();
@ -221,12 +171,8 @@ mod tests {
let data = Bytes::from("arbitrary data");
let expected_data = data.clone();
let stream_fn = move || {
let stream_data = std::io::Result::Ok(data.clone());
futures::stream::once(async move { stream_data })
};
integration.put(&location, stream_fn, None).await.unwrap();
integration.put(&location, data).await.unwrap();
let read_data = integration
.get(&location)

View File

@ -1,78 +1,75 @@
//! This module contains the IOx implementation for wrapping existing object store types into an artificial "sleep" wrapper.
use std::{convert::TryInto, io, sync::Arc};
use std::convert::TryInto;
use crate::{ListResult, ObjectStoreApi, Result};
use async_trait::async_trait;
use bytes::Bytes;
use futures::{stream::BoxStream, Stream, StreamExt};
use tokio::{
sync::Mutex,
time::{sleep, Duration},
};
use futures::{stream::BoxStream, StreamExt};
use tokio::time::{sleep, Duration};
/// Configuration settings for throttled store
#[derive(Debug, Default, Clone, Copy)]
pub struct ThrottleConfig {
/// Sleep duration for every call to [`delete`](ThrottledStore::delete).
///
/// Sleeping is done before the underlying store is called and independently of the success of the operation.
/// Sleeping is done before the underlying store is called and independently of the success of
/// the operation.
pub wait_delete_per_call: Duration,
/// Sleep duration for every byte received during [`get`](ThrottledStore::get).
///
/// Sleeping is performed after the underlying store returned and only for successful gets. The sleep duration is
/// additive to [`wait_get_per_call`](Self::wait_get_per_call).
/// Sleeping is performed after the underlying store returned and only for successful gets. The
/// sleep duration is additive to [`wait_get_per_call`](Self::wait_get_per_call).
///
/// Note that the per-byte sleep only happens as the user consumes the output bytes. Should there be an
/// intermediate failure (i.e. after partly consuming the output bytes), the resulting sleep time will be partial as well.
/// Note that the per-byte sleep only happens as the user consumes the output bytes. Should
/// there be an intermediate failure (i.e. after partly consuming the output bytes), the
/// resulting sleep time will be partial as well.
pub wait_get_per_byte: Duration,
/// Sleep duration for every call to [`get`](ThrottledStore::get).
///
/// Sleeping is done before the underlying store is called and independently of the success of the operation. The
/// sleep duration is additive to [`wait_get_per_byte`](Self::wait_get_per_byte).
/// Sleeping is done before the underlying store is called and independently of the success of
/// the operation. The sleep duration is additive to
/// [`wait_get_per_byte`](Self::wait_get_per_byte).
pub wait_get_per_call: Duration,
/// Sleep duration for every call to [`list`](ThrottledStore::list).
///
/// Sleeping is done before the underlying store is called and independently of the success of the operation. The
/// sleep duration is additive to [`wait_list_per_entry`](Self::wait_list_per_entry).
/// Sleeping is done before the underlying store is called and independently of the success of
/// the operation. The sleep duration is additive to
/// [`wait_list_per_entry`](Self::wait_list_per_entry).
pub wait_list_per_call: Duration,
/// Sleep duration for every entry received during [`list`](ThrottledStore::list).
///
/// Sleeping is performed after the underlying store returned and only for successful lists. The sleep duration is
/// additive to [`wait_list_per_call`](Self::wait_list_per_call).
/// Sleeping is performed after the underlying store returned and only for successful lists.
/// The sleep duration is additive to [`wait_list_per_call`](Self::wait_list_per_call).
///
/// Note that the per-entry sleep only happens as the user consumes the output entries. Should there be an
/// intermediate failure (i.e. after partly consuming the output entries), the resulting sleep time will be partial as well.
/// Note that the per-entry sleep only happens as the user consumes the output entries. Should
/// there be an intermediate failure (i.e. after partly consuming the output entries), the
/// resulting sleep time will be partial as well.
pub wait_list_per_entry: Duration,
/// Sleep duration for every call to [`list_with_delimiter`](ThrottledStore::list_with_delimiter).
/// Sleep duration for every call to
/// [`list_with_delimiter`](ThrottledStore::list_with_delimiter).
///
/// Sleeping is done before the underlying store is called and independently of the success of the operation. The
/// sleep duration is additive to [`wait_list_with_delimiter_per_entry`](Self::wait_list_with_delimiter_per_entry).
/// Sleeping is done before the underlying store is called and independently of the success of
/// the operation. The sleep duration is additive to
/// [`wait_list_with_delimiter_per_entry`](Self::wait_list_with_delimiter_per_entry).
pub wait_list_with_delimiter_per_call: Duration,
/// Sleep duration for every entry received during [`list_with_delimiter`](ThrottledStore::list_with_delimiter).
/// Sleep duration for every entry received during
/// [`list_with_delimiter`](ThrottledStore::list_with_delimiter).
///
/// Sleeping is performed after the underlying store returned and only for successful gets. The sleep duration is
/// additive to [`wait_list_with_delimiter_per_call`](Self::wait_list_with_delimiter_per_call).
/// Sleeping is performed after the underlying store returned and only for successful gets. The
/// sleep duration is additive to
/// [`wait_list_with_delimiter_per_call`](Self::wait_list_with_delimiter_per_call).
pub wait_list_with_delimiter_per_entry: Duration,
/// Sleep duration for every byte sent during [`put`](ThrottledStore::put).
///
/// Sleeping is done before the underlying store is called and independently of the complete success of the operation. The
/// sleep duration is additive to [`wait_put_per_call`](Self::wait_put_per_call).
///
/// Note that the per-byte sleep only happens as the underlying store consumes the bytes. Should there be an
/// intermediate failure (i.e. after partly consuming the input bytes), the resulting sleep time will be partial as well.
pub wait_put_per_byte: Duration,
/// Sleep duration for every call to [`put`](ThrottledStore::put).
///
/// Sleeping is done before the underlying store is called and independently of the success of the operation. The
/// sleep duration is additive to [`wait_put_per_byte`](Self::wait_put_per_byte).
/// Sleeping is done before the underlying store is called and independently of the success of
/// the operation.
pub wait_put_per_call: Duration,
}
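A worked example of the additive sleeps described above (numbers invented): under the configuration below, a successful `get` of a 5,000-byte object sleeps 10 ms before the underlying call, plus a further 5 ms spread across consumption of the returned bytes.

use object_store::ThrottleConfig;
use tokio::time::Duration;

fn example_throttle() -> ThrottleConfig {
    ThrottleConfig {
        wait_get_per_call: Duration::from_millis(10),
        wait_get_per_byte: Duration::from_micros(1),
        ..Default::default()
    }
}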
@ -80,7 +77,8 @@ pub struct ThrottleConfig {
///
/// This can be used for performance testing.
///
/// **Note that the behavior of the wrapper is deterministic and might not reflect real-world conditions!**
/// **Note that the behavior of the wrapper is deterministic and might not reflect real-world
/// conditions!**
#[derive(Debug)]
pub struct ThrottledStore<T: ObjectStoreApi> {
inner: T,
@ -114,47 +112,10 @@ impl<T: ObjectStoreApi> ObjectStoreApi for ThrottledStore<T> {
self.inner.new_path()
}
async fn put<F, S>(
&self,
location: &Self::Path,
bytes: F,
length: Option<usize>,
) -> Result<(), Self::Error>
where
F: Fn() -> S + Clone + Send + Sync + Unpin + 'static,
S: Stream<Item = io::Result<Bytes>> + Send + Sync + 'static,
{
async fn put(&self, location: &Self::Path, bytes: Bytes) -> Result<(), Self::Error> {
sleep(self.config.wait_put_per_call).await;
// need to copy to avoid moving / referencing `self`
let wait_put_per_byte = self.config.wait_put_per_byte;
let length_remaining = Arc::new(Mutex::new(length));
let bytes = move || {
let length_remaining = Arc::clone(&length_remaining);
bytes().then(move |bytes_result| {
let length_remaining = Arc::clone(&length_remaining);
async move {
match bytes_result {
Ok(bytes) => {
let mut bytes_len = bytes.len();
let mut length_remaining_inner2 = length_remaining.lock().await;
if let Some(length) = length_remaining_inner2.as_mut() {
let length_new = length.saturating_sub(bytes_len);
bytes_len = bytes_len.min(*length);
*length = length_new;
};
sleep(wait_put_per_byte * usize_to_u32_saturate(bytes_len)).await;
Ok(bytes)
}
Err(err) => Err(err),
}
}
})
};
self.inner.put(location, bytes, length).await
self.inner.put(location, bytes).await
}
async fn get(
@ -367,12 +328,7 @@ mod tests {
assert_bounds!(measure_put(&store, 10).await, 1);
store.config_mut().wait_put_per_call = ZERO;
store.config_mut().wait_put_per_byte = WAIT_TIME;
assert_bounds!(measure_put(&store, 2).await, 2);
store.config_mut().wait_put_per_call = WAIT_TIME;
store.config_mut().wait_put_per_byte = WAIT_TIME;
assert_bounds!(measure_put(&store, 2).await, 3);
assert_bounds!(measure_put(&store, 0).await, 0);
}
async fn place_test_object(
@ -383,12 +339,9 @@ mod tests {
path.set_file_name("foo");
if let Some(n_bytes) = n_bytes {
let stream_fn = move || {
let data = std::iter::repeat(1u8).take(n_bytes).collect();
let stream_data = std::io::Result::Ok(data);
futures::stream::once(async move { stream_data })
};
store.put(&path, stream_fn, None).await.unwrap();
let data: Vec<_> = std::iter::repeat(1u8).take(n_bytes).collect();
let bytes = Bytes::from(data);
store.put(&path, bytes).await.unwrap();
} else {
// ensure object is absent
store.delete(&path).await.unwrap();
@ -421,13 +374,8 @@ mod tests {
let mut path = prefix.clone();
path.set_file_name(&i.to_string());
let stream_fn = move || {
let data = Bytes::from("bar");
let stream_data = std::io::Result::Ok(data);
futures::stream::once(async move { stream_data })
};
store.put(&path, stream_fn, None).await.unwrap();
let data = Bytes::from("bar");
store.put(&path, data).await.unwrap();
}
prefix
@ -492,14 +440,11 @@ mod tests {
let mut path = store.new_path();
path.set_file_name("foo");
let stream_fn = move || {
let data = std::iter::repeat(1u8).take(n_bytes).collect();
let stream_data = std::io::Result::Ok(data);
futures::stream::once(async move { stream_data })
};
let data: Vec<_> = std::iter::repeat(1u8).take(n_bytes).collect();
let bytes = Bytes::from(data);
let t0 = Instant::now();
store.put(&path, stream_fn, None).await.unwrap();
store.put(&path, bytes).await.unwrap();
t0.elapsed()
}

View File

@ -1325,16 +1325,9 @@ mod tests {
let tkey = trace.tkeys[0];
let path = TransactionFilePath::new_transaction(tkey.revision_counter, tkey.uuid);
let data = Bytes::from("foo");
let len = data.len();
iox_object_store
.put_catalog_transaction_file(
&path,
move || {
let data = data.clone();
futures::stream::once(async move { Ok(data) })
},
Some(len),
)
.put_catalog_transaction_file(&path, data)
.await
.unwrap();
@ -2189,16 +2182,9 @@ mod tests {
let tkey = trace.tkeys[0];
let path = TransactionFilePath::new_transaction(tkey.revision_counter, tkey.uuid);
let data = Bytes::from("foo");
let len = data.len();
iox_object_store
.put_catalog_transaction_file(
&path,
move || {
let data = data.clone();
futures::stream::once(async move { Ok(data) })
},
Some(len),
)
.put_catalog_transaction_file(&path, data)
.await
.unwrap();

View File

@ -36,17 +36,9 @@ pub async fn store_transaction_proto(
let mut data = Vec::new();
proto.encode(&mut data).context(Serialization {})?;
let data = Bytes::from(data);
let len = data.len();
iox_object_store
.put_catalog_transaction_file(
path,
move || {
let data = data.clone();
futures::stream::once(async move { Ok(data) })
},
Some(len),
)
.put_catalog_transaction_file(path, data)
.await
.context(Write {})?;

View File

@ -210,18 +210,10 @@ impl Storage {
/// Put the given vector of bytes to the specified location
pub async fn to_object_store(&self, data: Vec<u8>, path: &ParquetFilePath) -> Result<()> {
let len = data.len();
let data = Bytes::from(data);
self.iox_object_store
.put_parquet_file(
path,
move || {
let data = data.clone();
futures::stream::once(async move { Result::Ok(data) })
},
Some(len),
)
.put_parquet_file(path, data)
.await
.context(WritingToObjectStore)
}
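Note on the conversion above: `Bytes::from(Vec<u8>)` takes ownership of the existing allocation rather than copying it, so wrapping the encoded parquet buffer costs nothing. In miniature:

use bytes::Bytes;

fn vec_to_bytes(data: Vec<u8>) -> Bytes {
    Bytes::from(data) // reuses the Vec's buffer; no copy
}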

View File

@ -4087,18 +4087,8 @@ mod tests {
}
async fn create_empty_file(iox_object_store: &IoxObjectStore, path: &ParquetFilePath) {
let data = Bytes::default();
let len = data.len();
iox_object_store
.put_parquet_file(
path,
move || {
let data = data.clone();
futures::stream::once(async move { Ok(data) })
},
Some(len),
)
.put_parquet_file(path, Bytes::new())
.await
.unwrap();
}
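The switch from binding `Bytes::default()` to passing `Bytes::new()` inline is purely cosmetic; both construct an empty, allocation-free buffer:

use bytes::Bytes;

fn empty_markers_are_equivalent() {
    assert_eq!(Bytes::new(), Bytes::default());
    assert!(Bytes::new().is_empty());
}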

View File

@ -1947,11 +1947,7 @@ mod tests {
fake_db_path.set_file_name("not-a-generation");
application
.object_store()
.put(
&fake_db_path,
move || futures::stream::once(async move { Ok(Bytes::new()) }),
None,
)
.put(&fake_db_path, Bytes::new())
.await
.unwrap();

View File

@ -160,7 +160,6 @@ fn create_throttled_store() -> Arc<ObjectStore> {
// for upload/download: assume 1GByte/s
wait_get_per_byte: Duration::from_secs(1) / 1_000_000_000,
wait_put_per_byte: Duration::from_secs(1) / 1_000_000_000,
};
Arc::new(ObjectStore::new_in_memory_throttled(config))

View File

@ -218,7 +218,6 @@ impl TryFrom<&ObjectStoreConfig> for ObjectStore {
// for upload/download: assume 1GByte/s
wait_get_per_byte: Duration::from_secs(1) / 1_000_000_000,
wait_put_per_byte: Duration::from_secs(1) / 1_000_000_000,
};
Ok(Self::new_in_memory_throttled(config))