feat: update to crates.io object_store version (#4595)

* feat: update to crates.io object_store version

* chore: Run cargo hakari tasks

* fix: tests

* chore: remove object store integration test plumbing

Co-authored-by: CircleCI[bot] <circleci@influxdata.com>
Raphael Taylor-Davies 2022-05-13 17:26:07 +01:00 committed by GitHub
parent 35d80fc512
commit f2bb0fdf77
35 changed files with 55 additions and 3858 deletions
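The "fix: tests" changes below all follow the same pattern: the IOx-only `list_all()` convenience (provided by the `ObjectStoreTestConvenience` trait in the deleted in-repo crate) is replaced by the crates.io crate's streaming `list` API. A minimal sketch of the two recurring shapes, assuming the 0.0.1 crate exposes an object-safe `ObjectStore` trait and re-exports `ObjectMeta` at the crate root (neither name is shown in this diff):

use futures::{StreamExt, TryStreamExt};
use object_store::{ObjectMeta, ObjectStore};

// Stand-in for the removed list_all() helper: drain the listing stream into a Vec.
async fn list_all(store: &dyn ObjectStore) -> Vec<ObjectMeta> {
    let list = store.list(None).await.unwrap();
    list.try_collect().await.unwrap()
}

// Emptiness check without collecting everything: poll the stream once.
async fn is_empty(store: &dyn ObjectStore) -> bool {
    let mut list = store.list(None).await.unwrap();
    list.next().await.is_none()
}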


@@ -201,8 +201,6 @@ jobs:
# setup multiple docker images (see https://circleci.com/docs/2.0/configuration-reference/#docker)
docker:
- image: quay.io/influxdb/rust:ci
- image: localstack/localstack
- image: mcr.microsoft.com/azure-storage/azurite
- image: vectorized/redpanda:v21.9.2
command: redpanda start --overprovisioned --smp 1 --memory 1G --reserve-memory 0M
- image: postgres
@@ -222,38 +220,18 @@ jobs:
TEST_INTEGRATION: 1
INFLUXDB_IOX_INTEGRATION_LOCAL: 1
KAFKA_CONNECT: "localhost:9092"
AWS_DEFAULT_REGION: "us-east-1"
AWS_ACCESS_KEY_ID: test
AWS_SECRET_ACCESS_KEY: test
AWS_ENDPOINT: http://127.0.0.1:4566
AZURE_USE_EMULATOR: "1"
OBJECT_STORE_BUCKET: iox-test
POSTGRES_USER: postgres
TEST_INFLUXDB_IOX_CATALOG_DSN: "postgres://postgres@localhost/iox_shared"
# When removing this, also remove the ignore on the test in trogging/src/cli.rs
RUST_LOG: debug
LOG_FILTER: debug
steps:
- run:
name: Setup localstack (AWS emulation)
command: |
cd /tmp
curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip"
unzip awscliv2.zip
sudo ./aws/install
aws --endpoint-url=http://localhost:4566 s3 mb s3://iox-test
- run:
name: Setup Azurite (Azure emulation)
# the magical connection string is from https://docs.microsoft.com/en-us/azure/storage/common/storage-use-azurite?tabs=visual-studio#http-connection-strings
command: |
curl -sL https://aka.ms/InstallAzureCLIDeb | sudo bash
az storage container create -n iox-test --connection-string 'DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;QueueEndpoint=http://127.0.0.1:10001/devstoreaccount1;'
- checkout
- rust_components
- cache_restore
- run:
name: Cargo test
command: cargo test --workspace --features=aws,azure,azure_test
command: cargo test --workspace
- cache_save
# end to end tests with Heappy (heap profiling enabled)

Cargo.lock (generated)

@@ -3399,7 +3399,9 @@ dependencies = [
[[package]]
name = "object_store"
version = "0.1.0"
version = "0.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "71fc15440e94ba9b3a8bc83c8f94a0d5a2861d72254cdb9ede72fbbd18e015db"
dependencies = [
"async-trait",
"azure_core",
@@ -3408,13 +3410,10 @@ dependencies = [
"bytes",
"chrono",
"cloud-storage",
"dotenv",
"futures",
"futures-test",
"hyper",
"hyper-rustls",
"itertools",
"observability_deps",
"percent-encoding",
"reqwest",
"rusoto_core",
@@ -3424,8 +3423,8 @@ dependencies = [
"tempfile",
"tokio",
"tokio-util 0.7.1",
"tracing",
"walkdir",
"workspace-hack",
]
[[package]]
@@ -6625,6 +6624,7 @@ dependencies = [
"num-bigint 0.4.3",
"num-integer",
"num-traits",
"object_store",
"once_cell",
"parquet",
"predicates",


@@ -41,7 +41,6 @@ members = [
"mutable_batch_lp",
"mutable_batch_pb",
"mutable_batch_tests",
"object_store",
"object_store_metrics",
"observability_deps",
"packers",


@@ -10,7 +10,7 @@ humantime = "2.1.0"
iox_catalog = { path = "../iox_catalog" }
iox_time = { path = "../iox_time" }
metric = { path = "../metric" }
object_store = { path = "../object_store" }
object_store = "0.0.1"
observability_deps = { path = "../observability_deps" }
snafu = "0.7"
tempfile = "3.1.0"


@@ -369,7 +369,7 @@ pub fn make_object_store(config: &ObjectStoreConfig) -> Result<Arc<DynObjectStor
Some(db_dir) => {
fs::create_dir_all(db_dir)
.context(CreatingDatabaseDirectorySnafu { path: db_dir })?;
Ok(Arc::new(object_store::disk::LocalFileSystem::new(&db_dir)))
Ok(Arc::new(object_store::local::LocalFileSystem::new(&db_dir)))
}
None => MissingObjectStoreConfigSnafu {
object_store: ObjectStoreType::File,


@@ -14,7 +14,7 @@ datafusion = { path = "../datafusion" }
futures = "0.3"
iox_catalog = { path = "../iox_catalog" }
metric = { path = "../metric" }
object_store = { path = "../object_store" }
object_store = "0.0.1"
observability_deps = { path = "../observability_deps" }
parquet_file = { path = "../parquet_file" }
predicate = { path = "../predicate" }


@@ -1108,10 +1108,10 @@ mod tests {
use super::*;
use arrow_util::assert_batches_sorted_eq;
use data_types::{ChunkId, KafkaPartition, NamespaceId, ParquetFileParams, SequenceNumber};
use futures::TryStreamExt;
use iox_catalog::interface::INITIAL_COMPACTION_LEVEL;
use iox_tests::util::TestCatalog;
use iox_time::SystemProvider;
use object_store::ObjectStoreTestConvenience;
use querier::{
cache::CatalogCache,
chunk::{collect_read_filter, ParquetChunkAdapter},
@@ -2488,7 +2488,8 @@ mod tests {
.await
.unwrap();
let object_store_files = compactor.object_store.list_all().await.unwrap();
let list = compactor.object_store.list(None).await.unwrap();
let object_store_files: Vec<_> = list.try_collect().await.unwrap();
assert_eq!(object_store_files.len(), 1);
}


@@ -94,9 +94,9 @@ impl GarbageCollector {
mod tests {
use super::*;
use data_types::{KafkaPartition, ParquetFile, ParquetFileParams, SequenceNumber};
use futures::{StreamExt, TryStreamExt};
use iox_catalog::interface::INITIAL_COMPACTION_LEVEL;
use iox_tests::util::TestCatalog;
use object_store::ObjectStoreTestConvenience;
use std::time::Duration;
use uuid::Uuid;
@@ -211,7 +211,9 @@ mod tests {
1
);
assert_eq!(catalog.object_store.list_all().await.unwrap().len(), 1);
let list = catalog.object_store.list(None).await.unwrap();
let obj_store_paths: Vec<_> = list.try_collect().await.unwrap();
assert_eq!(obj_store_paths.len(), 1);
}
#[tokio::test]
@@ -294,7 +296,10 @@ mod tests {
.unwrap(),
1
);
assert_eq!(catalog.object_store.list_all().await.unwrap().len(), 1);
let list = catalog.object_store.list(None).await.unwrap();
let obj_store_paths: Vec<_> = list.try_collect().await.unwrap();
assert_eq!(obj_store_paths.len(), 1);
}
#[tokio::test]
@@ -377,6 +382,7 @@ mod tests {
.unwrap(),
0
);
assert!(catalog.object_store.list_all().await.unwrap().is_empty());
let mut list = catalog.object_store.list(None).await.unwrap();
assert!(list.next().await.is_none());
}
}


@@ -22,7 +22,7 @@ ioxd_router = { path = "../ioxd_router"}
ioxd_querier = { path = "../ioxd_querier"}
ioxd_test = { path = "../ioxd_test"}
metric = { path = "../metric" }
object_store = { path = "../object_store" }
object_store = "0.0.1"
object_store_metrics = { path = "../object_store_metrics" }
observability_deps = { path = "../observability_deps" }
panic_logging = { path = "../panic_logging" }


@@ -23,7 +23,7 @@ iox_catalog = { path = "../iox_catalog" }
metric = { path = "../metric" }
mutable_batch = { path = "../mutable_batch"}
mutable_batch_lp = { path = "../mutable_batch_lp" }
object_store = { path = "../object_store" }
object_store = "0.0.1"
observability_deps = { path = "../observability_deps" }
parking_lot = "0.12"
parquet_file = { path = "../parquet_file" }


@@ -82,10 +82,10 @@ pub async fn persist(
mod tests {
use super::*;
use data_types::{NamespaceId, PartitionId, SequenceNumber, SequencerId, TableId};
use futures::{StreamExt, TryStreamExt};
use iox_catalog::interface::INITIAL_COMPACTION_LEVEL;
use iox_time::Time;
use object_store::memory::InMemory;
use object_store::ObjectStoreTestConvenience;
use query::test::{raw_data, TestChunk};
use std::sync::Arc;
use uuid::Uuid;
@@ -124,7 +124,8 @@ mod tests {
.await
.unwrap();
assert!(object_store.list_all().await.unwrap().is_empty());
let mut list = object_store.list(None).await.unwrap();
assert!(list.next().await.is_none());
}
#[tokio::test]
@@ -165,7 +166,8 @@ mod tests {
.await
.unwrap();
let obj_store_paths = object_store.list_all().await.unwrap();
let list = object_store.list(None).await.unwrap();
let obj_store_paths: Vec<_> = list.try_collect().await.unwrap();
assert_eq!(obj_store_paths.len(), 1);
}
}


@@ -14,7 +14,7 @@ iox_catalog = { path = "../iox_catalog" }
iox_time = { path = "../iox_time" }
metric = { path = "../metric" }
mutable_batch_lp = { path = "../mutable_batch_lp" }
object_store = { path = "../object_store" }
object_store = "0.0.1"
parquet_file = { path = "../parquet_file" }
query = { path = "../query" }
schema = { path = "../schema" }


@@ -14,7 +14,7 @@ iox_catalog = { path = "../iox_catalog" }
ioxd_common = { path = "../ioxd_common" }
metric = { path = "../metric" }
query = { path = "../query" }
object_store = { path = "../object_store" }
object_store = "0.0.1"
iox_time = { path = "../iox_time" }
trace = { path = "../trace" }


@@ -11,7 +11,7 @@ ingester = { path = "../ingester" }
iox_catalog = { path = "../iox_catalog" }
ioxd_common = { path = "../ioxd_common" }
metric = { path = "../metric" }
object_store = { path = "../object_store" }
object_store = "0.0.1"
query = { path = "../query" }
trace = { path = "../trace" }
write_buffer = { path = "../write_buffer" }


@@ -12,7 +12,7 @@ generated_types = { path = "../generated_types" }
iox_catalog = { path = "../iox_catalog" }
ioxd_common = { path = "../ioxd_common" }
metric = { path = "../metric" }
object_store = { path = "../object_store" }
object_store = "0.0.1"
querier = { path = "../querier" }
query = { path = "../query" }
service_grpc_flight = { path = "../service_grpc_flight" }


@@ -11,7 +11,7 @@ iox_catalog = { path = "../iox_catalog" }
ioxd_common = { path = "../ioxd_common" }
metric = { path = "../metric" }
mutable_batch = { path = "../mutable_batch" }
object_store = { path = "../object_store" }
object_store = "0.0.1"
observability_deps = { path = "../observability_deps" }
router = { path = "../router" }
trace = { path = "../trace" }


@@ -1,48 +0,0 @@
[package]
name = "object_store"
version = "0.1.0"
authors = ["Paul Dix <paul@pauldix.net>"]
edition = "2021"
[dependencies] # In alphabetical order
async-trait = "0.1.53"
# Microsoft Azure Blob storage integration
azure_core = { version = "0.2", optional = true, default-features = false, features = ["enable_reqwest_rustls"] }
azure_storage = { version = "0.2", optional = true, default-features = false, features = ["account"] }
azure_storage_blobs = { version = "0.2", optional = true, default-features = false, features = ["enable_reqwest_rustls"] }
bytes = "1.0"
chrono = { version = "0.4", default-features = false, features = ["clock"] }
# Google Cloud Storage integration
cloud-storage = { version = "0.11.1", optional = true, default-features = false, features = ["rustls-tls"] }
futures = "0.3"
# for rusoto
hyper = { version = "0.14", optional = true, default-features = false }
# for rusoto
hyper-rustls = { version = "0.23.0", optional = true, default-features = false, features = ["webpki-tokio", "http1", "http2", "tls12"] }
itertools = "0.10.1"
observability_deps = { path = "../observability_deps", optional = true }
percent-encoding = "2.1"
# rusoto crates are for Amazon S3 integration
rusoto_core = { version = "0.48.0", optional = true, default-features = false, features = ["rustls"] }
rusoto_credential = { version = "0.48.0", optional = true, default-features = false }
rusoto_s3 = { version = "0.48.0", optional = true, default-features = false, features = ["rustls"] }
snafu = "0.7"
tokio = { version = "1.18", features = ["fs", "io-util", "macros", "parking_lot", "rt-multi-thread", "time"] }
# Filesystem integration
tokio-util = { version = "0.7.1", features = ["codec", "io"] }
reqwest = { version = "0.11", optional = true, default-features = false, features = ["rustls-tls"] }
# Filesystem integration
walkdir = "2"
tempfile = "3.1.0"
workspace-hack = { path = "../workspace-hack" }
[features]
azure = ["azure_core", "azure_storage_blobs", "azure_storage", "reqwest"]
azure_test = ["azure", "azure_core/azurite_workaround", "azure_storage/azurite_workaround", "azure_storage_blobs/azurite_workaround"]
gcp = ["cloud-storage"]
aws = ["rusoto_core", "rusoto_credential", "rusoto_s3", "hyper", "hyper-rustls", "observability_deps"]
[dev-dependencies] # In alphabetical order
dotenv = "0.15.0"
tempfile = "3.1.0"
futures-test = "0.3"


@@ -1,940 +0,0 @@
//! This module contains the IOx implementation for using S3 as the object
//! store.
use crate::{
path::{Path, DELIMITER},
GetResult, ListResult, ObjectMeta, ObjectStoreApi, Result,
};
use async_trait::async_trait;
use bytes::Bytes;
use chrono::{DateTime, Utc};
use futures::{
stream::{self, BoxStream},
Future, StreamExt, TryStreamExt,
};
use hyper::client::Builder as HyperBuilder;
use observability_deps::tracing::{debug, warn};
use rusoto_core::ByteStream;
use rusoto_credential::{InstanceMetadataProvider, StaticProvider};
use rusoto_s3::S3;
use snafu::{OptionExt, ResultExt, Snafu};
use std::{convert::TryFrom, fmt, num::NonZeroUsize, ops::Deref, sync::Arc, time::Duration};
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
/// The maximum number of times a request will be retried in the case of an AWS server error
pub const MAX_NUM_RETRIES: u32 = 3;
/// A specialized `Error` for object store-related errors
#[derive(Debug, Snafu)]
#[allow(missing_docs)]
enum Error {
#[snafu(display("Expected streamed data to have length {}, got {}", expected, actual))]
DataDoesNotMatchLength { expected: usize, actual: usize },
#[snafu(display("Did not receive any data. Bucket: {}, Location: {}", bucket, path))]
NoData { bucket: String, path: String },
#[snafu(display(
"Unable to DELETE data. Bucket: {}, Location: {}, Error: {} ({:?})",
bucket,
path,
source,
source,
))]
UnableToDeleteData {
source: rusoto_core::RusotoError<rusoto_s3::DeleteObjectError>,
bucket: String,
path: String,
},
#[snafu(display(
"Unable to GET data. Bucket: {}, Location: {}, Error: {} ({:?})",
bucket,
path,
source,
source,
))]
UnableToGetData {
source: rusoto_core::RusotoError<rusoto_s3::GetObjectError>,
bucket: String,
path: String,
},
#[snafu(display(
"Unable to HEAD data. Bucket: {}, Location: {}, Error: {} ({:?})",
bucket,
path,
source,
source,
))]
UnableToHeadData {
source: rusoto_core::RusotoError<rusoto_s3::HeadObjectError>,
bucket: String,
path: String,
},
#[snafu(display(
"Unable to GET part of the data. Bucket: {}, Location: {}, Error: {} ({:?})",
bucket,
path,
source,
source,
))]
UnableToGetPieceOfData {
source: std::io::Error,
bucket: String,
path: String,
},
#[snafu(display(
"Unable to PUT data. Bucket: {}, Location: {}, Error: {} ({:?})",
bucket,
path,
source,
source,
))]
UnableToPutData {
source: rusoto_core::RusotoError<rusoto_s3::PutObjectError>,
bucket: String,
path: String,
},
#[snafu(display(
"Unable to list data. Bucket: {}, Error: {} ({:?})",
bucket,
source,
source,
))]
UnableToListData {
source: rusoto_core::RusotoError<rusoto_s3::ListObjectsV2Error>,
bucket: String,
},
#[snafu(display(
"Unable to parse last modified date. Bucket: {}, Error: {} ({:?})",
bucket,
source,
source,
))]
UnableToParseLastModified {
source: chrono::ParseError,
bucket: String,
},
#[snafu(display(
"Unable to buffer data into temporary file, Error: {} ({:?})",
source,
source,
))]
UnableToBufferStream { source: std::io::Error },
#[snafu(display(
"Could not parse `{}` as an AWS region. Regions should look like `us-east-2`. {} ({:?})",
region,
source,
source,
))]
InvalidRegion {
region: String,
source: rusoto_core::region::ParseRegionError,
},
#[snafu(display("Missing aws-access-key"))]
MissingAccessKey,
#[snafu(display("Missing aws-secret-access-key"))]
MissingSecretAccessKey,
NotFound {
path: String,
source: Box<dyn std::error::Error + Send + Sync + 'static>,
},
}
impl From<Error> for super::Error {
fn from(source: Error) -> Self {
match source {
Error::NotFound { path, source } => Self::NotFound { path, source },
_ => Self::Generic {
store: "S3",
source: Box::new(source),
},
}
}
}
/// Configuration for connecting to [Amazon S3](https://aws.amazon.com/s3/).
pub struct AmazonS3 {
/// S3 client w/o any connection limit.
///
/// You should normally use [`Self::client`] instead.
client_unrestricted: rusoto_s3::S3Client,
/// Semaphore that limits the usage of [`client_unrestricted`](Self::client_unrestricted).
connection_semaphore: Arc<Semaphore>,
/// Bucket name used by this object store client.
bucket_name: String,
}
impl fmt::Debug for AmazonS3 {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("AmazonS3")
.field("client", &"rusoto_s3::S3Client")
.field("bucket_name", &self.bucket_name)
.finish()
}
}
impl fmt::Display for AmazonS3 {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "AmazonS3({})", self.bucket_name)
}
}
#[async_trait]
impl ObjectStoreApi for AmazonS3 {
async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
let bucket_name = self.bucket_name.clone();
let key = location.to_raw();
let request_factory = move || {
let bytes = bytes.clone();
let length = bytes.len();
let stream_data = std::io::Result::Ok(bytes);
let stream = futures::stream::once(async move { stream_data });
let byte_stream = ByteStream::new_with_size(stream, length);
rusoto_s3::PutObjectRequest {
bucket: bucket_name.clone(),
key: key.to_string(),
body: Some(byte_stream),
..Default::default()
}
};
let s3 = self.client().await;
s3_request(move || {
let (s3, request_factory) = (s3.clone(), request_factory.clone());
async move { s3.put_object(request_factory()).await }
})
.await
.context(UnableToPutDataSnafu {
bucket: &self.bucket_name,
path: location.to_raw(),
})?;
Ok(())
}
async fn get(&self, location: &Path) -> Result<GetResult> {
let key = location.to_raw().to_string();
let get_request = rusoto_s3::GetObjectRequest {
bucket: self.bucket_name.clone(),
key: key.clone(),
..Default::default()
};
let bucket_name = self.bucket_name.clone();
let s = self
.client()
.await
.get_object(get_request)
.await
.map_err(|e| match e {
rusoto_core::RusotoError::Service(rusoto_s3::GetObjectError::NoSuchKey(_)) => {
Error::NotFound {
path: key.clone(),
source: e.into(),
}
}
_ => Error::UnableToGetData {
bucket: self.bucket_name.to_owned(),
path: key.clone(),
source: e,
},
})?
.body
.context(NoDataSnafu {
bucket: self.bucket_name.to_owned(),
path: key.clone(),
})?
.map_err(move |source| Error::UnableToGetPieceOfData {
source,
bucket: bucket_name.clone(),
path: key.clone(),
})
.err_into()
.boxed();
Ok(GetResult::Stream(s))
}
async fn head(&self, location: &Path) -> Result<ObjectMeta> {
let key = location.to_raw().to_string();
let head_request = rusoto_s3::HeadObjectRequest {
bucket: self.bucket_name.clone(),
key: key.clone(),
..Default::default()
};
let s = self
.client()
.await
.head_object(head_request)
.await
.map_err(|e| match e {
rusoto_core::RusotoError::Service(rusoto_s3::HeadObjectError::NoSuchKey(_)) => {
Error::NotFound {
path: key.clone(),
source: e.into(),
}
}
rusoto_core::RusotoError::Unknown(h) if h.status.as_u16() == 404 => {
Error::NotFound {
path: key.clone(),
source: "resource not found".into(),
}
}
_ => Error::UnableToHeadData {
bucket: self.bucket_name.to_owned(),
path: key.clone(),
source: e,
},
})?;
// Note: GetObject and HeadObject return a different date format from ListObjects
//
// S3 List returns timestamps in the form
// <LastModified>2013-09-17T18:07:53.000Z</LastModified>
// S3 GetObject returns timestamps in the form
// Last-Modified: Sun, 1 Jan 2006 12:00:00 GMT
let last_modified = match s.last_modified {
Some(lm) => DateTime::parse_from_rfc2822(&lm)
.context(UnableToParseLastModifiedSnafu {
bucket: &self.bucket_name,
})?
.with_timezone(&Utc),
None => Utc::now(),
};
Ok(ObjectMeta {
last_modified,
location: location.clone(),
size: usize::try_from(s.content_length.unwrap_or(0))
.expect("unsupported size on this platform"),
})
}
async fn delete(&self, location: &Path) -> Result<()> {
let key = location.to_raw();
let bucket_name = self.bucket_name.clone();
let request_factory = move || rusoto_s3::DeleteObjectRequest {
bucket: bucket_name.clone(),
key: key.to_string(),
..Default::default()
};
let s3 = self.client().await;
s3_request(move || {
let (s3, request_factory) = (s3.clone(), request_factory.clone());
async move { s3.delete_object(request_factory()).await }
})
.await
.context(UnableToDeleteDataSnafu {
bucket: &self.bucket_name,
path: location.to_raw(),
})?;
Ok(())
}
async fn list<'a>(
&'a self,
prefix: Option<&'a Path>,
) -> Result<BoxStream<'a, Result<ObjectMeta>>> {
Ok(self
.list_objects_v2(prefix, None)
.await?
.map_ok(move |list_objects_v2_result| {
let contents = list_objects_v2_result.contents.unwrap_or_default();
let iter = contents
.into_iter()
.map(|object| convert_object_meta(object, &self.bucket_name));
futures::stream::iter(iter)
})
.try_flatten()
.boxed())
}
async fn list_with_delimiter(&self, prefix: &Path) -> Result<ListResult> {
Ok(self
.list_objects_v2(Some(prefix), Some(DELIMITER.to_string()))
.await?
.try_fold(
ListResult {
next_token: None,
common_prefixes: vec![],
objects: vec![],
},
|acc, list_objects_v2_result| async move {
let mut res = acc;
let contents = list_objects_v2_result.contents.unwrap_or_default();
let mut objects = contents
.into_iter()
.map(|object| convert_object_meta(object, &self.bucket_name))
.collect::<Result<Vec<_>>>()?;
res.objects.append(&mut objects);
res.common_prefixes.extend(
list_objects_v2_result
.common_prefixes
.unwrap_or_default()
.into_iter()
.map(|p| {
Path::from_raw(
p.prefix.expect("can't have a prefix without a value"),
)
}),
);
Ok(res)
},
)
.await?)
}
}
fn convert_object_meta(object: rusoto_s3::Object, bucket: &str) -> Result<ObjectMeta> {
let location = Path::from_raw(object.key.expect("object doesn't exist without a key"));
let last_modified = match object.last_modified {
Some(lm) => DateTime::parse_from_rfc3339(&lm)
.context(UnableToParseLastModifiedSnafu { bucket })?
.with_timezone(&Utc),
None => Utc::now(),
};
let size =
usize::try_from(object.size.unwrap_or(0)).expect("unsupported size on this platform");
Ok(ObjectMeta {
location,
last_modified,
size,
})
}
/// Configure a connection to Amazon S3 using the specified credentials in
/// the specified Amazon region and bucket.
#[allow(clippy::too_many_arguments)]
pub fn new_s3(
access_key_id: Option<impl Into<String>>,
secret_access_key: Option<impl Into<String>>,
region: impl Into<String>,
bucket_name: impl Into<String>,
endpoint: Option<impl Into<String>>,
session_token: Option<impl Into<String>>,
max_connections: NonZeroUsize,
allow_http: bool,
) -> Result<AmazonS3> {
let region = region.into();
let region: rusoto_core::Region = match endpoint {
None => region.parse().context(InvalidRegionSnafu { region })?,
Some(endpoint) => rusoto_core::Region::Custom {
name: region,
endpoint: endpoint.into(),
},
};
let mut builder = HyperBuilder::default();
builder.pool_max_idle_per_host(max_connections.get());
let connector = if allow_http {
hyper_rustls::HttpsConnectorBuilder::new()
.with_webpki_roots()
.https_or_http()
.enable_http1()
.enable_http2()
.build()
} else {
hyper_rustls::HttpsConnectorBuilder::new()
.with_webpki_roots()
.https_only()
.enable_http1()
.enable_http2()
.build()
};
let http_client = rusoto_core::request::HttpClient::from_builder(builder, connector);
let client = match (access_key_id, secret_access_key, session_token) {
(Some(access_key_id), Some(secret_access_key), Some(session_token)) => {
let credentials_provider = StaticProvider::new(
access_key_id.into(),
secret_access_key.into(),
Some(session_token.into()),
None,
);
rusoto_s3::S3Client::new_with(http_client, credentials_provider, region)
}
(Some(access_key_id), Some(secret_access_key), None) => {
let credentials_provider =
StaticProvider::new_minimal(access_key_id.into(), secret_access_key.into());
rusoto_s3::S3Client::new_with(http_client, credentials_provider, region)
}
(None, Some(_), _) => return Err(Error::MissingAccessKey.into()),
(Some(_), None, _) => return Err(Error::MissingSecretAccessKey.into()),
_ => {
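// No static credentials were provided: fall back to the instance metadata provider (e.g. when running on EC2).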
let credentials_provider = InstanceMetadataProvider::new();
rusoto_s3::S3Client::new_with(http_client, credentials_provider, region)
}
};
Ok(AmazonS3 {
client_unrestricted: client,
connection_semaphore: Arc::new(Semaphore::new(max_connections.get())),
bucket_name: bucket_name.into(),
})
}
/// Create a new [`AmazonS3`] that always errors
pub fn new_failing_s3() -> Result<AmazonS3> {
new_s3(
Some("foo"),
Some("bar"),
"us-east-1",
"bucket",
None as Option<&str>,
None as Option<&str>,
NonZeroUsize::new(16).unwrap(),
true,
)
}
/// S3 client bundled w/ a semaphore permit.
#[derive(Clone)]
struct SemaphoreClient {
/// Permit for this specific use of the client.
///
/// Note that this field is never read and therefore considered "dead code" by rustc.
#[allow(dead_code)]
permit: Arc<OwnedSemaphorePermit>,
inner: rusoto_s3::S3Client,
}
impl Deref for SemaphoreClient {
type Target = rusoto_s3::S3Client;
fn deref(&self) -> &Self::Target {
&self.inner
}
}
impl AmazonS3 {
/// Get a client according to the current connection limit.
async fn client(&self) -> SemaphoreClient {
let permit = Arc::clone(&self.connection_semaphore)
.acquire_owned()
.await
.expect("semaphore shouldn't be closed yet");
SemaphoreClient {
permit: Arc::new(permit),
inner: self.client_unrestricted.clone(),
}
}
async fn list_objects_v2(
&self,
prefix: Option<&Path>,
delimiter: Option<String>,
) -> Result<BoxStream<'_, Result<rusoto_s3::ListObjectsV2Output>>> {
#[derive(Clone)]
enum ListState {
Start,
HasMore(String),
Done,
}
use ListState::*;
let raw_prefix = prefix.map(|p| format!("{}{}", p.to_raw(), DELIMITER));
let bucket = self.bucket_name.clone();
let request_factory = move || rusoto_s3::ListObjectsV2Request {
bucket,
prefix: raw_prefix,
delimiter,
..Default::default()
};
let s3 = self.client().await;
Ok(stream::unfold(ListState::Start, move |state| {
let request_factory = request_factory.clone();
let s3 = s3.clone();
async move {
let continuation_token = match state.clone() {
HasMore(continuation_token) => Some(continuation_token),
Done => {
return None;
}
// If this is the first request we've made, we don't need to make any
// modifications to the request
Start => None,
};
let resp = s3_request(move || {
let (s3, request_factory, continuation_token) = (
s3.clone(),
request_factory.clone(),
continuation_token.clone(),
);
async move {
s3.list_objects_v2(rusoto_s3::ListObjectsV2Request {
continuation_token,
..request_factory()
})
.await
}
})
.await;
let resp = match resp {
Ok(resp) => resp,
Err(e) => return Some((Err(e), state)),
};
// The AWS response contains a field named `is_truncated` as well as
// `next_continuation_token`, and we're assuming that `next_continuation_token`
// is only set when `is_truncated` is true (and therefore not
// checking `is_truncated`).
let next_state =
if let Some(next_continuation_token) = &resp.next_continuation_token {
ListState::HasMore(next_continuation_token.to_string())
} else {
ListState::Done
};
Some((Ok(resp), next_state))
}
})
.map_err(move |e| {
Error::UnableToListData {
source: e,
bucket: self.bucket_name.clone(),
}
.into()
})
.boxed())
}
}
/// Handles retrying a request to S3 up to `MAX_NUM_RETRIES` times if S3 returns 5xx server errors.
///
/// The `future_factory` argument is a function `F` that takes no arguments and, when called, will
/// return a `Future` (type `G`) that, when `await`ed, will perform a request to S3 through
/// `rusoto` and yield a `Result` containing some type `R` on success or a
/// `rusoto_core::RusotoError<E>` on error.
///
/// If the executed `Future` returns success, this function will return that success.
/// If the executed `Future` returns a 5xx server error, this function will wait an amount of
/// time that increases exponentially with the number of times it has retried, get a new `Future` by
/// calling `future_factory` again, and retry the request by `await`ing the `Future` again.
/// The retries will continue until the maximum number of retries has been attempted. In that case,
/// this function will return the last encountered error.
///
/// Client errors (4xx) will never be retried by this function.
async fn s3_request<E, F, G, R>(future_factory: F) -> Result<R, rusoto_core::RusotoError<E>>
where
E: std::error::Error + Send,
F: Fn() -> G + Send,
G: Future<Output = Result<R, rusoto_core::RusotoError<E>>> + Send,
R: Send,
{
let mut attempts = 0;
loop {
let request = future_factory();
let result = request.await;
match result {
Ok(r) => return Ok(r),
Err(error) => {
attempts += 1;
let should_retry = matches!(
error,
rusoto_core::RusotoError::Unknown(ref response)
if response.status.is_server_error()
);
if attempts > MAX_NUM_RETRIES {
warn!(
?error,
attempts, "maximum number of retries exceeded for AWS S3 request"
);
return Err(error);
} else if !should_retry {
return Err(error);
} else {
debug!(?error, attempts, "retrying AWS S3 request");
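// Exponential backoff: 100 ms after the first failed attempt, doubling with each subsequent retry.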
let wait_time = Duration::from_millis(2u64.pow(attempts) * 50);
tokio::time::sleep(wait_time).await;
}
}
}
}
}
impl Error {
#[cfg(test)]
fn s3_error_due_to_credentials(&self) -> bool {
use rusoto_core::RusotoError;
use Error::*;
matches!(
self,
UnableToPutData {
source: RusotoError::Credentials(_),
bucket: _,
path: _,
} | UnableToGetData {
source: RusotoError::Credentials(_),
bucket: _,
path: _,
} | UnableToDeleteData {
source: RusotoError::Credentials(_),
bucket: _,
path: _,
} | UnableToListData {
source: RusotoError::Credentials(_),
bucket: _,
}
)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{
tests::{
get_nonexistent_object, list_uses_directories_correctly, list_with_delimiter,
put_get_delete_list,
},
Error as ObjectStoreError, ObjectStoreApi,
};
use bytes::Bytes;
use std::env;
type TestError = Box<dyn std::error::Error + Send + Sync + 'static>;
type Result<T, E = TestError> = std::result::Result<T, E>;
const NON_EXISTENT_NAME: &str = "nonexistentname";
#[derive(Debug)]
struct AwsConfig {
access_key_id: String,
secret_access_key: String,
region: String,
bucket: String,
endpoint: Option<String>,
token: Option<String>,
}
// Helper macro to skip tests if TEST_INTEGRATION and the AWS environment variables are not set.
macro_rules! maybe_skip_integration {
() => {{
dotenv::dotenv().ok();
let required_vars = [
"AWS_DEFAULT_REGION",
"OBJECT_STORE_BUCKET",
"AWS_ACCESS_KEY_ID",
"AWS_SECRET_ACCESS_KEY",
];
let unset_vars: Vec<_> = required_vars
.iter()
.filter_map(|&name| match env::var(name) {
Ok(_) => None,
Err(_) => Some(name),
})
.collect();
let unset_var_names = unset_vars.join(", ");
let force = env::var("TEST_INTEGRATION");
if force.is_ok() && !unset_var_names.is_empty() {
panic!(
"TEST_INTEGRATION is set, \
but variable(s) {} need to be set",
unset_var_names
);
} else if force.is_err() {
eprintln!(
"skipping AWS integration test - set {}TEST_INTEGRATION to run",
if unset_var_names.is_empty() {
String::new()
} else {
format!("{} and ", unset_var_names)
}
);
return;
} else {
AwsConfig {
access_key_id: env::var("AWS_ACCESS_KEY_ID")
.expect("already checked AWS_ACCESS_KEY_ID"),
secret_access_key: env::var("AWS_SECRET_ACCESS_KEY")
.expect("already checked AWS_SECRET_ACCESS_KEY"),
region: env::var("AWS_DEFAULT_REGION")
.expect("already checked AWS_DEFAULT_REGION"),
bucket: env::var("OBJECT_STORE_BUCKET")
.expect("already checked OBJECT_STORE_BUCKET"),
endpoint: env::var("AWS_ENDPOINT").ok(),
token: env::var("AWS_SESSION_TOKEN").ok(),
}
}
}};
}
fn check_credentials<T>(r: Result<T>) -> Result<T> {
if let Err(e) = &r {
let e = &**e;
if let Some(e) = e.downcast_ref::<Error>() {
if e.s3_error_due_to_credentials() {
eprintln!(
"Try setting the AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY \
environment variables"
);
}
}
}
r
}
fn make_integration(config: AwsConfig) -> AmazonS3 {
new_s3(
Some(config.access_key_id),
Some(config.secret_access_key),
config.region,
config.bucket,
config.endpoint,
config.token,
NonZeroUsize::new(16).unwrap(),
true,
)
.expect("Valid S3 config")
}
#[tokio::test]
async fn s3_test() {
let config = maybe_skip_integration!();
let integration = make_integration(config);
check_credentials(put_get_delete_list(&integration).await).unwrap();
check_credentials(list_uses_directories_correctly(&integration).await).unwrap();
check_credentials(list_with_delimiter(&integration).await).unwrap();
}
#[tokio::test]
async fn s3_test_get_nonexistent_location() {
let config = maybe_skip_integration!();
let integration = make_integration(config);
let location = Path::from_iter([NON_EXISTENT_NAME]);
let err = get_nonexistent_object(&integration, Some(location))
.await
.unwrap_err();
if let Some(ObjectStoreError::NotFound { path, source }) =
err.downcast_ref::<ObjectStoreError>()
{
let source_variant = source.downcast_ref::<rusoto_core::RusotoError<_>>();
assert!(
matches!(
source_variant,
Some(rusoto_core::RusotoError::Service(
rusoto_s3::GetObjectError::NoSuchKey(_)
)),
),
"got: {:?}",
source_variant
);
assert_eq!(path, NON_EXISTENT_NAME);
} else {
panic!("unexpected error type: {:?}", err);
}
}
#[tokio::test]
async fn s3_test_get_nonexistent_bucket() {
let mut config = maybe_skip_integration!();
config.bucket = NON_EXISTENT_NAME.into();
let integration = make_integration(config);
let location = Path::from_iter([NON_EXISTENT_NAME]);
let err = integration.get(&location).await.unwrap_err().to_string();
assert!(
err.contains("The specified bucket does not exist"),
"{}",
err
)
}
#[tokio::test]
async fn s3_test_put_nonexistent_bucket() {
let mut config = maybe_skip_integration!();
config.bucket = NON_EXISTENT_NAME.into();
let integration = make_integration(config);
let location = Path::from_iter([NON_EXISTENT_NAME]);
let data = Bytes::from("arbitrary data");
let err = integration
.put(&location, data)
.await
.unwrap_err()
.to_string();
assert!(
err.contains("The specified bucket does not exist")
&& err.contains("Unable to PUT data"),
"{}",
err
)
}
#[tokio::test]
async fn s3_test_delete_nonexistent_location() {
let config = maybe_skip_integration!();
let integration = make_integration(config);
let location = Path::from_iter([NON_EXISTENT_NAME]);
integration.delete(&location).await.unwrap();
}
#[tokio::test]
async fn s3_test_delete_nonexistent_bucket() {
let mut config = maybe_skip_integration!();
config.bucket = NON_EXISTENT_NAME.into();
let integration = make_integration(config);
let location = Path::from_iter([NON_EXISTENT_NAME]);
let err = integration.delete(&location).await.unwrap_err().to_string();
assert!(
err.contains("The specified bucket does not exist")
&& err.contains("Unable to DELETE data"),
"{}",
err
)
}
}


@@ -1,376 +0,0 @@
//! This module contains the IOx implementation for using Azure Blob storage as
//! the object store.
use crate::{
path::{Path, DELIMITER},
GetResult, ListResult, ObjectMeta, ObjectStoreApi, Result,
};
use async_trait::async_trait;
use azure_core::{prelude::*, HttpClient};
use azure_storage::core::prelude::*;
use azure_storage_blobs::blob::Blob;
use azure_storage_blobs::{
prelude::{AsBlobClient, AsContainerClient, ContainerClient},
DeleteSnapshotsMethod,
};
use bytes::Bytes;
use futures::{
stream::{self, BoxStream},
StreamExt, TryStreamExt,
};
use snafu::{ResultExt, Snafu};
use std::{convert::TryInto, sync::Arc};
/// A specialized `Error` for Azure object store-related errors
#[derive(Debug, Snafu)]
#[allow(missing_docs)]
enum Error {
#[snafu(display("Unable to DELETE data. Location: {}, Error: {}", path, source,))]
Delete {
source: Box<dyn std::error::Error + Send + Sync>,
path: String,
},
#[snafu(display("Unable to GET data. Location: {}, Error: {}", path, source,))]
Get {
source: Box<dyn std::error::Error + Send + Sync>,
path: String,
},
#[snafu(display("Unable to HEAD data. Location: {}, Error: {}", path, source,))]
Head {
source: Box<dyn std::error::Error + Send + Sync>,
path: String,
},
#[snafu(display("Unable to PUT data. Location: {}, Error: {}", path, source,))]
Put {
source: Box<dyn std::error::Error + Send + Sync>,
path: String,
},
#[snafu(display("Unable to list data. Error: {}", source))]
List {
source: Box<dyn std::error::Error + Send + Sync>,
},
#[cfg(not(feature = "azure_test"))]
#[snafu(display(
"Azurite (azure emulator) support not compiled in, please add `azure_test` feature"
))]
NoEmulatorFeature,
}
impl From<Error> for super::Error {
fn from(source: Error) -> Self {
Self::Generic {
store: "Azure Blob Storage",
source: Box::new(source),
}
}
}
/// Configuration for connecting to [Microsoft Azure Blob Storage](https://azure.microsoft.com/en-us/services/storage/blobs/).
#[derive(Debug)]
pub struct MicrosoftAzure {
container_client: Arc<ContainerClient>,
#[allow(dead_code)]
container_name: String,
}
impl std::fmt::Display for MicrosoftAzure {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "MicrosoftAzure({})", self.container_name)
}
}
#[async_trait]
impl ObjectStoreApi for MicrosoftAzure {
async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
let location = location.to_raw();
let bytes = bytes::BytesMut::from(&*bytes);
self.container_client
.as_blob_client(location)
.put_block_blob(bytes)
.execute()
.await
.context(PutSnafu {
path: location.to_owned(),
})?;
Ok(())
}
async fn get(&self, location: &Path) -> Result<GetResult> {
let blob = self
.container_client
.as_blob_client(location.to_raw())
.get()
.execute()
.await
.context(GetSnafu {
path: location.to_raw(),
})?;
Ok(GetResult::Stream(
futures::stream::once(async move { Ok(blob.data) }).boxed(),
))
}
async fn head(&self, location: &Path) -> Result<ObjectMeta> {
let s = self
.container_client
.as_blob_client(location.to_raw())
.get_properties()
.execute()
.await
.context(HeadSnafu {
path: location.to_raw(),
})?;
convert_object_meta(s.blob)
}
async fn delete(&self, location: &Path) -> Result<()> {
let location = location.to_raw();
self.container_client
.as_blob_client(location)
.delete()
.delete_snapshots_method(DeleteSnapshotsMethod::Include)
.execute()
.await
.context(DeleteSnafu {
path: location.to_owned(),
})?;
Ok(())
}
async fn list<'a>(
&'a self,
prefix: Option<&'a Path>,
) -> Result<BoxStream<'a, Result<ObjectMeta>>> {
#[derive(Clone)]
enum ListState {
Start,
HasMore(String),
Done,
}
Ok(stream::unfold(ListState::Start, move |state| async move {
let mut request = self.container_client.list_blobs();
let prefix_raw = prefix.map(|p| format!("{}{}", p.to_raw(), DELIMITER));
if let Some(ref p) = prefix_raw {
request = request.prefix(p as &str);
}
match state {
ListState::HasMore(ref marker) => {
request = request.next_marker(marker as &str);
}
ListState::Done => {
return None;
}
ListState::Start => {}
}
let resp = match request.execute().await.context(ListSnafu) {
Ok(resp) => resp,
Err(err) => return Some((Err(crate::Error::from(err)), state)),
};
let next_state = if let Some(marker) = resp.next_marker {
ListState::HasMore(marker.as_str().to_string())
} else {
ListState::Done
};
let names = resp.blobs.blobs.into_iter().map(convert_object_meta);
Some((Ok(futures::stream::iter(names)), next_state))
})
.try_flatten()
.boxed())
}
async fn list_with_delimiter(&self, prefix: &Path) -> Result<ListResult> {
let mut request = self.container_client.list_blobs();
let prefix_raw = format!("{}{}", prefix, DELIMITER);
request = request.delimiter(Delimiter::new(DELIMITER));
request = request.prefix(&*prefix_raw);
let resp = request.execute().await.context(ListSnafu)?;
let next_token = resp.next_marker.as_ref().map(|m| m.as_str().to_string());
let common_prefixes = resp
.blobs
.blob_prefix
.map(|prefixes| {
prefixes
.iter()
.map(|prefix| Path::from_raw(&prefix.name))
.collect()
})
.unwrap_or_else(Vec::new);
let objects = resp
.blobs
.blobs
.into_iter()
.map(convert_object_meta)
.collect::<Result<_>>()?;
Ok(ListResult {
next_token,
common_prefixes,
objects,
})
}
}
fn convert_object_meta(blob: Blob) -> Result<ObjectMeta> {
let location = Path::from_raw(blob.name);
let last_modified = blob.properties.last_modified;
let size = blob
.properties
.content_length
.try_into()
.expect("unsupported size on this platform");
Ok(ObjectMeta {
location,
last_modified,
size,
})
}
#[cfg(feature = "azure_test")]
fn check_if_emulator_works() -> Result<()> {
Ok(())
}
#[cfg(not(feature = "azure_test"))]
fn check_if_emulator_works() -> Result<()> {
Err(Error::NoEmulatorFeature.into())
}
/// Configure a connection to container with given name on Microsoft Azure
/// Blob store.
///
/// The credentials `account` and `access_key` must provide access to the
/// store.
pub fn new_azure(
account: impl Into<String>,
access_key: impl Into<String>,
container_name: impl Into<String>,
use_emulator: bool,
) -> Result<MicrosoftAzure> {
let account = account.into();
let access_key = access_key.into();
let http_client: Arc<dyn HttpClient> = Arc::new(reqwest::Client::new());
let storage_account_client = if use_emulator {
check_if_emulator_works()?;
StorageAccountClient::new_emulator_default()
} else {
StorageAccountClient::new_access_key(Arc::clone(&http_client), &account, &access_key)
};
let storage_client = storage_account_client.as_storage_client();
let container_name = container_name.into();
let container_client = storage_client.as_container_client(&container_name);
Ok(MicrosoftAzure {
container_client,
container_name,
})
}
#[cfg(test)]
mod tests {
use crate::azure::new_azure;
use crate::tests::{list_uses_directories_correctly, list_with_delimiter, put_get_delete_list};
use std::env;
#[derive(Debug)]
struct AzureConfig {
storage_account: String,
access_key: String,
bucket: String,
use_emulator: bool,
}
// Helper macro to skip tests if TEST_INTEGRATION and the Azure environment
// variables are not set.
macro_rules! maybe_skip_integration {
() => {{
dotenv::dotenv().ok();
let use_emulator = std::env::var("AZURE_USE_EMULATOR").is_ok();
let mut required_vars = vec!["OBJECT_STORE_BUCKET"];
if !use_emulator {
required_vars.push("AZURE_STORAGE_ACCOUNT");
required_vars.push("AZURE_STORAGE_ACCESS_KEY");
}
let unset_vars: Vec<_> = required_vars
.iter()
.filter_map(|&name| match env::var(name) {
Ok(_) => None,
Err(_) => Some(name),
})
.collect();
let unset_var_names = unset_vars.join(", ");
let force = std::env::var("TEST_INTEGRATION");
if force.is_ok() && !unset_var_names.is_empty() {
panic!(
"TEST_INTEGRATION is set, \
but variable(s) {} need to be set",
unset_var_names
)
} else if force.is_err() {
eprintln!(
"skipping Azure integration test - set {}TEST_INTEGRATION to run",
if unset_var_names.is_empty() {
String::new()
} else {
format!("{} and ", unset_var_names)
}
);
return;
} else {
AzureConfig {
storage_account: env::var("AZURE_STORAGE_ACCOUNT").unwrap_or_default(),
access_key: env::var("AZURE_STORAGE_ACCESS_KEY").unwrap_or_default(),
bucket: env::var("OBJECT_STORE_BUCKET")
.expect("already checked OBJECT_STORE_BUCKET"),
use_emulator,
}
}
}};
}
#[tokio::test]
async fn azure_blob_test() {
let config = maybe_skip_integration!();
let integration = new_azure(
config.storage_account,
config.access_key,
config.bucket,
config.use_emulator,
)
.unwrap();
put_get_delete_list(&integration).await.unwrap();
list_uses_directories_correctly(&integration).await.unwrap();
list_with_delimiter(&integration).await.unwrap();
}
}


@@ -1,452 +0,0 @@
//! This module contains the IOx implementation for using local disk as the
//! object store.
use crate::path::{Path, DELIMITER};
use crate::{GetResult, ListResult, ObjectMeta, ObjectStoreApi, Result};
use async_trait::async_trait;
use bytes::Bytes;
use futures::{
stream::{self, BoxStream},
StreamExt,
};
use snafu::{OptionExt, ResultExt, Snafu};
use std::{collections::BTreeSet, convert::TryFrom, io};
use tokio::fs;
use walkdir::{DirEntry, WalkDir};
/// A specialized `Error` for filesystem object store-related errors
#[derive(Debug, Snafu)]
#[allow(missing_docs)]
pub enum Error {
#[snafu(display("File size for {} did not fit in a usize: {}", path, source))]
FileSizeOverflowedUsize {
source: std::num::TryFromIntError,
path: String,
},
#[snafu(display("Unable to walk dir: {}", source))]
UnableToWalkDir {
source: walkdir::Error,
},
#[snafu(display("Unable to access metadata for {}: {}", path, source))]
UnableToAccessMetadata {
source: Box<dyn std::error::Error + Send + Sync + 'static>,
path: String,
},
#[snafu(display("Unable to copy data to file: {}", source))]
UnableToCopyDataToFile {
source: io::Error,
},
#[snafu(display("Unable to create dir {}: {}", path.display(), source))]
UnableToCreateDir {
source: io::Error,
path: std::path::PathBuf,
},
#[snafu(display("Unable to create file {}: {}", path.display(), err))]
UnableToCreateFile {
path: std::path::PathBuf,
err: io::Error,
},
#[snafu(display("Unable to delete file {}: {}", path.display(), source))]
UnableToDeleteFile {
source: io::Error,
path: std::path::PathBuf,
},
#[snafu(display("Unable to open file {}: {}", path.display(), source))]
UnableToOpenFile {
source: io::Error,
path: std::path::PathBuf,
},
#[snafu(display("Unable to read data from file {}: {}", path.display(), source))]
UnableToReadBytes {
source: io::Error,
path: std::path::PathBuf,
},
NotFound {
path: String,
source: io::Error,
},
}
impl From<Error> for super::Error {
fn from(source: Error) -> Self {
match source {
Error::NotFound { path, source } => Self::NotFound {
path,
source: source.into(),
},
_ => Self::Generic {
store: "LocalFileSystem",
source: Box::new(source),
},
}
}
}
/// Local filesystem storage suitable for testing or for opting out of using a
/// cloud storage provider.
#[derive(Debug)]
pub struct LocalFileSystem {
root: std::path::PathBuf,
}
impl std::fmt::Display for LocalFileSystem {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "LocalFileSystem({})", self.root.display())
}
}
#[async_trait]
impl ObjectStoreApi for LocalFileSystem {
async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
let content = bytes::BytesMut::from(&*bytes);
let path = self.path_to_filesystem(location);
let mut file = match fs::File::create(&path).await {
Ok(f) => f,
Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
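// The parent directory does not exist yet: create it, then retry creating the file once.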
let parent = path
.parent()
.context(UnableToCreateFileSnafu { path: &path, err })?;
fs::create_dir_all(&parent)
.await
.context(UnableToCreateDirSnafu { path: parent })?;
match fs::File::create(&path).await {
Ok(f) => f,
Err(err) => return Err(Error::UnableToCreateFile { path, err }.into()),
}
}
Err(err) => return Err(Error::UnableToCreateFile { path, err }.into()),
};
tokio::io::copy(&mut &content[..], &mut file)
.await
.context(UnableToCopyDataToFileSnafu)?;
Ok(())
}
async fn get(&self, location: &Path) -> Result<GetResult> {
let (file, path) = self.get_file(location).await?;
Ok(GetResult::File(file, path))
}
async fn head(&self, location: &Path) -> Result<ObjectMeta> {
let (file, _) = self.get_file(location).await?;
let metadata = file
.metadata()
.await
.map_err(|e| Error::UnableToAccessMetadata {
source: e.into(),
path: location.to_string(),
})?;
convert_metadata(metadata, location.clone())
}
async fn delete(&self, location: &Path) -> Result<()> {
let path = self.path_to_filesystem(location);
fs::remove_file(&path)
.await
.context(UnableToDeleteFileSnafu { path })?;
Ok(())
}
async fn list<'a>(
&'a self,
prefix: Option<&'a Path>,
) -> Result<BoxStream<'a, Result<ObjectMeta>>> {
let root_path = match prefix {
Some(prefix) => self.path_to_filesystem(prefix),
None => self.root.to_path_buf(),
};
let walkdir = WalkDir::new(&root_path)
// Don't include the root directory itself
.min_depth(1);
let s =
walkdir.into_iter().flat_map(move |result_dir_entry| {
match convert_walkdir_result(result_dir_entry) {
Err(e) => Some(Err(e)),
Ok(None) => None,
Ok(entry @ Some(_)) => entry
.filter(|dir_entry| dir_entry.file_type().is_file())
.map(|entry| {
let location = self.filesystem_to_path(entry.path()).unwrap();
convert_entry(entry, location)
}),
}
});
Ok(stream::iter(s).boxed())
}
async fn list_with_delimiter(&self, prefix: &Path) -> Result<ListResult> {
let resolved_prefix = self.path_to_filesystem(prefix);
let walkdir = WalkDir::new(&resolved_prefix).min_depth(1).max_depth(1);
let mut common_prefixes = BTreeSet::new();
let mut objects = Vec::new();
for entry_res in walkdir.into_iter().map(convert_walkdir_result) {
if let Some(entry) = entry_res? {
let is_directory = entry.file_type().is_dir();
let entry_location = self.filesystem_to_path(entry.path()).unwrap();
let mut parts = match entry_location.prefix_match(prefix) {
Some(parts) => parts,
None => continue,
};
let common_prefix = match parts.next() {
Some(p) => p,
None => continue,
};
drop(parts);
if is_directory {
common_prefixes.insert(prefix.child(common_prefix));
} else {
objects.push(convert_entry(entry, entry_location)?);
}
}
}
Ok(ListResult {
next_token: None,
common_prefixes: common_prefixes.into_iter().collect(),
objects,
})
}
}
fn convert_entry(entry: DirEntry, location: Path) -> Result<ObjectMeta> {
let metadata = entry
.metadata()
.map_err(|e| Error::UnableToAccessMetadata {
source: e.into(),
path: location.to_string(),
})?;
convert_metadata(metadata, location)
}
fn convert_metadata(metadata: std::fs::Metadata, location: Path) -> Result<ObjectMeta> {
let last_modified = metadata
.modified()
.expect("Modified file time should be supported on this platform")
.into();
let size = usize::try_from(metadata.len()).context(FileSizeOverflowedUsizeSnafu {
path: location.to_raw(),
})?;
Ok(ObjectMeta {
location,
last_modified,
size,
})
}
/// Convert a walkdir result, turning not-found errors into `None`.
fn convert_walkdir_result(
res: std::result::Result<walkdir::DirEntry, walkdir::Error>,
) -> Result<Option<walkdir::DirEntry>> {
match res {
Ok(entry) => Ok(Some(entry)),
Err(walkdir_err) => match walkdir_err.io_error() {
Some(io_err) => match io_err.kind() {
io::ErrorKind::NotFound => Ok(None),
_ => Err(Error::UnableToWalkDir {
source: walkdir_err,
}
.into()),
},
None => Err(Error::UnableToWalkDir {
source: walkdir_err,
}
.into()),
},
}
}
impl LocalFileSystem {
/// Create new filesystem storage.
pub fn new(root: impl Into<std::path::PathBuf>) -> Self {
Self { root: root.into() }
}
/// Return filesystem path of the given location
fn path_to_filesystem(&self, location: &Path) -> std::path::PathBuf {
let mut path = self.root.clone();
for component in location.to_raw().split(DELIMITER) {
path.push(component)
}
path.to_path_buf()
}
fn filesystem_to_path(&self, location: &std::path::Path) -> Option<Path> {
let stripped = location.strip_prefix(&self.root).ok()?;
let path = stripped.to_string_lossy();
let split = path.split(std::path::MAIN_SEPARATOR);
Some(Path::from_iter(split))
}
async fn get_file(&self, location: &Path) -> Result<(fs::File, std::path::PathBuf)> {
let path = self.path_to_filesystem(location);
let file = fs::File::open(&path).await.map_err(|e| {
if e.kind() == std::io::ErrorKind::NotFound {
Error::NotFound {
path: location.to_string(),
source: e,
}
} else {
Error::UnableToOpenFile {
path: path.clone(),
source: e,
}
}
})?;
Ok((file, path))
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{
tests::{
get_nonexistent_object, list_uses_directories_correctly, list_with_delimiter,
put_get_delete_list,
},
Error as ObjectStoreError, ObjectStoreApi,
};
use std::{fs::set_permissions, os::unix::prelude::PermissionsExt};
use tempfile::TempDir;
#[tokio::test]
async fn file_test() {
let root = TempDir::new().unwrap();
let integration = LocalFileSystem::new(root.path());
put_get_delete_list(&integration).await.unwrap();
list_uses_directories_correctly(&integration).await.unwrap();
list_with_delimiter(&integration).await.unwrap();
}
#[tokio::test]
async fn creates_dir_if_not_present() {
let root = TempDir::new().unwrap();
let integration = LocalFileSystem::new(root.path());
let location = Path::from_raw("nested/file/test_file");
let data = Bytes::from("arbitrary data");
let expected_data = data.clone();
integration.put(&location, data).await.unwrap();
let read_data = integration
.get(&location)
.await
.unwrap()
.bytes()
.await
.unwrap();
assert_eq!(&*read_data, expected_data);
}
#[tokio::test]
async fn unknown_length() {
let root = TempDir::new().unwrap();
let integration = LocalFileSystem::new(root.path());
let location = Path::from_raw("some_file");
let data = Bytes::from("arbitrary data");
let expected_data = data.clone();
integration.put(&location, data).await.unwrap();
let read_data = integration
.get(&location)
.await
.unwrap()
.bytes()
.await
.unwrap();
assert_eq!(&*read_data, expected_data);
}
#[tokio::test]
async fn bubble_up_io_errors() {
let root = TempDir::new().unwrap();
// make non-readable
let metadata = root.path().metadata().unwrap();
let mut permissions = metadata.permissions();
permissions.set_mode(0o000);
set_permissions(root.path(), permissions).unwrap();
let store = LocalFileSystem::new(root.path());
// `list` must fail
match store.list(None).await {
Err(_) => {
// ok, error found
}
Ok(mut stream) => {
let mut any_err = false;
while let Some(res) = stream.next().await {
if res.is_err() {
any_err = true;
}
}
assert!(any_err);
}
}
// `list_with_delimiter` must fail as well
assert!(store.list_with_delimiter(&Path::default()).await.is_err());
}
const NON_EXISTENT_NAME: &str = "nonexistentname";
#[tokio::test]
async fn get_nonexistent_location() {
let root = TempDir::new().unwrap();
let integration = LocalFileSystem::new(root.path());
let location = Path::from_raw(NON_EXISTENT_NAME);
let err = get_nonexistent_object(&integration, Some(location))
.await
.unwrap_err();
if let Some(ObjectStoreError::NotFound { path, source }) =
err.downcast_ref::<ObjectStoreError>()
{
let source_variant = source.downcast_ref::<std::io::Error>();
assert!(
matches!(source_variant, Some(std::io::Error { .. }),),
"got: {:?}",
source_variant
);
assert_eq!(path, NON_EXISTENT_NAME);
} else {
panic!("unexpected error type: {:?}", err);
}
}
}


@@ -1,461 +0,0 @@
//! This module contains the IOx implementation for using Google Cloud Storage
//! as the object store.
use crate::{
path::{Path, DELIMITER},
GetResult, ListResult, ObjectMeta, ObjectStoreApi, Result,
};
use async_trait::async_trait;
use bytes::Bytes;
use cloud_storage::{Client, Object};
use futures::{stream::BoxStream, StreamExt, TryStreamExt};
use snafu::{ResultExt, Snafu};
use std::{convert::TryFrom, env};
/// A specialized `Error` for Google Cloud Storage object store-related errors
#[derive(Debug, Snafu)]
#[allow(missing_docs)]
pub enum Error {
#[snafu(display("Expected streamed data to have length {}, got {}", expected, actual))]
DataDoesNotMatchLength { expected: usize, actual: usize },
#[snafu(display(
"Unable to PUT data. Bucket: {}, Location: {}, Error: {}",
bucket,
path,
source
))]
UnableToPutData {
source: cloud_storage::Error,
bucket: String,
path: String,
},
#[snafu(display("Unable to list data. Bucket: {}, Error: {}", bucket, source,))]
UnableToListData {
source: cloud_storage::Error,
bucket: String,
},
#[snafu(display("Unable to stream list data. Bucket: {}, Error: {}", bucket, source,))]
UnableToStreamListData {
source: cloud_storage::Error,
bucket: String,
},
#[snafu(display(
"Unable to DELETE data. Bucket: {}, Location: {}, Error: {}",
bucket,
path,
source,
))]
UnableToDeleteData {
source: cloud_storage::Error,
bucket: String,
path: String,
},
#[snafu(display(
"Unable to GET data. Bucket: {}, Location: {}, Error: {}",
bucket,
path,
source,
))]
UnableToGetData {
source: cloud_storage::Error,
bucket: String,
path: String,
},
#[snafu(display(
"Unable to GET data. Bucket: {}, Location: {}, Error: {}",
bucket,
path,
source,
))]
UnableToHeadData {
source: cloud_storage::Error,
bucket: String,
path: String,
},
NotFound {
path: String,
source: cloud_storage::Error,
},
}
impl From<Error> for super::Error {
fn from(source: Error) -> Self {
match source {
Error::NotFound { path, source } => Self::NotFound {
path,
source: source.into(),
},
_ => Self::Generic {
store: "GCS",
source: Box::new(source),
},
}
}
}
/// Configuration for connecting to [Google Cloud Storage](https://cloud.google.com/storage/).
#[derive(Debug)]
pub struct GoogleCloudStorage {
client: Client,
bucket_name: String,
}
impl std::fmt::Display for GoogleCloudStorage {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "GoogleCloudStorage({})", self.bucket_name)
}
}
#[async_trait]
impl ObjectStoreApi for GoogleCloudStorage {
async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
let location = location.to_raw();
let bucket_name = self.bucket_name.clone();
self.client
.object()
.create(
&bucket_name,
bytes.to_vec(),
location,
"application/octet-stream",
)
.await
.context(UnableToPutDataSnafu {
bucket: &self.bucket_name,
path: location,
})?;
Ok(())
}
async fn get(&self, location: &Path) -> Result<GetResult> {
let location = location.to_raw();
let bucket_name = self.bucket_name.clone();
let bytes = self
.client
.object()
.download(&bucket_name, location)
.await
.map_err(|e| match e {
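// The cloud-storage crate only signals a missing object through the error text, so match on it to map to NotFound.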
cloud_storage::Error::Other(ref text) if text.starts_with("No such object") => {
Error::NotFound {
path: location.to_string(),
source: e,
}
}
_ => Error::UnableToGetData {
bucket: bucket_name.clone(),
path: location.to_string(),
source: e,
},
})?;
let s = futures::stream::once(async move { Ok(bytes.into()) }).boxed();
Ok(GetResult::Stream(s))
}
async fn head(&self, location: &Path) -> Result<ObjectMeta> {
let object = self
.client
.object()
.read(&self.bucket_name, location.to_raw())
.await
.map_err(|e| match e {
cloud_storage::Error::Google(ref error) if error.error.code == 404 => {
Error::NotFound {
path: location.to_string(),
source: e,
}
}
_ => Error::UnableToHeadData {
bucket: self.bucket_name.clone(),
path: location.to_string(),
source: e,
},
})?;
Ok(convert_object_meta(&object))
}
async fn delete(&self, location: &Path) -> Result<()> {
let location = location.to_raw();
let bucket_name = self.bucket_name.clone();
self.client
.object()
.delete(&bucket_name, location)
.await
.context(UnableToDeleteDataSnafu {
bucket: &self.bucket_name,
path: location,
})?;
Ok(())
}
async fn list<'a>(
&'a self,
prefix: Option<&'a Path>,
) -> Result<BoxStream<'a, Result<ObjectMeta>>> {
let converted_prefix = prefix.map(|p| format!("{}{}", p.to_raw(), DELIMITER));
let list_request = cloud_storage::ListRequest {
prefix: converted_prefix,
..Default::default()
};
let object_lists = self
.client
.object()
.list(&self.bucket_name, list_request)
.await
.context(UnableToListDataSnafu {
bucket: &self.bucket_name,
})?;
let bucket_name = self.bucket_name.clone();
let objects = object_lists
.map_ok(move |list| {
futures::stream::iter(list.items.into_iter().map(|o| Ok(convert_object_meta(&o))))
})
.map_err(move |source| {
crate::Error::from(Error::UnableToStreamListData {
source,
bucket: bucket_name.clone(),
})
});
Ok(objects.try_flatten().boxed())
}
async fn list_with_delimiter(&self, prefix: &Path) -> Result<ListResult> {
let converted_prefix = format!("{}{}", prefix, DELIMITER);
let list_request = cloud_storage::ListRequest {
prefix: Some(converted_prefix),
delimiter: Some(DELIMITER.to_string()),
..Default::default()
};
let mut object_lists = Box::pin(
self.client
.object()
.list(&self.bucket_name, list_request)
.await
.context(UnableToListDataSnafu {
bucket: &self.bucket_name,
})?,
);
let result = match object_lists.next().await {
None => ListResult {
objects: vec![],
common_prefixes: vec![],
next_token: None,
},
Some(list_response) => {
let list_response = list_response.context(UnableToStreamListDataSnafu {
bucket: &self.bucket_name,
})?;
ListResult {
objects: list_response
.items
.iter()
.map(convert_object_meta)
.collect(),
common_prefixes: list_response.prefixes.iter().map(Path::from_raw).collect(),
next_token: list_response.next_page_token,
}
}
};
Ok(result)
}
}
fn convert_object_meta(object: &Object) -> ObjectMeta {
let location = Path::from_raw(&object.name);
let last_modified = object.updated;
let size = usize::try_from(object.size).expect("unsupported size on this platform");
ObjectMeta {
location,
last_modified,
size,
}
}
/// Configure a connection to Google Cloud Storage.
pub fn new_gcs(
service_account_path: impl AsRef<std::ffi::OsStr>,
bucket_name: impl Into<String>,
) -> Result<GoogleCloudStorage> {
// The cloud storage crate currently only supports authentication via
// environment variables. Setting the variable explicitly here lets callers
// pass the service account path as an argument (for example from the command
// line) instead of having to set the variable themselves.
env::set_var("SERVICE_ACCOUNT", service_account_path);
Ok(GoogleCloudStorage {
client: Default::default(),
bucket_name: bucket_name.into(),
})
}
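// Editor's sketch, not part of the original file: how a caller might construct
// the GCS store via `new_gcs` above. The service-account path and bucket name
// are placeholder values, not anything used in this repository.
#[allow(dead_code)]
fn example_new_gcs() -> Result<GoogleCloudStorage> {
    // `new_gcs` sets the SERVICE_ACCOUNT environment variable (see the comment
    // above) and then builds a client with the default configuration.
    new_gcs("/path/to/service-account.json", "example-bucket")
}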
#[cfg(test)]
mod test {
use super::*;
use crate::{
tests::{
get_nonexistent_object, list_uses_directories_correctly, list_with_delimiter,
put_get_delete_list,
},
Error as ObjectStoreError, ObjectStoreApi,
};
use bytes::Bytes;
use std::env;
const NON_EXISTENT_NAME: &str = "nonexistentname";
#[derive(Debug)]
struct GoogleCloudConfig {
bucket: String,
service_account: String,
}
// Helper macro that skips the test when TEST_INTEGRATION is not set, and panics
// if TEST_INTEGRATION is set but the GCP environment variables are missing.
macro_rules! maybe_skip_integration {
() => {{
dotenv::dotenv().ok();
let required_vars = ["OBJECT_STORE_BUCKET", "GOOGLE_SERVICE_ACCOUNT"];
let unset_vars: Vec<_> = required_vars
.iter()
.filter_map(|&name| match env::var(name) {
Ok(_) => None,
Err(_) => Some(name),
})
.collect();
let unset_var_names = unset_vars.join(", ");
let force = std::env::var("TEST_INTEGRATION");
if force.is_ok() && !unset_var_names.is_empty() {
panic!(
"TEST_INTEGRATION is set, \
but variable(s) {} need to be set",
unset_var_names
)
} else if force.is_err() {
eprintln!(
"skipping Google Cloud integration test - set {}TEST_INTEGRATION to run",
if unset_var_names.is_empty() {
String::new()
} else {
format!("{} and ", unset_var_names)
}
);
return;
} else {
GoogleCloudConfig {
bucket: env::var("OBJECT_STORE_BUCKET")
.expect("already checked OBJECT_STORE_BUCKET"),
service_account: env::var("GOOGLE_SERVICE_ACCOUNT")
.expect("already checked GOOGLE_SERVICE_ACCOUNT"),
}
}
}};
}
#[tokio::test]
async fn gcs_test() {
let config = maybe_skip_integration!();
let integration = new_gcs(config.service_account, config.bucket).unwrap();
put_get_delete_list(&integration).await.unwrap();
list_uses_directories_correctly(&integration).await.unwrap();
list_with_delimiter(&integration).await.unwrap();
}
#[tokio::test]
async fn gcs_test_get_nonexistent_location() {
let config = maybe_skip_integration!();
let integration = new_gcs(config.service_account, &config.bucket).unwrap();
let location = Path::from_iter([NON_EXISTENT_NAME]);
let err = integration.get(&location).await.unwrap_err();
if let ObjectStoreError::NotFound { path, source } = err {
let source_variant = source.downcast_ref::<cloud_storage::Error>();
assert!(
matches!(source_variant, Some(cloud_storage::Error::Other(_))),
"got: {:?}",
source_variant
);
assert_eq!(path, NON_EXISTENT_NAME);
} else {
panic!("unexpected error type: {:?}", err)
}
}
#[tokio::test]
async fn gcs_test_get_nonexistent_bucket() {
let mut config = maybe_skip_integration!();
config.bucket = NON_EXISTENT_NAME.into();
let integration = new_gcs(config.service_account, &config.bucket).unwrap();
let location = Path::from_iter([NON_EXISTENT_NAME]);
let err = get_nonexistent_object(&integration, Some(location))
.await
.unwrap_err()
.to_string();
assert!(err.contains("Unable to GET data"), "{}", err)
}
#[tokio::test]
async fn gcs_test_delete_nonexistent_location() {
let config = maybe_skip_integration!();
let integration = new_gcs(config.service_account, &config.bucket).unwrap();
let location = Path::from_iter([NON_EXISTENT_NAME]);
let err = integration.delete(&location).await.unwrap_err().to_string();
assert!(err.contains("Unable to DELETE data"), "{}", err)
}
#[tokio::test]
async fn gcs_test_delete_nonexistent_bucket() {
let mut config = maybe_skip_integration!();
config.bucket = NON_EXISTENT_NAME.into();
let integration = new_gcs(config.service_account, &config.bucket).unwrap();
let location = Path::from_iter([NON_EXISTENT_NAME]);
let err = integration.delete(&location).await.unwrap_err().to_string();
assert!(err.contains("Unable to DELETE data"), "{}", err)
}
#[tokio::test]
async fn gcs_test_put_nonexistent_bucket() {
let mut config = maybe_skip_integration!();
config.bucket = NON_EXISTENT_NAME.into();
let integration = new_gcs(config.service_account, &config.bucket).unwrap();
let location = Path::from_iter([NON_EXISTENT_NAME]);
let data = Bytes::from("arbitrary data");
let err = integration
.put(&location, data)
.await
.unwrap_err()
.to_string();
assert!(err.contains("Unable to PUT data"), "{}", err)
}
}

View File

@ -1,406 +0,0 @@
#![deny(rustdoc::broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)]
#![warn(
missing_copy_implementations,
missing_debug_implementations,
missing_docs,
clippy::explicit_iter_loop,
clippy::future_not_send,
clippy::use_self,
clippy::clone_on_ref_ptr
)]
//! # object_store
//!
//! This crate provides APIs for interacting with object storage services. It
//! currently supports PUT, GET, HEAD, DELETE, and list for Google Cloud Storage,
//! Amazon S3, Microsoft Azure Blob Storage, in-memory, and local file storage.
//!
//! Future compatibility will include Minio and Ceph.
#[cfg(feature = "aws")]
pub mod aws;
#[cfg(feature = "azure")]
pub mod azure;
pub mod disk;
#[cfg(feature = "gcp")]
pub mod gcp;
pub mod memory;
pub mod path;
pub mod throttle;
use crate::path::Path;
use async_trait::async_trait;
use bytes::Bytes;
use chrono::{DateTime, Utc};
use futures::{stream::BoxStream, StreamExt, TryStreamExt};
use snafu::Snafu;
use std::fmt::{Debug, Formatter};
/// An alias for a dynamically dispatched object store implementation.
pub type DynObjectStore = dyn ObjectStoreApi;
/// Universal API to multiple object store services.
#[async_trait]
pub trait ObjectStoreApi: std::fmt::Display + Send + Sync + Debug + 'static {
/// Save the provided bytes to the specified location.
async fn put(&self, location: &Path, bytes: Bytes) -> Result<()>;
/// Return the bytes that are stored at the specified location.
async fn get(&self, location: &Path) -> Result<GetResult>;
/// Return the metadata for the specified location
async fn head(&self, location: &Path) -> Result<ObjectMeta>;
/// Delete the object at the specified location.
async fn delete(&self, location: &Path) -> Result<()>;
/// List all the objects with the given prefix.
///
/// Prefixes are evaluated on a path segment basis, i.e. `foo/bar/` is a prefix of `foo/bar/x` but not of
/// `foo/bar_baz/x`.
async fn list<'a>(
&'a self,
prefix: Option<&'a Path>,
) -> Result<BoxStream<'a, Result<ObjectMeta>>>;
/// List objects with the given prefix and an implementation specific
/// delimiter. Returns common prefixes (directories) in addition to object
/// metadata.
///
/// Prefixes are evaluated on a path segment basis, i.e. `foo/bar/` is a prefix of `foo/bar/x` but not of
/// `foo/bar_baz/x`.
async fn list_with_delimiter(&self, prefix: &Path) -> Result<ListResult>;
}
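// Editor's sketch, not part of the original file: a minimal illustration of the
// per-segment prefix semantics documented on `list` above, using the `memory`
// module declared earlier in this file.
#[allow(dead_code)]
async fn prefix_semantics_sketch() -> Result<()> {
    let store = memory::InMemory::new();
    store.put(&Path::from_raw("foo/bar/x"), Bytes::from("a")).await?;
    store.put(&Path::from_raw("foo/bar_baz/x"), Bytes::from("b")).await?;
    // `foo/bar` is a prefix of `foo/bar/x` but not of `foo/bar_baz/x`, because
    // prefixes are compared one path segment at a time.
    let listed: Vec<_> = store
        .list(Some(&Path::from_raw("foo/bar")))
        .await?
        .map_ok(|meta| meta.location)
        .try_collect()
        .await?;
    assert_eq!(listed, vec![Path::from_raw("foo/bar/x")]);
    Ok(())
}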
/// Result of a list call that includes objects, prefixes (directories) and a
/// token for the next set of results. Individual result sets may be limited to
/// 1,000 objects based on the underlying object storage's limitations.
#[derive(Debug)]
pub struct ListResult {
/// Token passed to the API for the next page of list results.
pub next_token: Option<String>,
/// Prefixes that are common (like directories)
pub common_prefixes: Vec<Path>,
/// Object metadata for the listing
pub objects: Vec<ObjectMeta>,
}
/// The metadata that describes an object.
#[derive(Debug)]
pub struct ObjectMeta {
/// The full path to the object
pub location: Path,
/// The last modified time
pub last_modified: DateTime<Utc>,
/// The size in bytes of the object
pub size: usize,
}
/// Result for a get request
pub enum GetResult {
/// A file
File(tokio::fs::File, std::path::PathBuf),
/// An asynchronous stream
Stream(BoxStream<'static, Result<Bytes>>),
}
impl Debug for GetResult {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
GetResult::File(_, _) => write!(f, "GetResult(File)"),
GetResult::Stream(_) => write!(f, "GetResult(Stream)"),
}
}
}
impl GetResult {
/// Collects the data into a [`Vec<u8>`]
pub async fn bytes(self) -> Result<Vec<u8>> {
let mut stream = self.into_stream();
let mut bytes = Vec::new();
while let Some(next) = stream.next().await {
bytes.extend_from_slice(next?.as_ref())
}
Ok(bytes)
}
/// Converts this into a byte stream
pub fn into_stream(self) -> BoxStream<'static, Result<Bytes>> {
match self {
Self::File(file, path) => {
tokio_util::codec::FramedRead::new(file, tokio_util::codec::BytesCodec::new())
.map_ok(|b| b.freeze())
.map_err(move |source| {
disk::Error::UnableToReadBytes {
source,
path: path.clone(),
}
.into()
})
.boxed()
}
Self::Stream(s) => s,
}
}
}
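// Editor's sketch, not part of the original file: the two ways to consume a
// `GetResult`. `bytes()` above buffers everything; `into_stream()` lets the
// caller process chunks incrementally, as shown here.
#[allow(dead_code)]
async fn consume_get_result_sketch(result: GetResult) -> Result<usize> {
    let mut stream = result.into_stream();
    let mut total = 0;
    while let Some(chunk) = stream.next().await {
        // Each chunk is a `Result<Bytes>`; errors propagate to the caller.
        total += chunk?.len();
    }
    Ok(total)
}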
/// A specialized `Result` for object store-related errors
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// A specialized `Error` for object store-related errors
#[derive(Debug, Snafu)]
#[allow(missing_docs)]
pub enum Error {
#[snafu(display("Generic {} error: {}", store, source))]
Generic {
store: &'static str,
source: Box<dyn std::error::Error + Send + Sync + 'static>,
},
#[snafu(display("Object at location {} not found: {}", path, source))]
NotFound {
path: String,
source: Box<dyn std::error::Error + Send + Sync + 'static>,
},
}
/// Convenience functions for object stores that are only appropriate to use in tests. Not marked
/// with cfg(test) to make this accessible to other crates.
#[async_trait]
pub trait ObjectStoreTestConvenience {
/// A convenience function for getting all results from a list operation without a prefix. Only
/// appropriate for tests because production code should handle the stream of potentially a
/// large number of returned paths.
async fn list_all(&self) -> Result<Vec<ObjectMeta>>;
}
#[async_trait]
impl ObjectStoreTestConvenience for dyn ObjectStoreApi {
async fn list_all(&self) -> Result<Vec<ObjectMeta>> {
self.list(None).await?.try_collect().await
}
}
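// Editor's sketch, not part of the original file: `list_all` as a test might
// call it, going through a `DynObjectStore` trait object as required by the
// impl above.
#[allow(dead_code)]
async fn list_all_sketch(store: &DynObjectStore) -> Result<usize> {
    Ok(store.list_all().await?.len())
}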
#[cfg(test)]
mod tests {
use super::*;
type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
type Result<T, E = Error> = std::result::Result<T, E>;
async fn flatten_list_stream(
storage: &DynObjectStore,
prefix: Option<&Path>,
) -> super::Result<Vec<Path>> {
storage
.list(prefix)
.await?
.map_ok(|meta| meta.location)
.try_collect::<Vec<Path>>()
.await
}
pub(crate) async fn put_get_delete_list(storage: &DynObjectStore) -> Result<()> {
delete_fixtures(storage).await;
let content_list = flatten_list_stream(storage, None).await?;
assert!(
content_list.is_empty(),
"Expected list to be empty; found: {:?}",
content_list
);
let location = Path::from_raw("test_dir/test_file.json");
let data = Bytes::from("arbitrary data");
let expected_data = data.clone();
storage.put(&location, data).await?;
// List everything
let content_list = flatten_list_stream(storage, None).await?;
assert_eq!(content_list, &[location.clone()]);
// List everything starting with a prefix that should return results
let prefix = Path::from_raw("test_dir");
let content_list = flatten_list_stream(storage, Some(&prefix)).await?;
assert_eq!(content_list, &[location.clone()]);
// List everything starting with a prefix that shouldn't return results
let prefix = Path::from_raw("something");
let content_list = flatten_list_stream(storage, Some(&prefix)).await?;
assert!(content_list.is_empty());
let read_data = storage.get(&location).await?.bytes().await?;
assert_eq!(&*read_data, expected_data);
let head = storage.head(&location).await?;
assert_eq!(head.size, expected_data.len());
storage.delete(&location).await?;
let content_list = flatten_list_stream(storage, None).await?;
assert!(content_list.is_empty());
// Azure doesn't report semantic errors
let is_azure = storage.to_string().starts_with("MicrosoftAzure");
let err = storage.get(&location).await.unwrap_err();
assert!(
matches!(err, crate::Error::NotFound { .. }) || is_azure,
"{}",
err
);
let err = storage.head(&location).await.unwrap_err();
assert!(
matches!(err, crate::Error::NotFound { .. }) || is_azure,
"{}",
err
);
Ok(())
}
pub(crate) async fn list_uses_directories_correctly(storage: &DynObjectStore) -> Result<()> {
delete_fixtures(storage).await;
let content_list = flatten_list_stream(storage, None).await?;
assert!(
content_list.is_empty(),
"Expected list to be empty; found: {:?}",
content_list
);
let location1 = Path::from_raw("foo/x.json");
let location2 = Path::from_raw("foo.bar/y.json");
let data = Bytes::from("arbitrary data");
storage.put(&location1, data.clone()).await?;
storage.put(&location2, data).await?;
let prefix = Path::from_raw("foo");
let content_list = flatten_list_stream(storage, Some(&prefix)).await?;
assert_eq!(content_list, &[location1.clone()]);
let prefix = Path::from_raw("foo/x");
let content_list = flatten_list_stream(storage, Some(&prefix)).await?;
assert_eq!(content_list, &[]);
Ok(())
}
pub(crate) async fn list_with_delimiter(storage: &DynObjectStore) -> Result<()> {
delete_fixtures(storage).await;
// ==================== check: store is empty ====================
let content_list = flatten_list_stream(storage, None).await?;
assert!(content_list.is_empty());
// ==================== do: create files ====================
let data = Bytes::from("arbitrary data");
let files: Vec<_> = [
"test_file",
"mydb/wb/000/000/000.segment",
"mydb/wb/000/000/001.segment",
"mydb/wb/000/000/002.segment",
"mydb/wb/001/001/000.segment",
"mydb/wb/foo.json",
"mydb/wbwbwb/111/222/333.segment",
"mydb/data/whatevs",
]
.iter()
.map(|&s| Path::from_raw(s))
.collect();
for f in &files {
let data = data.clone();
storage.put(f, data).await.unwrap();
}
// ==================== check: prefix-list `mydb/wb` (directory) ====================
let prefix = Path::from_raw("mydb/wb");
let expected_000 = Path::from_raw("mydb/wb/000");
let expected_001 = Path::from_raw("mydb/wb/001");
let expected_location = Path::from_raw("mydb/wb/foo.json");
let result = storage.list_with_delimiter(&prefix).await.unwrap();
assert_eq!(result.common_prefixes, vec![expected_000, expected_001]);
assert_eq!(result.objects.len(), 1);
let object = &result.objects[0];
assert_eq!(object.location, expected_location);
assert_eq!(object.size, data.len());
// ==================== check: prefix-list `mydb/wb/000/000/001` (partial filename doesn't match) ====================
let prefix = Path::from_raw("mydb/wb/000/000/001");
let result = storage.list_with_delimiter(&prefix).await.unwrap();
assert!(result.common_prefixes.is_empty());
assert_eq!(result.objects.len(), 0);
// ==================== check: prefix-list `not_there` (non-existing prefix) ====================
let prefix = Path::from_raw("not_there");
let result = storage.list_with_delimiter(&prefix).await.unwrap();
assert!(result.common_prefixes.is_empty());
assert!(result.objects.is_empty());
// ==================== do: remove all files ====================
for f in &files {
storage.delete(f).await.unwrap();
}
// ==================== check: store is empty ====================
let content_list = flatten_list_stream(storage, None).await?;
assert!(content_list.is_empty());
Ok(())
}
pub(crate) async fn get_nonexistent_object(
storage: &DynObjectStore,
location: Option<Path>,
) -> Result<Vec<u8>> {
let location = location.unwrap_or_else(|| Path::from_raw("this_file_should_not_exist"));
let err = storage.head(&location).await.unwrap_err();
assert!(matches!(err, crate::Error::NotFound { .. }));
Ok(storage.get(&location).await?.bytes().await?)
}
async fn delete_fixtures(storage: &DynObjectStore) {
let files: Vec<_> = [
"test_file",
"test_dir/test_file.json",
"mydb/wb/000/000/000.segment",
"mydb/wb/000/000/001.segment",
"mydb/wb/000/000/002.segment",
"mydb/wb/001/001/000.segment",
"mydb/wb/foo.json",
"mydb/data/whatevs",
"mydb/wbwbwb/111/222/333.segment",
"foo/x.json",
"foo.bar/y.json",
]
.iter()
.map(|&s| Path::from_raw(s))
.collect();
for f in &files {
// don't care if it errors, should fail elsewhere
let _ = storage.delete(f).await;
}
}
// Tests TODO:
// GET nonexisting location (in_memory/file)
// DELETE nonexisting location
// PUT overwriting
}

View File

@ -1,235 +0,0 @@
//! This module contains the IOx implementation for using memory as the object
//! store.
use crate::{path::Path, GetResult, ListResult, ObjectMeta, ObjectStoreApi, Result};
use async_trait::async_trait;
use bytes::Bytes;
use chrono::Utc;
use futures::{stream::BoxStream, StreamExt};
use snafu::{OptionExt, Snafu};
use std::collections::BTreeMap;
use std::collections::BTreeSet;
use tokio::sync::RwLock;
/// A specialized `Error` for in-memory object store-related errors
#[derive(Debug, Snafu)]
#[allow(missing_docs)]
enum Error {
#[snafu(display("No data in memory found. Location: {path}"))]
NoDataInMemory { path: String },
}
impl From<Error> for super::Error {
fn from(source: Error) -> Self {
match source {
Error::NoDataInMemory { ref path } => Self::NotFound {
path: path.into(),
source: source.into(),
},
// currently "not found" is the only error that can happen with the in-memory store
}
}
}
/// In-memory storage suitable for testing or for opting out of using a cloud
/// storage provider.
#[derive(Debug, Default)]
pub struct InMemory {
storage: RwLock<BTreeMap<Path, Bytes>>,
}
impl std::fmt::Display for InMemory {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "InMemory")
}
}
#[async_trait]
impl ObjectStoreApi for InMemory {
async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
self.storage.write().await.insert(location.clone(), bytes);
Ok(())
}
async fn get(&self, location: &Path) -> Result<GetResult> {
let data = self.get_bytes(location).await?;
Ok(GetResult::Stream(
futures::stream::once(async move { Ok(data) }).boxed(),
))
}
async fn head(&self, location: &Path) -> Result<ObjectMeta> {
let last_modified = Utc::now();
let bytes = self.get_bytes(location).await?;
Ok(ObjectMeta {
location: location.clone(),
last_modified,
size: bytes.len(),
})
}
async fn delete(&self, location: &Path) -> Result<()> {
self.storage.write().await.remove(location);
Ok(())
}
async fn list<'a>(
&'a self,
prefix: Option<&'a Path>,
) -> Result<BoxStream<'a, Result<ObjectMeta>>> {
let last_modified = Utc::now();
let storage = self.storage.read().await;
let values: Vec<_> = storage
.iter()
.filter(move |(key, _)| prefix.map(|p| key.prefix_matches(p)).unwrap_or(true))
.map(move |(key, value)| {
Ok(ObjectMeta {
location: key.clone(),
last_modified,
size: value.len(),
})
})
.collect();
Ok(futures::stream::iter(values).boxed())
}
/// The memory implementation returns all results in a single response, as
/// opposed to the cloud versions, which page their results (typically at most
/// 1,000 objects per response) because of API limitations.
async fn list_with_delimiter(&self, prefix: &Path) -> Result<ListResult> {
let mut common_prefixes = BTreeSet::new();
let last_modified = Utc::now();
// Only objects in this base level should be returned in the
// response. Otherwise, we just collect the common prefixes.
let mut objects = vec![];
for (k, v) in self.storage.read().await.range((prefix)..) {
let mut parts = match k.prefix_match(prefix) {
Some(parts) => parts,
None => break,
};
// Pop first element
let common_prefix = match parts.next() {
Some(p) => p,
None => continue,
};
if parts.next().is_some() {
common_prefixes.insert(prefix.child(common_prefix));
} else {
let object = ObjectMeta {
location: k.clone(),
last_modified,
size: v.len(),
};
objects.push(object);
}
}
Ok(ListResult {
objects,
common_prefixes: common_prefixes.into_iter().collect(),
next_token: None,
})
}
}
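// Editor's sketch, not part of the original file: what the delimiter listing
// above returns for a small, made-up set of keys.
#[allow(dead_code)]
async fn list_with_delimiter_sketch() -> Result<()> {
    let store = InMemory::new();
    store.put(&Path::from_raw("db/wal/0.segment"), Bytes::from("x")).await?;
    store.put(&Path::from_raw("db/manifest"), Bytes::from("y")).await?;
    let listing = store.list_with_delimiter(&Path::from_raw("db")).await?;
    // `db/wal` is reported as a common prefix (a "directory"), while
    // `db/manifest` sits at the base level and is returned as an object.
    assert_eq!(listing.common_prefixes, vec![Path::from_raw("db/wal")]);
    assert_eq!(listing.objects.len(), 1);
    Ok(())
}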
impl InMemory {
/// Create new in-memory storage.
pub fn new() -> Self {
Self::default()
}
/// Creates a clone of the store
pub async fn clone(&self) -> Self {
let storage = self.storage.read().await;
let storage = storage.clone();
Self {
storage: RwLock::new(storage),
}
}
async fn get_bytes(&self, location: &Path) -> Result<Bytes> {
let storage = self.storage.read().await;
let bytes = storage
.get(location)
.cloned()
.context(NoDataInMemorySnafu {
path: location.to_string(),
})?;
Ok(bytes)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{
tests::{
get_nonexistent_object, list_uses_directories_correctly, list_with_delimiter,
put_get_delete_list,
},
Error as ObjectStoreError, ObjectStoreApi,
};
#[tokio::test]
async fn in_memory_test() {
let integration = InMemory::new();
put_get_delete_list(&integration).await.unwrap();
list_uses_directories_correctly(&integration).await.unwrap();
list_with_delimiter(&integration).await.unwrap();
}
#[tokio::test]
async fn unknown_length() {
let integration = InMemory::new();
let location = Path::from_raw("some_file");
let data = Bytes::from("arbitrary data");
let expected_data = data.clone();
integration.put(&location, data).await.unwrap();
let read_data = integration
.get(&location)
.await
.unwrap()
.bytes()
.await
.unwrap();
assert_eq!(&*read_data, expected_data);
}
const NON_EXISTENT_NAME: &str = "nonexistentname";
#[tokio::test]
async fn nonexistent_location() {
let integration = InMemory::new();
let location = Path::from_raw(NON_EXISTENT_NAME);
let err = get_nonexistent_object(&integration, Some(location))
.await
.unwrap_err();
if let Some(ObjectStoreError::NotFound { path, source }) =
err.downcast_ref::<ObjectStoreError>()
{
let source_variant = source.downcast_ref::<Error>();
assert!(
matches!(source_variant, Some(Error::NoDataInMemory { .. }),),
"got: {:?}",
source_variant
);
assert_eq!(path, NON_EXISTENT_NAME);
} else {
panic!("unexpected error type: {:?}", err);
}
}
}

View File

@ -1,297 +0,0 @@
//! Path abstraction for Object Storage
use itertools::Itertools;
use std::fmt::Formatter;
/// The delimiter to separate object namespaces, creating a directory structure.
pub const DELIMITER: &str = "/";
mod parts;
pub(crate) use parts::PathPart;
/// An object storage location suitable for passing to cloud storage APIs such
/// as AWS, GCS, and Azure.
#[derive(Debug, Clone, Default, PartialEq, Eq, Hash, Ord, PartialOrd)]
pub struct Path {
/// The raw path with no leading or trailing delimiters
raw: String,
}
impl Path {
/// Create a new [`Path`] from a string
pub fn from_raw(path: impl AsRef<str>) -> Self {
let path = path.as_ref();
Self::from_iter(path.split(DELIMITER))
}
/// Returns the raw encoded path in object storage
pub fn to_raw(&self) -> &str {
&self.raw
}
/// Returns the [`PathPart`] of this [`Path`]
pub fn parts(&self) -> impl Iterator<Item = PathPart<'_>> {
match self.raw.is_empty() {
true => itertools::Either::Left(std::iter::empty()),
false => itertools::Either::Right(
self.raw
.split(DELIMITER)
.map(|s| PathPart { raw: s.into() }),
),
}
}
pub(crate) fn prefix_match(
&self,
prefix: &Self,
) -> Option<impl Iterator<Item = PathPart<'_>> + '_> {
let diff = itertools::diff_with(
self.raw.split(DELIMITER),
prefix.raw.split(DELIMITER),
|a, b| a == b,
);
match diff {
// Both were equal
None => Some(itertools::Either::Left(std::iter::empty())),
// Mismatch or prefix was longer => None
Some(itertools::Diff::FirstMismatch(_, _, _) | itertools::Diff::Longer(_, _)) => None,
// Match with remaining
Some(itertools::Diff::Shorter(_, back)) => Some(itertools::Either::Right(
back.map(|s| PathPart { raw: s.into() }),
)),
}
}
pub(crate) fn prefix_matches(&self, prefix: &Self) -> bool {
self.prefix_match(prefix).is_some()
}
/// Creates a new child of this [`Path`]
pub fn child<'a>(&self, child: impl Into<PathPart<'a>>) -> Self {
let raw = match self.raw.is_empty() {
true => format!("{}", child.into().raw),
false => format!("{}{}{}", self.raw, DELIMITER, child.into().raw),
};
Self { raw }
}
}
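// Editor's sketch, not part of the original file: how the constructors above
// relate. `from_raw` splits on the delimiter, `from_iter` percent-encodes each
// part, and `child` appends a single encoded segment.
#[allow(dead_code)]
fn path_construction_sketch() {
    let a = Path::from_raw("data/file.parquet");
    let b = Path::from_iter(["data", "file.parquet"]);
    assert_eq!(a, b);
    // A literal `/` inside a single part is escaped rather than treated as a
    // directory separator.
    let c = Path::from_raw("data").child("dir/with/slash");
    assert_eq!(c.to_raw(), "data/dir%2Fwith%2Fslash");
    // Prefixes are matched per segment, as exercised by the tests below.
    assert!(a.prefix_matches(&Path::from_raw("data")));
}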
impl std::fmt::Display for Path {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
self.raw.fmt(f)
}
}
impl<'a, I> FromIterator<I> for Path
where
I: Into<PathPart<'a>>,
{
fn from_iter<T: IntoIterator<Item = I>>(iter: T) -> Self {
let raw = T::into_iter(iter)
.map(|s| s.into())
.filter(|s| !s.raw.is_empty())
.map(|s| s.raw)
.join(DELIMITER);
Self { raw }
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn cloud_prefix_with_trailing_delimiter() {
// Use case: files exist in object storage named `foo/bar.json` and
// `foo_test.json`. A search for the prefix `foo/` should return
// `foo/bar.json` but not `foo_test.json`.
let prefix = Path::from_iter(["test"]);
let converted = prefix.to_raw();
assert_eq!(converted, "test");
}
#[test]
fn push_encodes() {
let location = Path::from_iter(["foo/bar", "baz%2Ftest"]);
let converted = location.to_raw();
assert_eq!(converted, "foo%2Fbar/baz%252Ftest");
}
#[test]
fn convert_raw_before_partial_eq() {
// dir and file_name
let cloud = Path::from_raw("test_dir/test_file.json");
let built = Path::from_iter(["test_dir", "test_file.json"]);
assert_eq!(built, cloud);
// dir and file_name w/o dot
let cloud = Path::from_raw("test_dir/test_file");
let built = Path::from_iter(["test_dir", "test_file"]);
assert_eq!(built, cloud);
// dir, no file
let cloud = Path::from_raw("test_dir/");
let built = Path::from_iter(["test_dir"]);
assert_eq!(built, cloud);
// file_name, no dir
let cloud = Path::from_raw("test_file.json");
let built = Path::from_iter(["test_file.json"]);
assert_eq!(built, cloud);
// empty
let cloud = Path::from_raw("");
let built = Path::from_iter(["", ""]);
assert_eq!(built, cloud);
}
#[test]
fn parts_after_prefix_behavior() {
let existing_path = Path::from_raw("apple/bear/cow/dog/egg.json");
// Prefix with one directory
let prefix = Path::from_raw("apple");
let expected_parts: Vec<PathPart<'_>> = vec!["bear", "cow", "dog", "egg.json"]
.into_iter()
.map(Into::into)
.collect();
let parts: Vec<_> = existing_path.prefix_match(&prefix).unwrap().collect();
assert_eq!(parts, expected_parts);
// Prefix with two directories
let prefix = Path::from_raw("apple/bear");
let expected_parts: Vec<PathPart<'_>> = vec!["cow", "dog", "egg.json"]
.into_iter()
.map(Into::into)
.collect();
let parts: Vec<_> = existing_path.prefix_match(&prefix).unwrap().collect();
assert_eq!(parts, expected_parts);
// Not a prefix
let prefix = Path::from_raw("cow");
assert!(existing_path.prefix_match(&prefix).is_none());
// Prefix with a partial directory
let prefix = Path::from_raw("ap");
assert!(existing_path.prefix_match(&prefix).is_none());
// Prefix matches but there aren't any parts after it
let existing_path = Path::from_raw("apple/bear/cow/dog");
let prefix = existing_path.clone();
assert_eq!(existing_path.prefix_match(&prefix).unwrap().count(), 0);
}
#[test]
fn prefix_matches() {
let haystack = Path::from_iter(["foo/bar", "baz%2Ftest", "something"]);
let needle = haystack.clone();
// self starts with self
assert!(
haystack.prefix_matches(&haystack),
"{:?} should have started with {:?}",
haystack,
haystack
);
// a longer prefix doesn't match
let needle = needle.child("longer now");
assert!(
!haystack.prefix_matches(&needle),
"{:?} shouldn't have started with {:?}",
haystack,
needle
);
// one dir prefix matches
let needle = Path::from_iter(["foo/bar"]);
assert!(
haystack.prefix_matches(&needle),
"{:?} should have started with {:?}",
haystack,
needle
);
// two dir prefix matches
let needle = needle.child("baz%2Ftest");
assert!(
haystack.prefix_matches(&needle),
"{:?} should have started with {:?}",
haystack,
needle
);
// partial dir prefix doesn't match
let needle = Path::from_iter(["f"]);
assert!(
!haystack.prefix_matches(&needle),
"{:?} should not have started with {:?}",
haystack,
needle
);
// one dir and one partial dir doesn't match
let needle = Path::from_iter(["foo/bar", "baz"]);
assert!(
!haystack.prefix_matches(&needle),
"{:?} should not have started with {:?}",
haystack,
needle
);
}
#[test]
fn prefix_matches_with_file_name() {
let haystack = Path::from_iter(["foo/bar", "baz%2Ftest", "something", "foo.segment"]);
// All directories match and file name is a prefix
let needle = Path::from_iter(["foo/bar", "baz%2Ftest", "something", "foo"]);
assert!(
!haystack.prefix_matches(&needle),
"{:?} should not have started with {:?}",
haystack,
needle
);
// All directories match but file name is not a prefix
let needle = Path::from_iter(["foo/bar", "baz%2Ftest", "something", "e"]);
assert!(
!haystack.prefix_matches(&needle),
"{:?} should not have started with {:?}",
haystack,
needle
);
// Not all directories match; file name is a prefix of the next directory; this
// does not match
let needle = Path::from_iter(["foo/bar", "baz%2Ftest", "s"]);
assert!(
!haystack.prefix_matches(&needle),
"{:?} should not have started with {:?}",
haystack,
needle
);
// Not all directories match; file name is NOT a prefix of the next directory;
// no match
let needle = Path::from_iter(["foo/bar", "baz%2Ftest", "p"]);
assert!(
!haystack.prefix_matches(&needle),
"{:?} should not have started with {:?}",
haystack,
needle
);
}
}

View File

@ -1,108 +0,0 @@
use percent_encoding::{percent_decode_str, percent_encode, AsciiSet, CONTROLS};
use std::borrow::Cow;
use super::DELIMITER;
// percent_encode's API needs this as a byte
const DELIMITER_BYTE: u8 = DELIMITER.as_bytes()[0];
/// The PathPart type exists to validate the directory/file names that form part
/// of a path.
///
/// A PathPart instance is guaranteed to be non-empty and to contain no `/`
/// characters as it can only be constructed by going through the `from` impl.
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug, Default, Hash)]
pub struct PathPart<'a> {
pub(super) raw: Cow<'a, str>,
}
/// Characters we want to encode.
const INVALID: &AsciiSet = &CONTROLS
// The delimiter we are reserving for internal hierarchy
.add(DELIMITER_BYTE)
// Characters AWS recommends avoiding for object keys
// https://docs.aws.amazon.com/AmazonS3/latest/dev/UsingMetadata.html
.add(b'\\')
.add(b'{')
// TODO: Non-printable ASCII characters (128–255 decimal)
.add(b'^')
.add(b'}')
.add(b'%')
.add(b'`')
.add(b']')
.add(b'"') // " <-- my editor is confused about double quotes within single quotes
.add(b'>')
.add(b'[')
.add(b'~')
.add(b'<')
.add(b'#')
.add(b'|')
// Characters Google Cloud Storage recommends avoiding for object names
// https://cloud.google.com/storage/docs/naming-objects
.add(b'\r')
.add(b'\n')
.add(b'*')
.add(b'?');
impl<'a> From<&'a str> for PathPart<'a> {
fn from(v: &'a str) -> Self {
let inner = match v {
// We don't want to encode `.` generally, but we do want to disallow parts of paths
// to be equal to `.` or `..` to prevent file system traversal shenanigans.
"." => "%2E".into(),
".." => "%2E%2E".into(),
other => percent_encode(other.as_bytes(), INVALID).into(),
};
Self { raw: inner }
}
}
impl From<String> for PathPart<'static> {
fn from(s: String) -> Self {
Self {
raw: Cow::Owned(PathPart::from(s.as_ref()).raw.into_owned()),
}
}
}
impl<'a> std::fmt::Display for PathPart<'a> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
percent_decode_str(self.raw.as_ref())
.decode_utf8()
.expect("Valid UTF-8 that came from String")
.fmt(f)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn path_part_delimiter_gets_encoded() {
let part: PathPart<'_> = "foo/bar".into();
assert_eq!(part.raw, "foo%2Fbar");
assert_eq!(part.to_string(), "foo/bar")
}
#[test]
fn path_part_given_already_encoded_string() {
let part: PathPart<'_> = "foo%2Fbar".into();
assert_eq!(part.raw, "foo%252Fbar");
assert_eq!(part.to_string(), "foo%2Fbar");
}
#[test]
fn path_part_cant_be_one_dot() {
let part: PathPart<'_> = ".".into();
assert_eq!(part.raw, "%2E");
assert_eq!(part.to_string(), ".");
}
#[test]
fn path_part_cant_be_two_dots() {
let part: PathPart<'_> = "..".into();
assert_eq!(part.raw, "%2E%2E");
assert_eq!(part.to_string(), "..");
}
}

View File

@ -1,467 +0,0 @@
//! This module contains the IOx implementation for wrapping existing object store types into an artificial "sleep" wrapper.
use std::{
convert::TryInto,
sync::{Arc, Mutex},
};
use crate::{GetResult, ListResult, ObjectMeta, ObjectStoreApi, Path, Result};
use async_trait::async_trait;
use bytes::Bytes;
use futures::{stream::BoxStream, StreamExt};
use tokio::time::{sleep, Duration};
/// Configuration settings for throttled store
#[derive(Debug, Default, Clone, Copy)]
pub struct ThrottleConfig {
/// Sleep duration for every call to [`delete`](ThrottledStore::delete).
///
/// Sleeping is done before the underlying store is called and independently of the success of
/// the operation.
pub wait_delete_per_call: Duration,
/// Sleep duration for every byte received during [`get`](ThrottledStore::get).
///
/// Sleeping is performed after the underlying store returned and only for successful gets. The
/// sleep duration is additive to [`wait_get_per_call`](Self::wait_get_per_call).
///
/// Note that the per-byte sleep only happens as the user consumes the output bytes. Should
/// there be an intermediate failure (i.e. after partly consuming the output bytes), the
/// resulting sleep time will be partial as well.
pub wait_get_per_byte: Duration,
/// Sleep duration for every call to [`get`](ThrottledStore::get).
///
/// Sleeping is done before the underlying store is called and independently of the success of
/// the operation. The sleep duration is additive to
/// [`wait_get_per_byte`](Self::wait_get_per_byte).
pub wait_get_per_call: Duration,
/// Sleep duration for every call to [`list`](ThrottledStore::list).
///
/// Sleeping is done before the underlying store is called and independently of the success of
/// the operation. The sleep duration is additive to
/// [`wait_list_per_entry`](Self::wait_list_per_entry).
pub wait_list_per_call: Duration,
/// Sleep duration for every entry received during [`list`](ThrottledStore::list).
///
/// Sleeping is performed after the underlying store returned and only for successful lists.
/// The sleep duration is additive to [`wait_list_per_call`](Self::wait_list_per_call).
///
/// Note that the per-entry sleep only happens as the user consumes the output entries. Should
/// there be an intermediate failure (i.e. after partly consuming the output entries), the
/// resulting sleep time will be partial as well.
pub wait_list_per_entry: Duration,
/// Sleep duration for every call to
/// [`list_with_delimiter`](ThrottledStore::list_with_delimiter).
///
/// Sleeping is done before the underlying store is called and independently of the success of
/// the operation. The sleep duration is additive to
/// [`wait_list_with_delimiter_per_entry`](Self::wait_list_with_delimiter_per_entry).
pub wait_list_with_delimiter_per_call: Duration,
/// Sleep duration for every entry received during
/// [`list_with_delimiter`](ThrottledStore::list_with_delimiter).
///
/// Sleeping is performed after the underlying store returned and only for successful gets. The
/// sleep duration is additive to
/// [`wait_list_with_delimiter_per_call`](Self::wait_list_with_delimiter_per_call).
pub wait_list_with_delimiter_per_entry: Duration,
/// Sleep duration for every call to [`put`](ThrottledStore::put).
///
/// Sleeping is done before the underlying store is called and independently of the success of
/// the operation.
pub wait_put_per_call: Duration,
}
/// Store wrapper that injects artificial `sleep` calls around an inner store's operations.
///
/// This can be used for performance testing.
///
/// **Note that the behavior of the wrapper is deterministic and might not reflect real-world
/// conditions!**
#[derive(Debug)]
pub struct ThrottledStore<T: ObjectStoreApi> {
inner: T,
config: Arc<Mutex<ThrottleConfig>>,
}
impl<T: ObjectStoreApi> ThrottledStore<T> {
/// Create new wrapper with zero waiting times.
pub fn new(inner: T, config: ThrottleConfig) -> Self {
Self {
inner,
config: Arc::new(Mutex::new(config)),
}
}
/// Mutate config.
pub fn config_mut<F>(&self, f: F)
where
F: Fn(&mut ThrottleConfig),
{
let mut guard = self.config.lock().expect("lock poisoned");
f(&mut guard)
}
/// Return copy of current config.
pub fn config(&self) -> ThrottleConfig {
*self.config.lock().expect("lock poisoned")
}
}
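// Editor's sketch, not part of the original file: wrapping the in-memory store
// with artificial latency, as a performance test might. The durations here are
// arbitrary placeholders.
#[allow(dead_code)]
fn example_throttled_store() -> ThrottledStore<crate::memory::InMemory> {
    let config = ThrottleConfig {
        wait_put_per_call: Duration::from_millis(10),
        wait_get_per_call: Duration::from_millis(10),
        ..ThrottleConfig::default()
    };
    ThrottledStore::new(crate::memory::InMemory::new(), config)
}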
impl<T: ObjectStoreApi> std::fmt::Display for ThrottledStore<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "ThrottledStore({})", self.inner)
}
}
#[async_trait]
impl<T: ObjectStoreApi> ObjectStoreApi for ThrottledStore<T> {
async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
sleep(self.config().wait_put_per_call).await;
self.inner.put(location, bytes).await
}
async fn get(&self, location: &Path) -> Result<GetResult> {
sleep(self.config().wait_get_per_call).await;
// need to copy to avoid moving / referencing `self`
let wait_get_per_byte = self.config().wait_get_per_byte;
self.inner.get(location).await.map(|result| {
let s = match result {
GetResult::Stream(s) => s,
GetResult::File(_, _) => unimplemented!(),
};
GetResult::Stream(
s.then(move |bytes_result| async move {
match bytes_result {
Ok(bytes) => {
let bytes_len: u32 = usize_to_u32_saturate(bytes.len());
sleep(wait_get_per_byte * bytes_len).await;
Ok(bytes)
}
Err(err) => Err(err),
}
})
.boxed(),
)
})
}
async fn head(&self, location: &Path) -> Result<ObjectMeta> {
sleep(self.config().wait_put_per_call).await;
self.inner.head(location).await
}
async fn delete(&self, location: &Path) -> Result<()> {
sleep(self.config().wait_delete_per_call).await;
self.inner.delete(location).await
}
async fn list<'a>(
&'a self,
prefix: Option<&'a Path>,
) -> Result<BoxStream<'a, Result<ObjectMeta>>> {
sleep(self.config().wait_list_per_call).await;
// need to copy to avoid moving / referencing `self`
let wait_list_per_entry = self.config().wait_list_per_entry;
self.inner.list(prefix).await.map(|stream| {
stream
.then(move |result| async move {
match result {
Ok(entry) => {
sleep(wait_list_per_entry).await;
Ok(entry)
}
Err(err) => Err(err),
}
})
.boxed()
})
}
async fn list_with_delimiter(&self, prefix: &Path) -> Result<ListResult> {
sleep(self.config().wait_list_with_delimiter_per_call).await;
match self.inner.list_with_delimiter(prefix).await {
Ok(list_result) => {
let entries_len = usize_to_u32_saturate(list_result.objects.len());
sleep(self.config().wait_list_with_delimiter_per_entry * entries_len).await;
Ok(list_result)
}
Err(err) => Err(err),
}
}
}
/// Saturating `usize` to `u32` cast.
fn usize_to_u32_saturate(x: usize) -> u32 {
x.try_into().unwrap_or(u32::MAX)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{
memory::InMemory,
tests::{list_uses_directories_correctly, list_with_delimiter, put_get_delete_list},
};
use bytes::Bytes;
use futures::TryStreamExt;
use tokio::time::Duration;
use tokio::time::Instant;
const WAIT_TIME: Duration = Duration::from_millis(100);
const ZERO: Duration = Duration::from_millis(0); // Duration::default isn't constant
macro_rules! assert_bounds {
($d:expr, $lower:expr) => {
assert_bounds!($d, $lower, $lower + 1);
};
($d:expr, $lower:expr, $upper:expr) => {
let d = $d;
let lower = $lower * WAIT_TIME;
let upper = $upper * WAIT_TIME;
assert!(d >= lower, "{:?} must be >= than {:?}", d, lower);
assert!(d < upper, "{:?} must be < than {:?}", d, upper);
};
}
#[tokio::test]
async fn throttle_test() {
let inner = InMemory::new();
let store = ThrottledStore::new(inner, ThrottleConfig::default());
put_get_delete_list(&store).await.unwrap();
list_uses_directories_correctly(&store).await.unwrap();
list_with_delimiter(&store).await.unwrap();
}
#[tokio::test]
async fn delete_test() {
let inner = InMemory::new();
let store = ThrottledStore::new(inner, ThrottleConfig::default());
assert_bounds!(measure_delete(&store, None).await, 0);
assert_bounds!(measure_delete(&store, Some(0)).await, 0);
assert_bounds!(measure_delete(&store, Some(10)).await, 0);
store.config_mut(|cfg| cfg.wait_delete_per_call = WAIT_TIME);
assert_bounds!(measure_delete(&store, None).await, 1);
assert_bounds!(measure_delete(&store, Some(0)).await, 1);
assert_bounds!(measure_delete(&store, Some(10)).await, 1);
}
#[tokio::test]
async fn get_test() {
let inner = InMemory::new();
let store = ThrottledStore::new(inner, ThrottleConfig::default());
assert_bounds!(measure_get(&store, None).await, 0);
assert_bounds!(measure_get(&store, Some(0)).await, 0);
assert_bounds!(measure_get(&store, Some(10)).await, 0);
store.config_mut(|cfg| cfg.wait_get_per_call = WAIT_TIME);
assert_bounds!(measure_get(&store, None).await, 1);
assert_bounds!(measure_get(&store, Some(0)).await, 1);
assert_bounds!(measure_get(&store, Some(10)).await, 1);
store.config_mut(|cfg| {
cfg.wait_get_per_call = ZERO;
cfg.wait_get_per_byte = WAIT_TIME;
});
assert_bounds!(measure_get(&store, Some(2)).await, 2);
store.config_mut(|cfg| {
cfg.wait_get_per_call = WAIT_TIME;
cfg.wait_get_per_byte = WAIT_TIME;
});
assert_bounds!(measure_get(&store, Some(2)).await, 3);
}
#[tokio::test]
async fn list_test() {
let inner = InMemory::new();
let store = ThrottledStore::new(inner, ThrottleConfig::default());
assert_bounds!(measure_list(&store, 0).await, 0);
assert_bounds!(measure_list(&store, 10).await, 0);
store.config_mut(|cfg| cfg.wait_list_per_call = WAIT_TIME);
assert_bounds!(measure_list(&store, 0).await, 1);
assert_bounds!(measure_list(&store, 10).await, 1);
store.config_mut(|cfg| {
cfg.wait_list_per_call = ZERO;
cfg.wait_list_per_entry = WAIT_TIME;
});
assert_bounds!(measure_list(&store, 2).await, 2);
store.config_mut(|cfg| {
cfg.wait_list_per_call = WAIT_TIME;
cfg.wait_list_per_entry = WAIT_TIME;
});
assert_bounds!(measure_list(&store, 2).await, 3);
}
#[tokio::test]
async fn list_with_delimiter_test() {
let inner = InMemory::new();
let store = ThrottledStore::new(inner, ThrottleConfig::default());
assert_bounds!(measure_list_with_delimiter(&store, 0).await, 0);
assert_bounds!(measure_list_with_delimiter(&store, 10).await, 0);
store.config_mut(|cfg| cfg.wait_list_with_delimiter_per_call = WAIT_TIME);
assert_bounds!(measure_list_with_delimiter(&store, 0).await, 1);
assert_bounds!(measure_list_with_delimiter(&store, 10).await, 1);
store.config_mut(|cfg| {
cfg.wait_list_with_delimiter_per_call = ZERO;
cfg.wait_list_with_delimiter_per_entry = WAIT_TIME;
});
assert_bounds!(measure_list_with_delimiter(&store, 2).await, 2);
store.config_mut(|cfg| {
cfg.wait_list_with_delimiter_per_call = WAIT_TIME;
cfg.wait_list_with_delimiter_per_entry = WAIT_TIME;
});
assert_bounds!(measure_list_with_delimiter(&store, 2).await, 3);
}
#[tokio::test]
async fn put_test() {
let inner = InMemory::new();
let store = ThrottledStore::new(inner, ThrottleConfig::default());
assert_bounds!(measure_put(&store, 0).await, 0);
assert_bounds!(measure_put(&store, 10).await, 0);
store.config_mut(|cfg| cfg.wait_put_per_call = WAIT_TIME);
assert_bounds!(measure_put(&store, 0).await, 1);
assert_bounds!(measure_put(&store, 10).await, 1);
store.config_mut(|cfg| cfg.wait_put_per_call = ZERO);
assert_bounds!(measure_put(&store, 0).await, 0);
}
async fn place_test_object(store: &ThrottledStore<InMemory>, n_bytes: Option<usize>) -> Path {
let path = Path::from_raw("foo");
if let Some(n_bytes) = n_bytes {
let data: Vec<_> = std::iter::repeat(1u8).take(n_bytes).collect();
let bytes = Bytes::from(data);
store.put(&path, bytes).await.unwrap();
} else {
// ensure object is absent
store.delete(&path).await.unwrap();
}
path
}
async fn place_test_objects(store: &ThrottledStore<InMemory>, n_entries: usize) -> Path {
let prefix = Path::from_raw("foo");
// clean up store
let entries: Vec<_> = store
.list(Some(&prefix))
.await
.unwrap()
.try_collect()
.await
.unwrap();
for entry in entries {
store.delete(&entry.location).await.unwrap();
}
// create new entries
for i in 0..n_entries {
let path = prefix.child(i.to_string().as_str());
let data = Bytes::from("bar");
store.put(&path, data).await.unwrap();
}
prefix
}
async fn measure_delete(store: &ThrottledStore<InMemory>, n_bytes: Option<usize>) -> Duration {
let path = place_test_object(store, n_bytes).await;
let t0 = Instant::now();
store.delete(&path).await.unwrap();
t0.elapsed()
}
async fn measure_get(store: &ThrottledStore<InMemory>, n_bytes: Option<usize>) -> Duration {
let path = place_test_object(store, n_bytes).await;
let t0 = Instant::now();
let res = store.get(&path).await;
if n_bytes.is_some() {
// need to consume bytes to provoke sleep times
let s = match res.unwrap() {
GetResult::Stream(s) => s,
GetResult::File(_, _) => unimplemented!(),
};
s.map_ok(|b| bytes::BytesMut::from(&b[..]))
.try_concat()
.await
.unwrap();
} else {
assert!(res.is_err());
}
t0.elapsed()
}
async fn measure_list(store: &ThrottledStore<InMemory>, n_entries: usize) -> Duration {
let prefix = place_test_objects(store, n_entries).await;
let t0 = Instant::now();
store
.list(Some(&prefix))
.await
.unwrap()
.try_collect::<Vec<_>>()
.await
.unwrap();
t0.elapsed()
}
async fn measure_list_with_delimiter(
store: &ThrottledStore<InMemory>,
n_entries: usize,
) -> Duration {
let prefix = place_test_objects(store, n_entries).await;
let t0 = Instant::now();
store.list_with_delimiter(&prefix).await.unwrap();
t0.elapsed()
}
async fn measure_put(store: &ThrottledStore<InMemory>, n_bytes: usize) -> Duration {
let data: Vec<_> = std::iter::repeat(1u8).take(n_bytes).collect();
let bytes = Bytes::from(data);
let t0 = Instant::now();
store.put(&Path::from_raw("foo"), bytes).await.unwrap();
t0.elapsed()
}
}

View File

@ -10,7 +10,7 @@ bytes = "1.1"
futures = "0.3"
iox_time = { version = "0.1.0", path = "../iox_time" }
metric = { version = "0.1.0", path = "../metric" }
object_store = { path = "../object_store" }
object_store = "0.0.1"
pin-project = "1.0.10"
workspace-hack = { path = "../workspace-hack" }

View File

@ -5,7 +5,7 @@ use async_trait::async_trait;
use bytes::Bytes;
use snafu::Snafu;
use object_store::{path::Path, GetResult, ListResult, ObjectMeta, ObjectStoreApi, Result};
use object_store::{path::Path, GetResult, ListResult, ObjectMeta, ObjectStore, Result};
/// A specialized `Error` for Azure object store-related errors
#[derive(Debug, Snafu, Clone)]
@ -50,7 +50,7 @@ impl std::fmt::Display for DummyObjectStore {
}
#[async_trait]
impl ObjectStoreApi for DummyObjectStore {
impl ObjectStore for DummyObjectStore {
async fn put(&self, _location: &Path, _bytes: Bytes) -> Result<()> {
Ok(NotSupportedSnafu { name: self.name }.fail()?)
}

View File

@ -1,4 +1,4 @@
//! A metric instrumentation wrapper over [`ObjectStoreApi`] implementations.
//! A metric instrumentation wrapper over [`ObjectStore`] implementations.
use std::sync::Arc;
use std::{
@ -14,23 +14,23 @@ use iox_time::{SystemProvider, Time, TimeProvider};
use metric::{Metric, U64Counter, U64Histogram, U64HistogramOptions};
use pin_project::{pin_project, pinned_drop};
use object_store::{path::Path, GetResult, ListResult, ObjectMeta, ObjectStoreApi, Result};
use object_store::{path::Path, GetResult, ListResult, ObjectMeta, ObjectStore, Result};
#[cfg(test)]
mod dummy;
/// An instrumentation decorator, wrapping an underlying [`ObjectStoreApi`]
/// An instrumentation decorator, wrapping an underlying [`ObjectStore`]
/// implementation and recording bytes transferred and call latency.
///
/// # Stream Duration
///
/// The [`ObjectStoreApi::get()`] call can return a [`Stream`] which is polled
/// The [`ObjectStore::get()`] call can return a [`Stream`] which is polled
/// by the caller and may yield chunks of a file over a series of polls (as
/// opposed to all of the file data in one go). Because the caller drives the
/// polling and therefore fetching of data from the object store over the
/// lifetime of the [`Stream`], the duration of a [`ObjectStoreApi::get()`]
/// lifetime of the [`Stream`], the duration of a [`ObjectStore::get()`]
/// request is measured to be the wall clock difference between the moment the
/// caller executes the [`ObjectStoreApi::get()`] call, up until the last chunk
/// caller executes the [`ObjectStore::get()`] call, up until the last chunk
/// of data is yielded to the caller.
///
/// This means the duration metrics measuring consumption of returned streams
@ -39,7 +39,7 @@ mod dummy;
///
/// # Stream Errors
///
/// The [`ObjectStoreApi::get()`] method can return a [`Stream`] of [`Result`]
/// The [`ObjectStore::get()`] method can return a [`Stream`] of [`Result`]
/// instances, and returning an error when polled is not necessarily a terminal
/// state. The metric recorder allows for a caller to observe a transient error
/// and subsequently go on to complete reading the stream, recording this read
@ -67,7 +67,7 @@ mod dummy;
/// are not recorded. The bytes transferred metric is not affected.
#[derive(Debug)]
pub struct ObjectStoreMetrics {
inner: Arc<dyn ObjectStoreApi>,
inner: Arc<dyn ObjectStore>,
time_provider: Arc<dyn TimeProvider>,
put_success_duration_ms: U64Histogram,
@ -88,7 +88,7 @@ pub struct ObjectStoreMetrics {
impl ObjectStoreMetrics {
/// Instrument `T`, pushing to `registry`.
pub fn new(
inner: Arc<dyn ObjectStoreApi>,
inner: Arc<dyn ObjectStore>,
time_provider: Arc<dyn TimeProvider>,
registry: &metric::Registry,
) -> Self {
@ -162,7 +162,7 @@ impl std::fmt::Display for ObjectStoreMetrics {
}
#[async_trait]
impl ObjectStoreApi for ObjectStoreMetrics {
impl ObjectStore for ObjectStoreMetrics {
async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
let t = self.time_provider.now();
@ -309,7 +309,7 @@ trait MetricDelegate {
/// A [`MetricDelegate`] for instrumented streams of [`Bytes`].
///
/// This impl is used to record the number of bytes yielded for
/// [`ObjectStoreApi::get()`] calls.
/// [`ObjectStore::get()`] calls.
#[derive(Debug)]
struct BytesStreamDelegate(U64Counter);
@ -522,7 +522,7 @@ mod tests {
use tokio::io::AsyncReadExt;
use dummy::DummyObjectStore;
use object_store::{disk::LocalFileSystem, memory::InMemory};
use object_store::{local::LocalFileSystem, memory::InMemory};
use super::*;

View File

@ -15,7 +15,7 @@ futures = "0.3"
generated_types = { path = "../generated_types" }
iox_time = { path = "../iox_time" }
metric = { path = "../metric" }
object_store = { path = "../object_store" }
object_store = "0.0.1"
observability_deps = { path = "../observability_deps" }
parking_lot = "0.12"
parquet = {version = "13", features = ["experimental"]}

View File

@ -17,7 +17,7 @@ generated_types = { path = "../generated_types" }
influxdb_iox_client = { path = "../influxdb_iox_client" }
iox_catalog = { path = "../iox_catalog" }
metric = { path = "../metric" }
object_store = { path = "../object_store" }
object_store = "0.0.1"
observability_deps = { path = "../observability_deps" }
parking_lot = "0.12"
parquet_file = { path = "../parquet_file" }

View File

@ -20,7 +20,7 @@ metric = { path = "../metric" }
mutable_batch = { path = "../mutable_batch" }
mutable_batch_lp = { path = "../mutable_batch_lp" }
mutable_batch_pb = { version = "0.1.0", path = "../mutable_batch_pb" }
object_store = { path = "../object_store" }
object_store = "0.0.1"
observability_deps = { path = "../observability_deps" }
parking_lot = "0.12"
predicate = { path = "../predicate" }

View File

@ -8,7 +8,7 @@ data_types = { path = "../data_types" }
futures = "0.3"
generated_types = { path = "../generated_types" }
iox_catalog = { path = "../iox_catalog" }
object_store = { path = "../object_store" }
object_store = "0.0.1"
observability_deps = { path = "../observability_deps" }
parquet_file = { path = "../parquet_file" }
tokio = { version = "1", features = ["rt-multi-thread", "macros"] }

View File

@ -98,7 +98,7 @@ mod tests {
use data_types::{KafkaPartition, ParquetFileParams, SequenceNumber, Timestamp};
use generated_types::influxdata::iox::object_store::v1::object_store_service_server::ObjectStoreService;
use iox_catalog::mem::MemCatalog;
use object_store::{memory::InMemory, ObjectStoreApi};
use object_store::{memory::InMemory, ObjectStore};
use uuid::Uuid;
#[tokio::test]

View File

@ -42,6 +42,7 @@ nom = { version = "7", features = ["alloc", "std"] }
num-bigint = { version = "0.4", features = ["std"] }
num-integer = { version = "0.1", default-features = false, features = ["i128", "std"] }
num-traits = { version = "0.2", features = ["i128", "libm", "std"] }
object_store = { version = "0.0.1", default-features = false, features = ["aws", "azure", "azure_core", "azure_storage", "azure_storage_blobs", "cloud-storage", "gcp", "hyper", "hyper-rustls", "reqwest", "rusoto_core", "rusoto_credential", "rusoto_s3"] }
once_cell = { version = "1", features = ["alloc", "parking_lot", "race", "std"] }
parquet = { version = "13", features = ["arrow", "base64", "brotli", "experimental", "flate2", "lz4", "snap", "zstd"] }
predicates = { version = "2", features = ["diff", "difflib", "float-cmp", "normalize-line-endings", "regex"] }