diff --git a/.circleci/config.yml b/.circleci/config.yml index a0463521cd..6cdc2c7b42 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -201,8 +201,6 @@ jobs: # setup multiple docker images (see https://circleci.com/docs/2.0/configuration-reference/#docker) docker: - image: quay.io/influxdb/rust:ci - - image: localstack/localstack - - image: mcr.microsoft.com/azure-storage/azurite - image: vectorized/redpanda:v21.9.2 command: redpanda start --overprovisioned --smp 1 --memory 1G --reserve-memory 0M - image: postgres @@ -222,38 +220,18 @@ jobs: TEST_INTEGRATION: 1 INFLUXDB_IOX_INTEGRATION_LOCAL: 1 KAFKA_CONNECT: "localhost:9092" - AWS_DEFAULT_REGION: "us-east-1" - AWS_ACCESS_KEY_ID: test - AWS_SECRET_ACCESS_KEY: test - AWS_ENDPOINT: http://127.0.0.1:4566 - AZURE_USE_EMULATOR: "1" - OBJECT_STORE_BUCKET: iox-test POSTGRES_USER: postgres TEST_INFLUXDB_IOX_CATALOG_DSN: "postgres://postgres@localhost/iox_shared" # When removing this, also remove the ignore on the test in trogging/src/cli.rs RUST_LOG: debug LOG_FILTER: debug steps: - - run: - name: Setup localstack (AWS emulation) - command: | - cd /tmp - curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip" - unzip awscliv2.zip - sudo ./aws/install - aws --endpoint-url=http://localhost:4566 s3 mb s3://iox-test - - run: - name: Setup Azurite (Azure emulation) - # the magical connection string is from https://docs.microsoft.com/en-us/azure/storage/common/storage-use-azurite?tabs=visual-studio#http-connection-strings - command: | - curl -sL https://aka.ms/InstallAzureCLIDeb | sudo bash - az storage container create -n iox-test --connection-string 'DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;QueueEndpoint=http://127.0.0.1:10001/devstoreaccount1;' - checkout - rust_components - cache_restore - run: name: Cargo test - command: cargo test --workspace --features=aws,azure,azure_test + command: cargo test --workspace - cache_save # end to end tests with Heappy (heap profiling enabled) diff --git a/Cargo.lock b/Cargo.lock index ddd4d6525d..09ad3f50c4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3399,7 +3399,9 @@ dependencies = [ [[package]] name = "object_store" -version = "0.1.0" +version = "0.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "71fc15440e94ba9b3a8bc83c8f94a0d5a2861d72254cdb9ede72fbbd18e015db" dependencies = [ "async-trait", "azure_core", @@ -3408,13 +3410,10 @@ dependencies = [ "bytes", "chrono", "cloud-storage", - "dotenv", "futures", - "futures-test", "hyper", "hyper-rustls", "itertools", - "observability_deps", "percent-encoding", "reqwest", "rusoto_core", @@ -3424,8 +3423,8 @@ dependencies = [ "tempfile", "tokio", "tokio-util 0.7.1", + "tracing", "walkdir", - "workspace-hack", ] [[package]] @@ -6625,6 +6624,7 @@ dependencies = [ "num-bigint 0.4.3", "num-integer", "num-traits", + "object_store", "once_cell", "parquet", "predicates", diff --git a/Cargo.toml b/Cargo.toml index 313521b22c..8622c75c80 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -41,7 +41,6 @@ members = [ "mutable_batch_lp", "mutable_batch_pb", "mutable_batch_tests", - "object_store", "object_store_metrics", "observability_deps", "packers", diff --git a/clap_blocks/Cargo.toml b/clap_blocks/Cargo.toml index d90272c733..6c51372847 100644 --- a/clap_blocks/Cargo.toml +++ b/clap_blocks/Cargo.toml @@ -10,7 +10,7 @@ humantime = "2.1.0" 
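The hunks above replace the in-tree `object_store` path dependency with the published crates.io release, and the test hunks further down migrate from the removed `list_all` test helper to the crate's stream-based `list` API. A rough sketch of that listing pattern, assuming the trait providing `list` is in scope under the name this repository uses (`ObjectStoreApi`) and using the in-memory store that the ingester tests import:

    use futures::TryStreamExt;
    use object_store::{memory::InMemory, ObjectStoreApi};

    // Count everything in the store, as the updated compactor/ingester tests do.
    async fn count_objects(store: &InMemory) -> usize {
        let stream = store.list(None).await.expect("listing should start");
        let objects: Vec<_> = stream.try_collect().await.expect("listing should not fail");
        objects.len()
    }
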
iox_catalog = { path = "../iox_catalog" } iox_time = { path = "../iox_time" } metric = { path = "../metric" } -object_store = { path = "../object_store" } +object_store = "0.0.1" observability_deps = { path = "../observability_deps" } snafu = "0.7" tempfile = "3.1.0" diff --git a/clap_blocks/src/object_store.rs b/clap_blocks/src/object_store.rs index 8ab1b2617e..c10b261759 100644 --- a/clap_blocks/src/object_store.rs +++ b/clap_blocks/src/object_store.rs @@ -369,7 +369,7 @@ pub fn make_object_store(config: &ObjectStoreConfig) -> Result { fs::create_dir_all(db_dir) .context(CreatingDatabaseDirectorySnafu { path: db_dir })?; - Ok(Arc::new(object_store::disk::LocalFileSystem::new(&db_dir))) + Ok(Arc::new(object_store::local::LocalFileSystem::new(&db_dir))) } None => MissingObjectStoreConfigSnafu { object_store: ObjectStoreType::File, diff --git a/compactor/Cargo.toml b/compactor/Cargo.toml index 0e57aabf7b..47e402e5ba 100644 --- a/compactor/Cargo.toml +++ b/compactor/Cargo.toml @@ -14,7 +14,7 @@ datafusion = { path = "../datafusion" } futures = "0.3" iox_catalog = { path = "../iox_catalog" } metric = { path = "../metric" } -object_store = { path = "../object_store" } +object_store = "0.0.1" observability_deps = { path = "../observability_deps" } parquet_file = { path = "../parquet_file" } predicate = { path = "../predicate" } diff --git a/compactor/src/compact.rs b/compactor/src/compact.rs index 1c72651a92..267d787a97 100644 --- a/compactor/src/compact.rs +++ b/compactor/src/compact.rs @@ -1108,10 +1108,10 @@ mod tests { use super::*; use arrow_util::assert_batches_sorted_eq; use data_types::{ChunkId, KafkaPartition, NamespaceId, ParquetFileParams, SequenceNumber}; + use futures::TryStreamExt; use iox_catalog::interface::INITIAL_COMPACTION_LEVEL; use iox_tests::util::TestCatalog; use iox_time::SystemProvider; - use object_store::ObjectStoreTestConvenience; use querier::{ cache::CatalogCache, chunk::{collect_read_filter, ParquetChunkAdapter}, @@ -2488,7 +2488,8 @@ mod tests { .await .unwrap(); - let object_store_files = compactor.object_store.list_all().await.unwrap(); + let list = compactor.object_store.list(None).await.unwrap(); + let object_store_files: Vec<_> = list.try_collect().await.unwrap(); assert_eq!(object_store_files.len(), 1); } diff --git a/compactor/src/garbage_collector.rs b/compactor/src/garbage_collector.rs index 7d73116a64..9099df431d 100644 --- a/compactor/src/garbage_collector.rs +++ b/compactor/src/garbage_collector.rs @@ -94,9 +94,9 @@ impl GarbageCollector { mod tests { use super::*; use data_types::{KafkaPartition, ParquetFile, ParquetFileParams, SequenceNumber}; + use futures::{StreamExt, TryStreamExt}; use iox_catalog::interface::INITIAL_COMPACTION_LEVEL; use iox_tests::util::TestCatalog; - use object_store::ObjectStoreTestConvenience; use std::time::Duration; use uuid::Uuid; @@ -211,7 +211,9 @@ mod tests { 1 ); - assert_eq!(catalog.object_store.list_all().await.unwrap().len(), 1); + let list = catalog.object_store.list(None).await.unwrap(); + let obj_store_paths: Vec<_> = list.try_collect().await.unwrap(); + assert_eq!(obj_store_paths.len(), 1); } #[tokio::test] @@ -294,7 +296,10 @@ mod tests { .unwrap(), 1 ); - assert_eq!(catalog.object_store.list_all().await.unwrap().len(), 1); + + let list = catalog.object_store.list(None).await.unwrap(); + let obj_store_paths: Vec<_> = list.try_collect().await.unwrap(); + assert_eq!(obj_store_paths.len(), 1); } #[tokio::test] @@ -377,6 +382,7 @@ mod tests { .unwrap(), 0 ); - 
assert!(catalog.object_store.list_all().await.unwrap().is_empty()); + let mut list = catalog.object_store.list(None).await.unwrap(); + assert!(list.next().await.is_none()); } } diff --git a/influxdb_iox/Cargo.toml b/influxdb_iox/Cargo.toml index f608ce92c2..90c8e96d75 100644 --- a/influxdb_iox/Cargo.toml +++ b/influxdb_iox/Cargo.toml @@ -22,7 +22,7 @@ ioxd_router = { path = "../ioxd_router"} ioxd_querier = { path = "../ioxd_querier"} ioxd_test = { path = "../ioxd_test"} metric = { path = "../metric" } -object_store = { path = "../object_store" } +object_store = "0.0.1" object_store_metrics = { path = "../object_store_metrics" } observability_deps = { path = "../observability_deps" } panic_logging = { path = "../panic_logging" } diff --git a/ingester/Cargo.toml b/ingester/Cargo.toml index 1a2829e822..a5368fdbde 100644 --- a/ingester/Cargo.toml +++ b/ingester/Cargo.toml @@ -23,7 +23,7 @@ iox_catalog = { path = "../iox_catalog" } metric = { path = "../metric" } mutable_batch = { path = "../mutable_batch"} mutable_batch_lp = { path = "../mutable_batch_lp" } -object_store = { path = "../object_store" } +object_store = "0.0.1" observability_deps = { path = "../observability_deps" } parking_lot = "0.12" parquet_file = { path = "../parquet_file" } diff --git a/ingester/src/persist.rs b/ingester/src/persist.rs index ec401aad79..91f6678867 100644 --- a/ingester/src/persist.rs +++ b/ingester/src/persist.rs @@ -82,10 +82,10 @@ pub async fn persist( mod tests { use super::*; use data_types::{NamespaceId, PartitionId, SequenceNumber, SequencerId, TableId}; + use futures::{StreamExt, TryStreamExt}; use iox_catalog::interface::INITIAL_COMPACTION_LEVEL; use iox_time::Time; use object_store::memory::InMemory; - use object_store::ObjectStoreTestConvenience; use query::test::{raw_data, TestChunk}; use std::sync::Arc; use uuid::Uuid; @@ -124,7 +124,8 @@ mod tests { .await .unwrap(); - assert!(object_store.list_all().await.unwrap().is_empty()); + let mut list = object_store.list(None).await.unwrap(); + assert!(list.next().await.is_none()); } #[tokio::test] @@ -165,7 +166,8 @@ mod tests { .await .unwrap(); - let obj_store_paths = object_store.list_all().await.unwrap(); + let list = object_store.list(None).await.unwrap(); + let obj_store_paths: Vec<_> = list.try_collect().await.unwrap(); assert_eq!(obj_store_paths.len(), 1); } } diff --git a/iox_tests/Cargo.toml b/iox_tests/Cargo.toml index dd341f5ac0..588b7e2765 100644 --- a/iox_tests/Cargo.toml +++ b/iox_tests/Cargo.toml @@ -14,7 +14,7 @@ iox_catalog = { path = "../iox_catalog" } iox_time = { path = "../iox_time" } metric = { path = "../metric" } mutable_batch_lp = { path = "../mutable_batch_lp" } -object_store = { path = "../object_store" } +object_store = "0.0.1" parquet_file = { path = "../parquet_file" } query = { path = "../query" } schema = { path = "../schema" } diff --git a/ioxd_compactor/Cargo.toml b/ioxd_compactor/Cargo.toml index fb5f9b021d..c733a78965 100644 --- a/ioxd_compactor/Cargo.toml +++ b/ioxd_compactor/Cargo.toml @@ -14,7 +14,7 @@ iox_catalog = { path = "../iox_catalog" } ioxd_common = { path = "../ioxd_common" } metric = { path = "../metric" } query = { path = "../query" } -object_store = { path = "../object_store" } +object_store = "0.0.1" iox_time = { path = "../iox_time" } trace = { path = "../trace" } diff --git a/ioxd_ingester/Cargo.toml b/ioxd_ingester/Cargo.toml index 62c23125b3..34151d9f4a 100644 --- a/ioxd_ingester/Cargo.toml +++ b/ioxd_ingester/Cargo.toml @@ -11,7 +11,7 @@ ingester = { path = "../ingester" } iox_catalog = { path = 
"../iox_catalog" } ioxd_common = { path = "../ioxd_common" } metric = { path = "../metric" } -object_store = { path = "../object_store" } +object_store = "0.0.1" query = { path = "../query" } trace = { path = "../trace" } write_buffer = { path = "../write_buffer" } diff --git a/ioxd_querier/Cargo.toml b/ioxd_querier/Cargo.toml index 4799e98077..14d9ce570b 100644 --- a/ioxd_querier/Cargo.toml +++ b/ioxd_querier/Cargo.toml @@ -12,7 +12,7 @@ generated_types = { path = "../generated_types" } iox_catalog = { path = "../iox_catalog" } ioxd_common = { path = "../ioxd_common" } metric = { path = "../metric" } -object_store = { path = "../object_store" } +object_store = "0.0.1" querier = { path = "../querier" } query = { path = "../query" } service_grpc_flight = { path = "../service_grpc_flight" } diff --git a/ioxd_router/Cargo.toml b/ioxd_router/Cargo.toml index 401a50ec55..ea58c32f12 100644 --- a/ioxd_router/Cargo.toml +++ b/ioxd_router/Cargo.toml @@ -11,7 +11,7 @@ iox_catalog = { path = "../iox_catalog" } ioxd_common = { path = "../ioxd_common" } metric = { path = "../metric" } mutable_batch = { path = "../mutable_batch" } -object_store = { path = "../object_store" } +object_store = "0.0.1" observability_deps = { path = "../observability_deps" } router = { path = "../router" } trace = { path = "../trace" } diff --git a/object_store/Cargo.toml b/object_store/Cargo.toml deleted file mode 100644 index 1db0ed4421..0000000000 --- a/object_store/Cargo.toml +++ /dev/null @@ -1,48 +0,0 @@ -[package] -name = "object_store" -version = "0.1.0" -authors = ["Paul Dix "] -edition = "2021" - -[dependencies] # In alphabetical order -async-trait = "0.1.53" -# Microsoft Azure Blob storage integration -azure_core = { version = "0.2", optional = true, default-features = false, features = ["enable_reqwest_rustls"] } -azure_storage = { version = "0.2", optional = true, default-features = false, features = ["account"] } -azure_storage_blobs = { version = "0.2", optional = true, default-features = false, features = ["enable_reqwest_rustls"] } -bytes = "1.0" -chrono = { version = "0.4", default-features = false, features = ["clock"] } -# Google Cloud Storage integration -cloud-storage = { version = "0.11.1", optional = true, default-features = false, features = ["rustls-tls"] } -futures = "0.3" -# for rusoto -hyper = { version = "0.14", optional = true, default-features = false } -# for rusoto -hyper-rustls = { version = "0.23.0", optional = true, default-features = false, features = ["webpki-tokio", "http1", "http2", "tls12"] } -itertools = "0.10.1" -observability_deps = { path = "../observability_deps", optional = true } -percent-encoding = "2.1" -# rusoto crates are for Amazon S3 integration -rusoto_core = { version = "0.48.0", optional = true, default-features = false, features = ["rustls"] } -rusoto_credential = { version = "0.48.0", optional = true, default-features = false } -rusoto_s3 = { version = "0.48.0", optional = true, default-features = false, features = ["rustls"] } -snafu = "0.7" -tokio = { version = "1.18", features = ["fs", "io-util", "macros", "parking_lot", "rt-multi-thread", "time"] } -# Filesystem integration -tokio-util = { version = "0.7.1", features = ["codec", "io"] } -reqwest = { version = "0.11", optional = true, default-features = false, features = ["rustls-tls"] } -# Filesystem integration -walkdir = "2" -tempfile = "3.1.0" -workspace-hack = { path = "../workspace-hack" } - -[features] -azure = ["azure_core", "azure_storage_blobs", "azure_storage", "reqwest"] -azure_test = ["azure", 
"azure_core/azurite_workaround", "azure_storage/azurite_workaround", "azure_storage_blobs/azurite_workaround"] -gcp = ["cloud-storage"] -aws = ["rusoto_core", "rusoto_credential", "rusoto_s3", "hyper", "hyper-rustls", "observability_deps"] - -[dev-dependencies] # In alphabetical order -dotenv = "0.15.0" -tempfile = "3.1.0" -futures-test = "0.3" diff --git a/object_store/src/aws.rs b/object_store/src/aws.rs deleted file mode 100644 index 8e38418341..0000000000 --- a/object_store/src/aws.rs +++ /dev/null @@ -1,940 +0,0 @@ -//! This module contains the IOx implementation for using S3 as the object -//! store. -use crate::{ - path::{Path, DELIMITER}, - GetResult, ListResult, ObjectMeta, ObjectStoreApi, Result, -}; -use async_trait::async_trait; -use bytes::Bytes; -use chrono::{DateTime, Utc}; -use futures::{ - stream::{self, BoxStream}, - Future, StreamExt, TryStreamExt, -}; -use hyper::client::Builder as HyperBuilder; -use observability_deps::tracing::{debug, warn}; -use rusoto_core::ByteStream; -use rusoto_credential::{InstanceMetadataProvider, StaticProvider}; -use rusoto_s3::S3; -use snafu::{OptionExt, ResultExt, Snafu}; -use std::{convert::TryFrom, fmt, num::NonZeroUsize, ops::Deref, sync::Arc, time::Duration}; -use tokio::sync::{OwnedSemaphorePermit, Semaphore}; - -/// The maximum number of times a request will be retried in the case of an AWS server error -pub const MAX_NUM_RETRIES: u32 = 3; - -/// A specialized `Error` for object store-related errors -#[derive(Debug, Snafu)] -#[allow(missing_docs)] -enum Error { - #[snafu(display("Expected streamed data to have length {}, got {}", expected, actual))] - DataDoesNotMatchLength { expected: usize, actual: usize }, - - #[snafu(display("Did not receive any data. Bucket: {}, Location: {}", bucket, path))] - NoData { bucket: String, path: String }, - - #[snafu(display( - "Unable to DELETE data. Bucket: {}, Location: {}, Error: {} ({:?})", - bucket, - path, - source, - source, - ))] - UnableToDeleteData { - source: rusoto_core::RusotoError, - bucket: String, - path: String, - }, - - #[snafu(display( - "Unable to GET data. Bucket: {}, Location: {}, Error: {} ({:?})", - bucket, - path, - source, - source, - ))] - UnableToGetData { - source: rusoto_core::RusotoError, - bucket: String, - path: String, - }, - - #[snafu(display( - "Unable to HEAD data. Bucket: {}, Location: {}, Error: {} ({:?})", - bucket, - path, - source, - source, - ))] - UnableToHeadData { - source: rusoto_core::RusotoError, - bucket: String, - path: String, - }, - - #[snafu(display( - "Unable to GET part of the data. Bucket: {}, Location: {}, Error: {} ({:?})", - bucket, - path, - source, - source, - ))] - UnableToGetPieceOfData { - source: std::io::Error, - bucket: String, - path: String, - }, - - #[snafu(display( - "Unable to PUT data. Bucket: {}, Location: {}, Error: {} ({:?})", - bucket, - path, - source, - source, - ))] - UnableToPutData { - source: rusoto_core::RusotoError, - bucket: String, - path: String, - }, - - #[snafu(display( - "Unable to list data. Bucket: {}, Error: {} ({:?})", - bucket, - source, - source, - ))] - UnableToListData { - source: rusoto_core::RusotoError, - bucket: String, - }, - - #[snafu(display( - "Unable to parse last modified date. 
Bucket: {}, Error: {} ({:?})", - bucket, - source, - source, - ))] - UnableToParseLastModified { - source: chrono::ParseError, - bucket: String, - }, - - #[snafu(display( - "Unable to buffer data into temporary file, Error: {} ({:?})", - source, - source, - ))] - UnableToBufferStream { source: std::io::Error }, - - #[snafu(display( - "Could not parse `{}` as an AWS region. Regions should look like `us-east-2`. {} ({:?})", - region, - source, - source, - ))] - InvalidRegion { - region: String, - source: rusoto_core::region::ParseRegionError, - }, - - #[snafu(display("Missing aws-access-key"))] - MissingAccessKey, - - #[snafu(display("Missing aws-secret-access-key"))] - MissingSecretAccessKey, - - NotFound { - path: String, - source: Box, - }, -} - -impl From for super::Error { - fn from(source: Error) -> Self { - match source { - Error::NotFound { path, source } => Self::NotFound { path, source }, - _ => Self::Generic { - store: "S3", - source: Box::new(source), - }, - } - } -} - -/// Configuration for connecting to [Amazon S3](https://aws.amazon.com/s3/). -pub struct AmazonS3 { - /// S3 client w/o any connection limit. - /// - /// You should normally use [`Self::client`] instead. - client_unrestricted: rusoto_s3::S3Client, - - /// Semaphore that limits the usage of [`client_unrestricted`](Self::client_unrestricted). - connection_semaphore: Arc, - - /// Bucket name used by this object store client. - bucket_name: String, -} - -impl fmt::Debug for AmazonS3 { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("AmazonS3") - .field("client", &"rusoto_s3::S3Client") - .field("bucket_name", &self.bucket_name) - .finish() - } -} - -impl fmt::Display for AmazonS3 { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "AmazonS3({})", self.bucket_name) - } -} - -#[async_trait] -impl ObjectStoreApi for AmazonS3 { - async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> { - let bucket_name = self.bucket_name.clone(); - let key = location.to_raw(); - let request_factory = move || { - let bytes = bytes.clone(); - - let length = bytes.len(); - let stream_data = std::io::Result::Ok(bytes); - let stream = futures::stream::once(async move { stream_data }); - let byte_stream = ByteStream::new_with_size(stream, length); - - rusoto_s3::PutObjectRequest { - bucket: bucket_name.clone(), - key: key.to_string(), - body: Some(byte_stream), - ..Default::default() - } - }; - - let s3 = self.client().await; - - s3_request(move || { - let (s3, request_factory) = (s3.clone(), request_factory.clone()); - - async move { s3.put_object(request_factory()).await } - }) - .await - .context(UnableToPutDataSnafu { - bucket: &self.bucket_name, - path: location.to_raw(), - })?; - - Ok(()) - } - - async fn get(&self, location: &Path) -> Result { - let key = location.to_raw().to_string(); - let get_request = rusoto_s3::GetObjectRequest { - bucket: self.bucket_name.clone(), - key: key.clone(), - ..Default::default() - }; - let bucket_name = self.bucket_name.clone(); - let s = self - .client() - .await - .get_object(get_request) - .await - .map_err(|e| match e { - rusoto_core::RusotoError::Service(rusoto_s3::GetObjectError::NoSuchKey(_)) => { - Error::NotFound { - path: key.clone(), - source: e.into(), - } - } - _ => Error::UnableToGetData { - bucket: self.bucket_name.to_owned(), - path: key.clone(), - source: e, - }, - })? - .body - .context(NoDataSnafu { - bucket: self.bucket_name.to_owned(), - path: key.clone(), - })? 
- .map_err(move |source| Error::UnableToGetPieceOfData { - source, - bucket: bucket_name.clone(), - path: key.clone(), - }) - .err_into() - .boxed(); - - Ok(GetResult::Stream(s)) - } - - async fn head(&self, location: &Path) -> Result { - let key = location.to_raw().to_string(); - let head_request = rusoto_s3::HeadObjectRequest { - bucket: self.bucket_name.clone(), - key: key.clone(), - ..Default::default() - }; - let s = self - .client() - .await - .head_object(head_request) - .await - .map_err(|e| match e { - rusoto_core::RusotoError::Service(rusoto_s3::HeadObjectError::NoSuchKey(_)) => { - Error::NotFound { - path: key.clone(), - source: e.into(), - } - } - rusoto_core::RusotoError::Unknown(h) if h.status.as_u16() == 404 => { - Error::NotFound { - path: key.clone(), - source: "resource not found".into(), - } - } - _ => Error::UnableToHeadData { - bucket: self.bucket_name.to_owned(), - path: key.clone(), - source: e, - }, - })?; - - // Note: GetObject and HeadObject return a different date format from ListObjects - // - // S3 List returns timestamps in the form - // 2013-09-17T18:07:53.000Z - // S3 GetObject returns timestamps in the form - // Last-Modified: Sun, 1 Jan 2006 12:00:00 GMT - let last_modified = match s.last_modified { - Some(lm) => DateTime::parse_from_rfc2822(&lm) - .context(UnableToParseLastModifiedSnafu { - bucket: &self.bucket_name, - })? - .with_timezone(&Utc), - None => Utc::now(), - }; - - Ok(ObjectMeta { - last_modified, - location: location.clone(), - size: usize::try_from(s.content_length.unwrap_or(0)) - .expect("unsupported size on this platform"), - }) - } - - async fn delete(&self, location: &Path) -> Result<()> { - let key = location.to_raw(); - let bucket_name = self.bucket_name.clone(); - - let request_factory = move || rusoto_s3::DeleteObjectRequest { - bucket: bucket_name.clone(), - key: key.to_string(), - ..Default::default() - }; - - let s3 = self.client().await; - - s3_request(move || { - let (s3, request_factory) = (s3.clone(), request_factory.clone()); - - async move { s3.delete_object(request_factory()).await } - }) - .await - .context(UnableToDeleteDataSnafu { - bucket: &self.bucket_name, - path: location.to_raw(), - })?; - - Ok(()) - } - - async fn list<'a>( - &'a self, - prefix: Option<&'a Path>, - ) -> Result>> { - Ok(self - .list_objects_v2(prefix, None) - .await? - .map_ok(move |list_objects_v2_result| { - let contents = list_objects_v2_result.contents.unwrap_or_default(); - let iter = contents - .into_iter() - .map(|object| convert_object_meta(object, &self.bucket_name)); - - futures::stream::iter(iter) - }) - .try_flatten() - .boxed()) - } - - async fn list_with_delimiter(&self, prefix: &Path) -> Result { - Ok(self - .list_objects_v2(Some(prefix), Some(DELIMITER.to_string())) - .await? - .try_fold( - ListResult { - next_token: None, - common_prefixes: vec![], - objects: vec![], - }, - |acc, list_objects_v2_result| async move { - let mut res = acc; - let contents = list_objects_v2_result.contents.unwrap_or_default(); - let mut objects = contents - .into_iter() - .map(|object| convert_object_meta(object, &self.bucket_name)) - .collect::>>()?; - - res.objects.append(&mut objects); - - res.common_prefixes.extend( - list_objects_v2_result - .common_prefixes - .unwrap_or_default() - .into_iter() - .map(|p| { - Path::from_raw( - p.prefix.expect("can't have a prefix without a value"), - ) - }), - ); - - Ok(res) - }, - ) - .await?) 
- } -} - -fn convert_object_meta(object: rusoto_s3::Object, bucket: &str) -> Result { - let location = Path::from_raw(object.key.expect("object doesn't exist without a key")); - let last_modified = match object.last_modified { - Some(lm) => DateTime::parse_from_rfc3339(&lm) - .context(UnableToParseLastModifiedSnafu { bucket })? - .with_timezone(&Utc), - None => Utc::now(), - }; - let size = - usize::try_from(object.size.unwrap_or(0)).expect("unsupported size on this platform"); - - Ok(ObjectMeta { - location, - last_modified, - size, - }) -} - -/// Configure a connection to Amazon S3 using the specified credentials in -/// the specified Amazon region and bucket. -#[allow(clippy::too_many_arguments)] -pub fn new_s3( - access_key_id: Option>, - secret_access_key: Option>, - region: impl Into, - bucket_name: impl Into, - endpoint: Option>, - session_token: Option>, - max_connections: NonZeroUsize, - allow_http: bool, -) -> Result { - let region = region.into(); - let region: rusoto_core::Region = match endpoint { - None => region.parse().context(InvalidRegionSnafu { region })?, - Some(endpoint) => rusoto_core::Region::Custom { - name: region, - endpoint: endpoint.into(), - }, - }; - - let mut builder = HyperBuilder::default(); - builder.pool_max_idle_per_host(max_connections.get()); - - let connector = if allow_http { - hyper_rustls::HttpsConnectorBuilder::new() - .with_webpki_roots() - .https_or_http() - .enable_http1() - .enable_http2() - .build() - } else { - hyper_rustls::HttpsConnectorBuilder::new() - .with_webpki_roots() - .https_only() - .enable_http1() - .enable_http2() - .build() - }; - - let http_client = rusoto_core::request::HttpClient::from_builder(builder, connector); - - let client = match (access_key_id, secret_access_key, session_token) { - (Some(access_key_id), Some(secret_access_key), Some(session_token)) => { - let credentials_provider = StaticProvider::new( - access_key_id.into(), - secret_access_key.into(), - Some(session_token.into()), - None, - ); - rusoto_s3::S3Client::new_with(http_client, credentials_provider, region) - } - (Some(access_key_id), Some(secret_access_key), None) => { - let credentials_provider = - StaticProvider::new_minimal(access_key_id.into(), secret_access_key.into()); - rusoto_s3::S3Client::new_with(http_client, credentials_provider, region) - } - (None, Some(_), _) => return Err(Error::MissingAccessKey.into()), - (Some(_), None, _) => return Err(Error::MissingSecretAccessKey.into()), - _ => { - let credentials_provider = InstanceMetadataProvider::new(); - rusoto_s3::S3Client::new_with(http_client, credentials_provider, region) - } - }; - - Ok(AmazonS3 { - client_unrestricted: client, - connection_semaphore: Arc::new(Semaphore::new(max_connections.get())), - bucket_name: bucket_name.into(), - }) -} - -/// Create a new [`AmazonS3`] that always errors -pub fn new_failing_s3() -> Result { - new_s3( - Some("foo"), - Some("bar"), - "us-east-1", - "bucket", - None as Option<&str>, - None as Option<&str>, - NonZeroUsize::new(16).unwrap(), - true, - ) -} - -/// S3 client bundled w/ a semaphore permit. -#[derive(Clone)] -struct SemaphoreClient { - /// Permit for this specific use of the client. - /// - /// Note that this field is never read and therefore considered "dead code" by rustc. 
- #[allow(dead_code)] - permit: Arc, - - inner: rusoto_s3::S3Client, -} - -impl Deref for SemaphoreClient { - type Target = rusoto_s3::S3Client; - - fn deref(&self) -> &Self::Target { - &self.inner - } -} - -impl AmazonS3 { - /// Get a client according to the current connection limit. - async fn client(&self) -> SemaphoreClient { - let permit = Arc::clone(&self.connection_semaphore) - .acquire_owned() - .await - .expect("semaphore shouldn't be closed yet"); - SemaphoreClient { - permit: Arc::new(permit), - inner: self.client_unrestricted.clone(), - } - } - - async fn list_objects_v2( - &self, - prefix: Option<&Path>, - delimiter: Option, - ) -> Result>> { - #[derive(Clone)] - enum ListState { - Start, - HasMore(String), - Done, - } - use ListState::*; - - let raw_prefix = prefix.map(|p| format!("{}{}", p.to_raw(), DELIMITER)); - let bucket = self.bucket_name.clone(); - - let request_factory = move || rusoto_s3::ListObjectsV2Request { - bucket, - prefix: raw_prefix, - delimiter, - ..Default::default() - }; - let s3 = self.client().await; - - Ok(stream::unfold(ListState::Start, move |state| { - let request_factory = request_factory.clone(); - let s3 = s3.clone(); - - async move { - let continuation_token = match state.clone() { - HasMore(continuation_token) => Some(continuation_token), - Done => { - return None; - } - // If this is the first request we've made, we don't need to make any - // modifications to the request - Start => None, - }; - - let resp = s3_request(move || { - let (s3, request_factory, continuation_token) = ( - s3.clone(), - request_factory.clone(), - continuation_token.clone(), - ); - - async move { - s3.list_objects_v2(rusoto_s3::ListObjectsV2Request { - continuation_token, - ..request_factory() - }) - .await - } - }) - .await; - - let resp = match resp { - Ok(resp) => resp, - Err(e) => return Some((Err(e), state)), - }; - - // The AWS response contains a field named `is_truncated` as well as - // `next_continuation_token`, and we're assuming that `next_continuation_token` - // is only set when `is_truncated` is true (and therefore not - // checking `is_truncated`). - let next_state = - if let Some(next_continuation_token) = &resp.next_continuation_token { - ListState::HasMore(next_continuation_token.to_string()) - } else { - ListState::Done - }; - - Some((Ok(resp), next_state)) - } - }) - .map_err(move |e| { - Error::UnableToListData { - source: e, - bucket: self.bucket_name.clone(), - } - .into() - }) - .boxed()) - } -} - -/// Handles retrying a request to S3 up to `MAX_NUM_RETRIES` times if S3 returns 5xx server errors. -/// -/// The `future_factory` argument is a function `F` that takes no arguments and, when called, will -/// return a `Future` (type `G`) that, when `await`ed, will perform a request to S3 through -/// `rusoto` and return a `Result` that returns some type `R` on success and some -/// `rusoto_core::RusotoError` on error. -/// -/// If the executed `Future` returns success, this function will return that success. -/// If the executed `Future` returns a 5xx server error, this function will wait an amount of -/// time that increases exponentially with the number of times it has retried, get a new `Future` by -/// calling `future_factory` again, and retry the request by `await`ing the `Future` again. -/// The retries will continue until the maximum number of retries has been attempted. In that case, -/// this function will return the last encountered error. -/// -/// Client errors (4xx) will never be retried by this function. 
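The doc comment above spells out the retry policy: only 5xx server errors are retried, the wait grows exponentially, and the last error is returned once `MAX_NUM_RETRIES` is exceeded. A minimal, library-agnostic sketch of the same pattern (the function name and `is_retryable` predicate are illustrative; the 50 ms doubling base matches the removed implementation below):

    use std::{future::Future, time::Duration};

    async fn retry_with_backoff<T, E, F, Fut>(
        max_retries: u32,
        is_retryable: impl Fn(&E) -> bool,
        mut op: F,
    ) -> Result<T, E>
    where
        F: FnMut() -> Fut,
        Fut: Future<Output = Result<T, E>>,
    {
        let mut attempts = 0;
        loop {
            match op().await {
                Ok(v) => return Ok(v),
                // Retry only while the error is retryable and attempts remain.
                Err(e) if attempts < max_retries && is_retryable(&e) => {
                    attempts += 1;
                    // 100 ms, 200 ms, 400 ms, ... between attempts.
                    tokio::time::sleep(Duration::from_millis(2u64.pow(attempts) * 50)).await;
                }
                Err(e) => return Err(e),
            }
        }
    }
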
-async fn s3_request(future_factory: F) -> Result> -where - E: std::error::Error + Send, - F: Fn() -> G + Send, - G: Future>> + Send, - R: Send, -{ - let mut attempts = 0; - - loop { - let request = future_factory(); - - let result = request.await; - - match result { - Ok(r) => return Ok(r), - Err(error) => { - attempts += 1; - - let should_retry = matches!( - error, - rusoto_core::RusotoError::Unknown(ref response) - if response.status.is_server_error() - ); - - if attempts > MAX_NUM_RETRIES { - warn!( - ?error, - attempts, "maximum number of retries exceeded for AWS S3 request" - ); - return Err(error); - } else if !should_retry { - return Err(error); - } else { - debug!(?error, attempts, "retrying AWS S3 request"); - let wait_time = Duration::from_millis(2u64.pow(attempts) * 50); - tokio::time::sleep(wait_time).await; - } - } - } - } -} - -impl Error { - #[cfg(test)] - fn s3_error_due_to_credentials(&self) -> bool { - use rusoto_core::RusotoError; - use Error::*; - - matches!( - self, - UnableToPutData { - source: RusotoError::Credentials(_), - bucket: _, - path: _, - } | UnableToGetData { - source: RusotoError::Credentials(_), - bucket: _, - path: _, - } | UnableToDeleteData { - source: RusotoError::Credentials(_), - bucket: _, - path: _, - } | UnableToListData { - source: RusotoError::Credentials(_), - bucket: _, - } - ) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::{ - tests::{ - get_nonexistent_object, list_uses_directories_correctly, list_with_delimiter, - put_get_delete_list, - }, - Error as ObjectStoreError, ObjectStoreApi, - }; - use bytes::Bytes; - use std::env; - - type TestError = Box; - type Result = std::result::Result; - - const NON_EXISTENT_NAME: &str = "nonexistentname"; - - #[derive(Debug)] - struct AwsConfig { - access_key_id: String, - secret_access_key: String, - region: String, - bucket: String, - endpoint: Option, - token: Option, - } - - // Helper macro to skip tests if TEST_INTEGRATION and the AWS environment variables are not set. - macro_rules! 
maybe_skip_integration { - () => {{ - dotenv::dotenv().ok(); - - let required_vars = [ - "AWS_DEFAULT_REGION", - "OBJECT_STORE_BUCKET", - "AWS_ACCESS_KEY_ID", - "AWS_SECRET_ACCESS_KEY", - ]; - let unset_vars: Vec<_> = required_vars - .iter() - .filter_map(|&name| match env::var(name) { - Ok(_) => None, - Err(_) => Some(name), - }) - .collect(); - let unset_var_names = unset_vars.join(", "); - - let force = env::var("TEST_INTEGRATION"); - - if force.is_ok() && !unset_var_names.is_empty() { - panic!( - "TEST_INTEGRATION is set, \ - but variable(s) {} need to be set", - unset_var_names - ); - } else if force.is_err() { - eprintln!( - "skipping AWS integration test - set {}TEST_INTEGRATION to run", - if unset_var_names.is_empty() { - String::new() - } else { - format!("{} and ", unset_var_names) - } - ); - return; - } else { - AwsConfig { - access_key_id: env::var("AWS_ACCESS_KEY_ID") - .expect("already checked AWS_ACCESS_KEY_ID"), - secret_access_key: env::var("AWS_SECRET_ACCESS_KEY") - .expect("already checked AWS_SECRET_ACCESS_KEY"), - region: env::var("AWS_DEFAULT_REGION") - .expect("already checked AWS_DEFAULT_REGION"), - bucket: env::var("OBJECT_STORE_BUCKET") - .expect("already checked OBJECT_STORE_BUCKET"), - endpoint: env::var("AWS_ENDPOINT").ok(), - token: env::var("AWS_SESSION_TOKEN").ok(), - } - } - }}; - } - - fn check_credentials(r: Result) -> Result { - if let Err(e) = &r { - let e = &**e; - if let Some(e) = e.downcast_ref::() { - if e.s3_error_due_to_credentials() { - eprintln!( - "Try setting the AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY \ - environment variables" - ); - } - } - } - - r - } - - fn make_integration(config: AwsConfig) -> AmazonS3 { - new_s3( - Some(config.access_key_id), - Some(config.secret_access_key), - config.region, - config.bucket, - config.endpoint, - config.token, - NonZeroUsize::new(16).unwrap(), - true, - ) - .expect("Valid S3 config") - } - - #[tokio::test] - async fn s3_test() { - let config = maybe_skip_integration!(); - let integration = make_integration(config); - - check_credentials(put_get_delete_list(&integration).await).unwrap(); - check_credentials(list_uses_directories_correctly(&integration).await).unwrap(); - check_credentials(list_with_delimiter(&integration).await).unwrap(); - } - - #[tokio::test] - async fn s3_test_get_nonexistent_location() { - let config = maybe_skip_integration!(); - let integration = make_integration(config); - - let location = Path::from_iter([NON_EXISTENT_NAME]); - - let err = get_nonexistent_object(&integration, Some(location)) - .await - .unwrap_err(); - if let Some(ObjectStoreError::NotFound { path, source }) = - err.downcast_ref::() - { - let source_variant = source.downcast_ref::>(); - assert!( - matches!( - source_variant, - Some(rusoto_core::RusotoError::Service( - rusoto_s3::GetObjectError::NoSuchKey(_) - )), - ), - "got: {:?}", - source_variant - ); - assert_eq!(path, NON_EXISTENT_NAME); - } else { - panic!("unexpected error type: {:?}", err); - } - } - - #[tokio::test] - async fn s3_test_get_nonexistent_bucket() { - let mut config = maybe_skip_integration!(); - config.bucket = NON_EXISTENT_NAME.into(); - let integration = make_integration(config); - - let location = Path::from_iter([NON_EXISTENT_NAME]); - - let err = integration.get(&location).await.unwrap_err().to_string(); - assert!( - err.contains("The specified bucket does not exist"), - "{}", - err - ) - } - - #[tokio::test] - async fn s3_test_put_nonexistent_bucket() { - let mut config = maybe_skip_integration!(); - config.bucket = 
NON_EXISTENT_NAME.into(); - let integration = make_integration(config); - - let location = Path::from_iter([NON_EXISTENT_NAME]); - let data = Bytes::from("arbitrary data"); - - let err = integration - .put(&location, data) - .await - .unwrap_err() - .to_string(); - - assert!( - err.contains("The specified bucket does not exist") - && err.contains("Unable to PUT data"), - "{}", - err - ) - } - - #[tokio::test] - async fn s3_test_delete_nonexistent_location() { - let config = maybe_skip_integration!(); - let integration = make_integration(config); - - let location = Path::from_iter([NON_EXISTENT_NAME]); - - integration.delete(&location).await.unwrap(); - } - - #[tokio::test] - async fn s3_test_delete_nonexistent_bucket() { - let mut config = maybe_skip_integration!(); - config.bucket = NON_EXISTENT_NAME.into(); - let integration = make_integration(config); - - let location = Path::from_iter([NON_EXISTENT_NAME]); - - let err = integration.delete(&location).await.unwrap_err().to_string(); - assert!( - err.contains("The specified bucket does not exist") - && err.contains("Unable to DELETE data"), - "{}", - err - ) - } -} diff --git a/object_store/src/azure.rs b/object_store/src/azure.rs deleted file mode 100644 index 6e107bea39..0000000000 --- a/object_store/src/azure.rs +++ /dev/null @@ -1,376 +0,0 @@ -//! This module contains the IOx implementation for using Azure Blob storage as -//! the object store. -use crate::{ - path::{Path, DELIMITER}, - GetResult, ListResult, ObjectMeta, ObjectStoreApi, Result, -}; -use async_trait::async_trait; -use azure_core::{prelude::*, HttpClient}; -use azure_storage::core::prelude::*; -use azure_storage_blobs::blob::Blob; -use azure_storage_blobs::{ - prelude::{AsBlobClient, AsContainerClient, ContainerClient}, - DeleteSnapshotsMethod, -}; -use bytes::Bytes; -use futures::{ - stream::{self, BoxStream}, - StreamExt, TryStreamExt, -}; -use snafu::{ResultExt, Snafu}; -use std::{convert::TryInto, sync::Arc}; - -/// A specialized `Error` for Azure object store-related errors -#[derive(Debug, Snafu)] -#[allow(missing_docs)] -enum Error { - #[snafu(display("Unable to DELETE data. Location: {}, Error: {}", path, source,))] - Delete { - source: Box, - path: String, - }, - - #[snafu(display("Unable to GET data. Location: {}, Error: {}", path, source,))] - Get { - source: Box, - path: String, - }, - - #[snafu(display("Unable to HEAD data. Location: {}, Error: {}", path, source,))] - Head { - source: Box, - path: String, - }, - - #[snafu(display("Unable to PUT data. Location: {}, Error: {}", path, source,))] - Put { - source: Box, - path: String, - }, - - #[snafu(display("Unable to list data. Error: {}", source))] - List { - source: Box, - }, - - #[cfg(not(feature = "azure_test"))] - #[snafu(display( - "Azurite (azure emulator) support not compiled in, please add `azure_test` feature" - ))] - NoEmulatorFeature, -} - -impl From for super::Error { - fn from(source: Error) -> Self { - Self::Generic { - store: "Azure Blob Storage", - source: Box::new(source), - } - } -} - -/// Configuration for connecting to [Microsoft Azure Blob Storage](https://azure.microsoft.com/en-us/services/storage/blobs/). 
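The constructor for this store, `new_azure` (defined later in this file), takes the storage account, an access key, the container name, and a flag selecting the Azurite emulator. A hedged usage sketch using the emulator values from the CI job deleted at the top of this diff (Azurite's well-known development account; requires the `azure_test` feature):

    // Assumes the signature shown below: new_azure(account, access_key, container, use_emulator).
    let azure = new_azure(
        "devstoreaccount1", // Azurite's default development account
        "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==",
        "iox-test",         // container created by the removed CI setup step
        true,               // use_emulator
    )
    .expect("valid emulator configuration");
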
-#[derive(Debug)] -pub struct MicrosoftAzure { - container_client: Arc, - #[allow(dead_code)] - container_name: String, -} - -impl std::fmt::Display for MicrosoftAzure { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "MicrosoftAzure({})", self.container_name) - } -} - -#[async_trait] -impl ObjectStoreApi for MicrosoftAzure { - async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> { - let location = location.to_raw(); - - let bytes = bytes::BytesMut::from(&*bytes); - - self.container_client - .as_blob_client(location) - .put_block_blob(bytes) - .execute() - .await - .context(PutSnafu { - path: location.to_owned(), - })?; - - Ok(()) - } - - async fn get(&self, location: &Path) -> Result { - let blob = self - .container_client - .as_blob_client(location.to_raw()) - .get() - .execute() - .await - .context(GetSnafu { - path: location.to_raw(), - })?; - - Ok(GetResult::Stream( - futures::stream::once(async move { Ok(blob.data) }).boxed(), - )) - } - - async fn head(&self, location: &Path) -> Result { - let s = self - .container_client - .as_blob_client(location.to_raw()) - .get_properties() - .execute() - .await - .context(HeadSnafu { - path: location.to_raw(), - })?; - - convert_object_meta(s.blob) - } - - async fn delete(&self, location: &Path) -> Result<()> { - let location = location.to_raw(); - self.container_client - .as_blob_client(location) - .delete() - .delete_snapshots_method(DeleteSnapshotsMethod::Include) - .execute() - .await - .context(DeleteSnafu { - path: location.to_owned(), - })?; - - Ok(()) - } - - async fn list<'a>( - &'a self, - prefix: Option<&'a Path>, - ) -> Result>> { - #[derive(Clone)] - enum ListState { - Start, - HasMore(String), - Done, - } - - Ok(stream::unfold(ListState::Start, move |state| async move { - let mut request = self.container_client.list_blobs(); - - let prefix_raw = prefix.map(|p| format!("{}{}", p.to_raw(), DELIMITER)); - if let Some(ref p) = prefix_raw { - request = request.prefix(p as &str); - } - - match state { - ListState::HasMore(ref marker) => { - request = request.next_marker(marker as &str); - } - ListState::Done => { - return None; - } - ListState::Start => {} - } - - let resp = match request.execute().await.context(ListSnafu) { - Ok(resp) => resp, - Err(err) => return Some((Err(crate::Error::from(err)), state)), - }; - - let next_state = if let Some(marker) = resp.next_marker { - ListState::HasMore(marker.as_str().to_string()) - } else { - ListState::Done - }; - - let names = resp.blobs.blobs.into_iter().map(convert_object_meta); - Some((Ok(futures::stream::iter(names)), next_state)) - }) - .try_flatten() - .boxed()) - } - - async fn list_with_delimiter(&self, prefix: &Path) -> Result { - let mut request = self.container_client.list_blobs(); - - let prefix_raw = format!("{}{}", prefix, DELIMITER); - - request = request.delimiter(Delimiter::new(DELIMITER)); - request = request.prefix(&*prefix_raw); - - let resp = request.execute().await.context(ListSnafu)?; - - let next_token = resp.next_marker.as_ref().map(|m| m.as_str().to_string()); - - let common_prefixes = resp - .blobs - .blob_prefix - .map(|prefixes| { - prefixes - .iter() - .map(|prefix| Path::from_raw(&prefix.name)) - .collect() - }) - .unwrap_or_else(Vec::new); - - let objects = resp - .blobs - .blobs - .into_iter() - .map(convert_object_meta) - .collect::>()?; - - Ok(ListResult { - next_token, - common_prefixes, - objects, - }) - } -} - -fn convert_object_meta(blob: Blob) -> Result { - let location = Path::from_raw(blob.name); - let 
last_modified = blob.properties.last_modified; - let size = blob - .properties - .content_length - .try_into() - .expect("unsupported size on this platform"); - - Ok(ObjectMeta { - location, - last_modified, - size, - }) -} - -#[cfg(feature = "azure_test")] -fn check_if_emulator_works() -> Result<()> { - Ok(()) -} - -#[cfg(not(feature = "azure_test"))] -fn check_if_emulator_works() -> Result<()> { - Err(Error::NoEmulatorFeature.into()) -} - -/// Configure a connection to container with given name on Microsoft Azure -/// Blob store. -/// -/// The credentials `account` and `access_key` must provide access to the -/// store. -pub fn new_azure( - account: impl Into, - access_key: impl Into, - container_name: impl Into, - use_emulator: bool, -) -> Result { - let account = account.into(); - let access_key = access_key.into(); - let http_client: Arc = Arc::new(reqwest::Client::new()); - - let storage_account_client = if use_emulator { - check_if_emulator_works()?; - StorageAccountClient::new_emulator_default() - } else { - StorageAccountClient::new_access_key(Arc::clone(&http_client), &account, &access_key) - }; - - let storage_client = storage_account_client.as_storage_client(); - - let container_name = container_name.into(); - - let container_client = storage_client.as_container_client(&container_name); - - Ok(MicrosoftAzure { - container_client, - container_name, - }) -} - -#[cfg(test)] -mod tests { - use crate::azure::new_azure; - use crate::tests::{list_uses_directories_correctly, list_with_delimiter, put_get_delete_list}; - use std::env; - - #[derive(Debug)] - struct AzureConfig { - storage_account: String, - access_key: String, - bucket: String, - use_emulator: bool, - } - - // Helper macro to skip tests if TEST_INTEGRATION and the Azure environment - // variables are not set. - macro_rules! 
maybe_skip_integration { - () => {{ - dotenv::dotenv().ok(); - - let use_emulator = std::env::var("AZURE_USE_EMULATOR").is_ok(); - - let mut required_vars = vec!["OBJECT_STORE_BUCKET"]; - if !use_emulator { - required_vars.push("AZURE_STORAGE_ACCOUNT"); - required_vars.push("AZURE_STORAGE_ACCESS_KEY"); - } - let unset_vars: Vec<_> = required_vars - .iter() - .filter_map(|&name| match env::var(name) { - Ok(_) => None, - Err(_) => Some(name), - }) - .collect(); - let unset_var_names = unset_vars.join(", "); - - let force = std::env::var("TEST_INTEGRATION"); - - if force.is_ok() && !unset_var_names.is_empty() { - panic!( - "TEST_INTEGRATION is set, \ - but variable(s) {} need to be set", - unset_var_names - ) - } else if force.is_err() { - eprintln!( - "skipping Azure integration test - set {}TEST_INTEGRATION to run", - if unset_var_names.is_empty() { - String::new() - } else { - format!("{} and ", unset_var_names) - } - ); - return; - } else { - AzureConfig { - storage_account: env::var("AZURE_STORAGE_ACCOUNT").unwrap_or_default(), - access_key: env::var("AZURE_STORAGE_ACCESS_KEY").unwrap_or_default(), - bucket: env::var("OBJECT_STORE_BUCKET") - .expect("already checked OBJECT_STORE_BUCKET"), - use_emulator, - } - } - }}; - } - - #[tokio::test] - async fn azure_blob_test() { - let config = maybe_skip_integration!(); - let integration = new_azure( - config.storage_account, - config.access_key, - config.bucket, - config.use_emulator, - ) - .unwrap(); - - put_get_delete_list(&integration).await.unwrap(); - list_uses_directories_correctly(&integration).await.unwrap(); - list_with_delimiter(&integration).await.unwrap(); - } -} diff --git a/object_store/src/disk.rs b/object_store/src/disk.rs deleted file mode 100644 index bf736c2afa..0000000000 --- a/object_store/src/disk.rs +++ /dev/null @@ -1,452 +0,0 @@ -//! This module contains the IOx implementation for using local disk as the -//! object store. 
-use crate::path::{Path, DELIMITER}; -use crate::{GetResult, ListResult, ObjectMeta, ObjectStoreApi, Result}; -use async_trait::async_trait; -use bytes::Bytes; -use futures::{ - stream::{self, BoxStream}, - StreamExt, -}; -use snafu::{OptionExt, ResultExt, Snafu}; -use std::{collections::BTreeSet, convert::TryFrom, io}; -use tokio::fs; -use walkdir::{DirEntry, WalkDir}; - -/// A specialized `Error` for filesystem object store-related errors -#[derive(Debug, Snafu)] -#[allow(missing_docs)] -pub enum Error { - #[snafu(display("File size for {} did not fit in a usize: {}", path, source))] - FileSizeOverflowedUsize { - source: std::num::TryFromIntError, - path: String, - }, - - #[snafu(display("Unable to walk dir: {}", source))] - UnableToWalkDir { - source: walkdir::Error, - }, - - #[snafu(display("Unable to access metadata for {}: {}", path, source))] - UnableToAccessMetadata { - source: Box, - path: String, - }, - - #[snafu(display("Unable to copy data to file: {}", source))] - UnableToCopyDataToFile { - source: io::Error, - }, - - #[snafu(display("Unable to create dir {}: {}", path.display(), source))] - UnableToCreateDir { - source: io::Error, - path: std::path::PathBuf, - }, - - #[snafu(display("Unable to create file {}: {}", path.display(), err))] - UnableToCreateFile { - path: std::path::PathBuf, - err: io::Error, - }, - - #[snafu(display("Unable to delete file {}: {}", path.display(), source))] - UnableToDeleteFile { - source: io::Error, - path: std::path::PathBuf, - }, - - #[snafu(display("Unable to open file {}: {}", path.display(), source))] - UnableToOpenFile { - source: io::Error, - path: std::path::PathBuf, - }, - - #[snafu(display("Unable to read data from file {}: {}", path.display(), source))] - UnableToReadBytes { - source: io::Error, - path: std::path::PathBuf, - }, - - NotFound { - path: String, - source: io::Error, - }, -} - -impl From for super::Error { - fn from(source: Error) -> Self { - match source { - Error::NotFound { path, source } => Self::NotFound { - path, - source: source.into(), - }, - _ => Self::Generic { - store: "LocalFileSystem", - source: Box::new(source), - }, - } - } -} - -/// Local filesystem storage suitable for testing or for opting out of using a -/// cloud storage provider. 
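The clap_blocks hunk near the top of this diff builds the same kind of store from the published crate, where this module moved from `object_store::disk` to `object_store::local`. A small sketch of that call (the directory path is only an example; the constructor is infallible, as the clap_blocks change wraps it directly in `Arc::new`):

    use object_store::local::LocalFileSystem;

    fn example_store() -> LocalFileSystem {
        // Any writable directory works; IOx passes the configured database directory.
        LocalFileSystem::new("/tmp/iox-data")
    }
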
-#[derive(Debug)] -pub struct LocalFileSystem { - root: std::path::PathBuf, -} - -impl std::fmt::Display for LocalFileSystem { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "LocalFileSystem({})", self.root.display()) - } -} - -#[async_trait] -impl ObjectStoreApi for LocalFileSystem { - async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> { - let content = bytes::BytesMut::from(&*bytes); - - let path = self.path_to_filesystem(location); - - let mut file = match fs::File::create(&path).await { - Ok(f) => f, - Err(err) if err.kind() == std::io::ErrorKind::NotFound => { - let parent = path - .parent() - .context(UnableToCreateFileSnafu { path: &path, err })?; - fs::create_dir_all(&parent) - .await - .context(UnableToCreateDirSnafu { path: parent })?; - - match fs::File::create(&path).await { - Ok(f) => f, - Err(err) => return Err(Error::UnableToCreateFile { path, err }.into()), - } - } - Err(err) => return Err(Error::UnableToCreateFile { path, err }.into()), - }; - - tokio::io::copy(&mut &content[..], &mut file) - .await - .context(UnableToCopyDataToFileSnafu)?; - - Ok(()) - } - - async fn get(&self, location: &Path) -> Result { - let (file, path) = self.get_file(location).await?; - Ok(GetResult::File(file, path)) - } - - async fn head(&self, location: &Path) -> Result { - let (file, _) = self.get_file(location).await?; - let metadata = file - .metadata() - .await - .map_err(|e| Error::UnableToAccessMetadata { - source: e.into(), - path: location.to_string(), - })?; - - convert_metadata(metadata, location.clone()) - } - - async fn delete(&self, location: &Path) -> Result<()> { - let path = self.path_to_filesystem(location); - fs::remove_file(&path) - .await - .context(UnableToDeleteFileSnafu { path })?; - Ok(()) - } - - async fn list<'a>( - &'a self, - prefix: Option<&'a Path>, - ) -> Result>> { - let root_path = match prefix { - Some(prefix) => self.path_to_filesystem(prefix), - None => self.root.to_path_buf(), - }; - - let walkdir = WalkDir::new(&root_path) - // Don't include the root directory itself - .min_depth(1); - - let s = - walkdir.into_iter().flat_map(move |result_dir_entry| { - match convert_walkdir_result(result_dir_entry) { - Err(e) => Some(Err(e)), - Ok(None) => None, - Ok(entry @ Some(_)) => entry - .filter(|dir_entry| dir_entry.file_type().is_file()) - .map(|entry| { - let location = self.filesystem_to_path(entry.path()).unwrap(); - convert_entry(entry, location) - }), - } - }); - - Ok(stream::iter(s).boxed()) - } - - async fn list_with_delimiter(&self, prefix: &Path) -> Result { - let resolved_prefix = self.path_to_filesystem(prefix); - - let walkdir = WalkDir::new(&resolved_prefix).min_depth(1).max_depth(1); - - let mut common_prefixes = BTreeSet::new(); - let mut objects = Vec::new(); - - for entry_res in walkdir.into_iter().map(convert_walkdir_result) { - if let Some(entry) = entry_res? 
{ - let is_directory = entry.file_type().is_dir(); - let entry_location = self.filesystem_to_path(entry.path()).unwrap(); - - let mut parts = match entry_location.prefix_match(prefix) { - Some(parts) => parts, - None => continue, - }; - - let common_prefix = match parts.next() { - Some(p) => p, - None => continue, - }; - - drop(parts); - - if is_directory { - common_prefixes.insert(prefix.child(common_prefix)); - } else { - objects.push(convert_entry(entry, entry_location)?); - } - } - } - - Ok(ListResult { - next_token: None, - common_prefixes: common_prefixes.into_iter().collect(), - objects, - }) - } -} - -fn convert_entry(entry: DirEntry, location: Path) -> Result { - let metadata = entry - .metadata() - .map_err(|e| Error::UnableToAccessMetadata { - source: e.into(), - path: location.to_string(), - })?; - convert_metadata(metadata, location) -} - -fn convert_metadata(metadata: std::fs::Metadata, location: Path) -> Result { - let last_modified = metadata - .modified() - .expect("Modified file time should be supported on this platform") - .into(); - - let size = usize::try_from(metadata.len()).context(FileSizeOverflowedUsizeSnafu { - path: location.to_raw(), - })?; - - Ok(ObjectMeta { - location, - last_modified, - size, - }) -} - -/// Convert walkdir results and converts not-found errors into `None`. -fn convert_walkdir_result( - res: std::result::Result, -) -> Result> { - match res { - Ok(entry) => Ok(Some(entry)), - Err(walkdir_err) => match walkdir_err.io_error() { - Some(io_err) => match io_err.kind() { - io::ErrorKind::NotFound => Ok(None), - _ => Err(Error::UnableToWalkDir { - source: walkdir_err, - } - .into()), - }, - None => Err(Error::UnableToWalkDir { - source: walkdir_err, - } - .into()), - }, - } -} - -impl LocalFileSystem { - /// Create new filesystem storage. 
- pub fn new(root: impl Into) -> Self { - Self { root: root.into() } - } - - /// Return filesystem path of the given location - fn path_to_filesystem(&self, location: &Path) -> std::path::PathBuf { - let mut path = self.root.clone(); - for component in location.to_raw().split(DELIMITER) { - path.push(component) - } - path.to_path_buf() - } - - fn filesystem_to_path(&self, location: &std::path::Path) -> Option { - let stripped = location.strip_prefix(&self.root).ok()?; - let path = stripped.to_string_lossy(); - let split = path.split(std::path::MAIN_SEPARATOR); - - Some(Path::from_iter(split)) - } - - async fn get_file(&self, location: &Path) -> Result<(fs::File, std::path::PathBuf)> { - let path = self.path_to_filesystem(location); - - let file = fs::File::open(&path).await.map_err(|e| { - if e.kind() == std::io::ErrorKind::NotFound { - Error::NotFound { - path: location.to_string(), - source: e, - } - } else { - Error::UnableToOpenFile { - path: path.clone(), - source: e, - } - } - })?; - Ok((file, path)) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::{ - tests::{ - get_nonexistent_object, list_uses_directories_correctly, list_with_delimiter, - put_get_delete_list, - }, - Error as ObjectStoreError, ObjectStoreApi, - }; - use std::{fs::set_permissions, os::unix::prelude::PermissionsExt}; - use tempfile::TempDir; - - #[tokio::test] - async fn file_test() { - let root = TempDir::new().unwrap(); - let integration = LocalFileSystem::new(root.path()); - - put_get_delete_list(&integration).await.unwrap(); - list_uses_directories_correctly(&integration).await.unwrap(); - list_with_delimiter(&integration).await.unwrap(); - } - - #[tokio::test] - async fn creates_dir_if_not_present() { - let root = TempDir::new().unwrap(); - let integration = LocalFileSystem::new(root.path()); - - let location = Path::from_raw("nested/file/test_file"); - - let data = Bytes::from("arbitrary data"); - let expected_data = data.clone(); - - integration.put(&location, data).await.unwrap(); - - let read_data = integration - .get(&location) - .await - .unwrap() - .bytes() - .await - .unwrap(); - assert_eq!(&*read_data, expected_data); - } - - #[tokio::test] - async fn unknown_length() { - let root = TempDir::new().unwrap(); - let integration = LocalFileSystem::new(root.path()); - - let location = Path::from_raw("some_file"); - - let data = Bytes::from("arbitrary data"); - let expected_data = data.clone(); - - integration.put(&location, data).await.unwrap(); - - let read_data = integration - .get(&location) - .await - .unwrap() - .bytes() - .await - .unwrap(); - assert_eq!(&*read_data, expected_data); - } - - #[tokio::test] - async fn bubble_up_io_errors() { - let root = TempDir::new().unwrap(); - - // make non-readable - let metadata = root.path().metadata().unwrap(); - let mut permissions = metadata.permissions(); - permissions.set_mode(0o000); - set_permissions(root.path(), permissions).unwrap(); - - let store = LocalFileSystem::new(root.path()); - - // `list` must fail - match store.list(None).await { - Err(_) => { - // ok, error found - } - Ok(mut stream) => { - let mut any_err = false; - while let Some(res) = stream.next().await { - if res.is_err() { - any_err = true; - } - } - assert!(any_err); - } - } - - // `list_with_delimiter - assert!(store.list_with_delimiter(&Path::default()).await.is_err()); - } - - const NON_EXISTENT_NAME: &str = "nonexistentname"; - - #[tokio::test] - async fn get_nonexistent_location() { - let root = TempDir::new().unwrap(); - let integration = 
LocalFileSystem::new(root.path()); - - let location = Path::from_raw(NON_EXISTENT_NAME); - - let err = get_nonexistent_object(&integration, Some(location)) - .await - .unwrap_err(); - if let Some(ObjectStoreError::NotFound { path, source }) = - err.downcast_ref::() - { - let source_variant = source.downcast_ref::(); - assert!( - matches!(source_variant, Some(std::io::Error { .. }),), - "got: {:?}", - source_variant - ); - assert_eq!(path, NON_EXISTENT_NAME); - } else { - panic!("unexpected error type: {:?}", err); - } - } -} diff --git a/object_store/src/gcp.rs b/object_store/src/gcp.rs deleted file mode 100644 index f581b79b14..0000000000 --- a/object_store/src/gcp.rs +++ /dev/null @@ -1,461 +0,0 @@ -//! This module contains the IOx implementation for using Google Cloud Storage -//! as the object store. -use crate::{ - path::{Path, DELIMITER}, - GetResult, ListResult, ObjectMeta, ObjectStoreApi, Result, -}; -use async_trait::async_trait; -use bytes::Bytes; -use cloud_storage::{Client, Object}; -use futures::{stream::BoxStream, StreamExt, TryStreamExt}; -use snafu::{ResultExt, Snafu}; -use std::{convert::TryFrom, env}; - -/// A specialized `Error` for Google Cloud Storage object store-related errors -#[derive(Debug, Snafu)] -#[allow(missing_docs)] -pub enum Error { - #[snafu(display("Expected streamed data to have length {}, got {}", expected, actual))] - DataDoesNotMatchLength { expected: usize, actual: usize }, - - #[snafu(display( - "Unable to PUT data. Bucket: {}, Location: {}, Error: {}", - bucket, - path, - source - ))] - UnableToPutData { - source: cloud_storage::Error, - bucket: String, - path: String, - }, - - #[snafu(display("Unable to list data. Bucket: {}, Error: {}", bucket, source,))] - UnableToListData { - source: cloud_storage::Error, - bucket: String, - }, - - #[snafu(display("Unable to stream list data. Bucket: {}, Error: {}", bucket, source,))] - UnableToStreamListData { - source: cloud_storage::Error, - bucket: String, - }, - - #[snafu(display( - "Unable to DELETE data. Bucket: {}, Location: {}, Error: {}", - bucket, - path, - source, - ))] - UnableToDeleteData { - source: cloud_storage::Error, - bucket: String, - path: String, - }, - - #[snafu(display( - "Unable to GET data. Bucket: {}, Location: {}, Error: {}", - bucket, - path, - source, - ))] - UnableToGetData { - source: cloud_storage::Error, - bucket: String, - path: String, - }, - - #[snafu(display( - "Unable to GET data. Bucket: {}, Location: {}, Error: {}", - bucket, - path, - source, - ))] - UnableToHeadData { - source: cloud_storage::Error, - bucket: String, - path: String, - }, - - NotFound { - path: String, - source: cloud_storage::Error, - }, -} - -impl From for super::Error { - fn from(source: Error) -> Self { - match source { - Error::NotFound { path, source } => Self::NotFound { - path, - source: source.into(), - }, - _ => Self::Generic { - store: "GCS", - source: Box::new(source), - }, - } - } -} - -/// Configuration for connecting to [Google Cloud Storage](https://cloud.google.com/storage/). 
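A hedged sketch of the configuration described above: it constructs the store with `new_gcs` and exercises it through the `ObjectStoreApi` trait as defined in this removed crate; the service-account path and bucket name are placeholders, not values from this repository.

    use bytes::Bytes;
    use object_store::{gcp::new_gcs, path::Path, ObjectStoreApi};

    async fn gcs_sketch() -> object_store::Result<()> {
        // Placeholder credentials and bucket; `new_gcs` points the underlying
        // cloud-storage client at the given service account file.
        let store = new_gcs("/path/to/service-account.json", "my-bucket")?;

        let location = Path::from_raw("test_dir/test_file.json");
        store.put(&location, Bytes::from("arbitrary data")).await?;

        let data = store.get(&location).await?.bytes().await?;
        assert_eq!(data, b"arbitrary data".to_vec());
        Ok(())
    }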
-#[derive(Debug)] -pub struct GoogleCloudStorage { - client: Client, - bucket_name: String, -} - -impl std::fmt::Display for GoogleCloudStorage { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "GoogleCloudStorage({})", self.bucket_name) - } -} - -#[async_trait] -impl ObjectStoreApi for GoogleCloudStorage { - async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> { - let location = location.to_raw(); - let bucket_name = self.bucket_name.clone(); - - self.client - .object() - .create( - &bucket_name, - bytes.to_vec(), - location, - "application/octet-stream", - ) - .await - .context(UnableToPutDataSnafu { - bucket: &self.bucket_name, - path: location, - })?; - - Ok(()) - } - - async fn get(&self, location: &Path) -> Result { - let location = location.to_raw(); - let bucket_name = self.bucket_name.clone(); - - let bytes = self - .client - .object() - .download(&bucket_name, location) - .await - .map_err(|e| match e { - cloud_storage::Error::Other(ref text) if text.starts_with("No such object") => { - Error::NotFound { - path: location.to_string(), - source: e, - } - } - _ => Error::UnableToGetData { - bucket: bucket_name.clone(), - path: location.to_string(), - source: e, - }, - })?; - - let s = futures::stream::once(async move { Ok(bytes.into()) }).boxed(); - Ok(GetResult::Stream(s)) - } - - async fn head(&self, location: &Path) -> Result { - let object = self - .client - .object() - .read(&self.bucket_name, location.to_raw()) - .await - .map_err(|e| match e { - cloud_storage::Error::Google(ref error) if error.error.code == 404 => { - Error::NotFound { - path: location.to_string(), - source: e, - } - } - _ => Error::UnableToHeadData { - bucket: self.bucket_name.clone(), - path: location.to_string(), - source: e, - }, - })?; - - Ok(convert_object_meta(&object)) - } - - async fn delete(&self, location: &Path) -> Result<()> { - let location = location.to_raw(); - let bucket_name = self.bucket_name.clone(); - - self.client - .object() - .delete(&bucket_name, location) - .await - .context(UnableToDeleteDataSnafu { - bucket: &self.bucket_name, - path: location, - })?; - - Ok(()) - } - - async fn list<'a>( - &'a self, - prefix: Option<&'a Path>, - ) -> Result>> { - let converted_prefix = prefix.map(|p| format!("{}{}", p.to_raw(), DELIMITER)); - let list_request = cloud_storage::ListRequest { - prefix: converted_prefix, - ..Default::default() - }; - let object_lists = self - .client - .object() - .list(&self.bucket_name, list_request) - .await - .context(UnableToListDataSnafu { - bucket: &self.bucket_name, - })?; - - let bucket_name = self.bucket_name.clone(); - let objects = object_lists - .map_ok(move |list| { - futures::stream::iter(list.items.into_iter().map(|o| Ok(convert_object_meta(&o)))) - }) - .map_err(move |source| { - crate::Error::from(Error::UnableToStreamListData { - source, - bucket: bucket_name.clone(), - }) - }); - - Ok(objects.try_flatten().boxed()) - } - - async fn list_with_delimiter(&self, prefix: &Path) -> Result { - let converted_prefix = format!("{}{}", prefix, DELIMITER); - let list_request = cloud_storage::ListRequest { - prefix: Some(converted_prefix), - delimiter: Some(DELIMITER.to_string()), - ..Default::default() - }; - - let mut object_lists = Box::pin( - self.client - .object() - .list(&self.bucket_name, list_request) - .await - .context(UnableToListDataSnafu { - bucket: &self.bucket_name, - })?, - ); - - let result = match object_lists.next().await { - None => ListResult { - objects: vec![], - common_prefixes: vec![], - 
next_token: None, - }, - Some(list_response) => { - let list_response = list_response.context(UnableToStreamListDataSnafu { - bucket: &self.bucket_name, - })?; - - ListResult { - objects: list_response - .items - .iter() - .map(convert_object_meta) - .collect(), - common_prefixes: list_response.prefixes.iter().map(Path::from_raw).collect(), - next_token: list_response.next_page_token, - } - } - }; - - Ok(result) - } -} - -fn convert_object_meta(object: &Object) -> ObjectMeta { - let location = Path::from_raw(&object.name); - let last_modified = object.updated; - let size = usize::try_from(object.size).expect("unsupported size on this platform"); - - ObjectMeta { - location, - last_modified, - size, - } -} - -/// Configure a connection to Google Cloud Storage. -pub fn new_gcs( - service_account_path: impl AsRef, - bucket_name: impl Into, -) -> Result { - // The cloud storage crate currently only supports authentication via - // environment variables. Set the environment variable explicitly so - // that we can optionally accept command line arguments instead. - env::set_var("SERVICE_ACCOUNT", service_account_path); - Ok(GoogleCloudStorage { - client: Default::default(), - bucket_name: bucket_name.into(), - }) -} - -#[cfg(test)] -mod test { - use super::*; - use crate::{ - tests::{ - get_nonexistent_object, list_uses_directories_correctly, list_with_delimiter, - put_get_delete_list, - }, - Error as ObjectStoreError, ObjectStoreApi, - }; - use bytes::Bytes; - use std::env; - - const NON_EXISTENT_NAME: &str = "nonexistentname"; - - #[derive(Debug)] - struct GoogleCloudConfig { - bucket: String, - service_account: String, - } - - // Helper macro to skip tests if TEST_INTEGRATION and the GCP environment variables are not set. - macro_rules! maybe_skip_integration { - () => {{ - dotenv::dotenv().ok(); - - let required_vars = ["OBJECT_STORE_BUCKET", "GOOGLE_SERVICE_ACCOUNT"]; - let unset_vars: Vec<_> = required_vars - .iter() - .filter_map(|&name| match env::var(name) { - Ok(_) => None, - Err(_) => Some(name), - }) - .collect(); - let unset_var_names = unset_vars.join(", "); - - let force = std::env::var("TEST_INTEGRATION"); - - if force.is_ok() && !unset_var_names.is_empty() { - panic!( - "TEST_INTEGRATION is set, \ - but variable(s) {} need to be set", - unset_var_names - ) - } else if force.is_err() { - eprintln!( - "skipping Google Cloud integration test - set {}TEST_INTEGRATION to run", - if unset_var_names.is_empty() { - String::new() - } else { - format!("{} and ", unset_var_names) - } - ); - return; - } else { - GoogleCloudConfig { - bucket: env::var("OBJECT_STORE_BUCKET") - .expect("already checked OBJECT_STORE_BUCKET"), - service_account: env::var("GOOGLE_SERVICE_ACCOUNT") - .expect("already checked GOOGLE_SERVICE_ACCOUNT"), - } - } - }}; - } - - #[tokio::test] - async fn gcs_test() { - let config = maybe_skip_integration!(); - let integration = new_gcs(config.service_account, config.bucket).unwrap(); - - put_get_delete_list(&integration).await.unwrap(); - list_uses_directories_correctly(&integration).await.unwrap(); - list_with_delimiter(&integration).await.unwrap(); - } - - #[tokio::test] - async fn gcs_test_get_nonexistent_location() { - let config = maybe_skip_integration!(); - let integration = new_gcs(config.service_account, &config.bucket).unwrap(); - - let location = Path::from_iter([NON_EXISTENT_NAME]); - - let err = integration.get(&location).await.unwrap_err(); - - if let ObjectStoreError::NotFound { path, source } = err { - let source_variant = source.downcast_ref::(); - 
assert!( - matches!(source_variant, Some(cloud_storage::Error::Other(_))), - "got: {:?}", - source_variant - ); - assert_eq!(path, NON_EXISTENT_NAME); - } else { - panic!("unexpected error type: {:?}", err) - } - } - - #[tokio::test] - async fn gcs_test_get_nonexistent_bucket() { - let mut config = maybe_skip_integration!(); - config.bucket = NON_EXISTENT_NAME.into(); - let integration = new_gcs(config.service_account, &config.bucket).unwrap(); - - let location = Path::from_iter([NON_EXISTENT_NAME]); - - let err = get_nonexistent_object(&integration, Some(location)) - .await - .unwrap_err() - .to_string(); - - assert!(err.contains("Unable to GET data"), "{}", err) - } - - #[tokio::test] - async fn gcs_test_delete_nonexistent_location() { - let config = maybe_skip_integration!(); - let integration = new_gcs(config.service_account, &config.bucket).unwrap(); - - let location = Path::from_iter([NON_EXISTENT_NAME]); - - let err = integration.delete(&location).await.unwrap_err().to_string(); - assert!(err.contains("Unable to DELETE data"), "{}", err) - } - - #[tokio::test] - async fn gcs_test_delete_nonexistent_bucket() { - let mut config = maybe_skip_integration!(); - config.bucket = NON_EXISTENT_NAME.into(); - let integration = new_gcs(config.service_account, &config.bucket).unwrap(); - - let location = Path::from_iter([NON_EXISTENT_NAME]); - - let err = integration.delete(&location).await.unwrap_err().to_string(); - assert!(err.contains("Unable to DELETE data"), "{}", err) - } - - #[tokio::test] - async fn gcs_test_put_nonexistent_bucket() { - let mut config = maybe_skip_integration!(); - config.bucket = NON_EXISTENT_NAME.into(); - let integration = new_gcs(config.service_account, &config.bucket).unwrap(); - - let location = Path::from_iter([NON_EXISTENT_NAME]); - let data = Bytes::from("arbitrary data"); - - let err = integration - .put(&location, data) - .await - .unwrap_err() - .to_string(); - assert!(err.contains("Unable to PUT data"), "{}", err) - } -} diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs deleted file mode 100644 index 3171213b9a..0000000000 --- a/object_store/src/lib.rs +++ /dev/null @@ -1,406 +0,0 @@ -#![deny(rustdoc::broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)] -#![warn( - missing_copy_implementations, - missing_debug_implementations, - missing_docs, - clippy::explicit_iter_loop, - clippy::future_not_send, - clippy::use_self, - clippy::clone_on_ref_ptr -)] - -//! # object_store -//! -//! This crate provides APIs for interacting with object storage services. It -//! currently supports PUT, GET, DELETE, and list for Google Cloud Storage, -//! Amazon S3, in-memory and local file storage. -//! -//! Future compatibility will include Azure Blob Storage, Minio, and Ceph. - -#[cfg(feature = "aws")] -pub mod aws; -#[cfg(feature = "azure")] -pub mod azure; -pub mod disk; -#[cfg(feature = "gcp")] -pub mod gcp; -pub mod memory; -pub mod path; -pub mod throttle; - -use crate::path::Path; -use async_trait::async_trait; -use bytes::Bytes; -use chrono::{DateTime, Utc}; -use futures::{stream::BoxStream, StreamExt, TryStreamExt}; -use snafu::Snafu; -use std::fmt::{Debug, Formatter}; - -/// An alias for a dynamically dispatched object store implementation. -pub type DynObjectStore = dyn ObjectStoreApi; - -/// Universal API to multiple object store services. -#[async_trait] -pub trait ObjectStoreApi: std::fmt::Display + Send + Sync + Debug + 'static { - /// Save the provided bytes to the specified location. 
- async fn put(&self, location: &Path, bytes: Bytes) -> Result<()>; - - /// Return the bytes that are stored at the specified location. - async fn get(&self, location: &Path) -> Result; - - /// Return the metadata for the specified location - async fn head(&self, location: &Path) -> Result; - - /// Delete the object at the specified location. - async fn delete(&self, location: &Path) -> Result<()>; - - /// List all the objects with the given prefix. - /// - /// Prefixes are evaluated on a path segment basis, i.e. `foo/bar/` is a prefix of `foo/bar/x` but not of - /// `foo/bar_baz/x`. - async fn list<'a>( - &'a self, - prefix: Option<&'a Path>, - ) -> Result>>; - - /// List objects with the given prefix and an implementation specific - /// delimiter. Returns common prefixes (directories) in addition to object - /// metadata. - /// - /// Prefixes are evaluated on a path segment basis, i.e. `foo/bar/` is a prefix of `foo/bar/x` but not of - /// `foo/bar_baz/x`. - async fn list_with_delimiter(&self, prefix: &Path) -> Result; -} - -/// Result of a list call that includes objects, prefixes (directories) and a -/// token for the next set of results. Individual result sets may be limited to -/// 1,000 objects based on the underlying object storage's limitations. -#[derive(Debug)] -pub struct ListResult { - /// Token passed to the API for the next page of list results. - pub next_token: Option, - /// Prefixes that are common (like directories) - pub common_prefixes: Vec, - /// Object metadata for the listing - pub objects: Vec, -} - -/// The metadata that describes an object. -#[derive(Debug)] -pub struct ObjectMeta { - /// The full path to the object - pub location: Path, - /// The last modified time - pub last_modified: DateTime, - /// The size in bytes of the object - pub size: usize, -} - -/// Result for a get request -pub enum GetResult { - /// A file - File(tokio::fs::File, std::path::PathBuf), - /// An asynchronous stream - Stream(BoxStream<'static, Result>), -} - -impl Debug for GetResult { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - match self { - GetResult::File(_, _) => write!(f, "GetResult(File)"), - GetResult::Stream(_) => write!(f, "GetResult(Stream)"), - } - } -} - -impl GetResult { - /// Collects the data into a [`Vec`] - pub async fn bytes(self) -> Result> { - let mut stream = self.into_stream(); - let mut bytes = Vec::new(); - - while let Some(next) = stream.next().await { - bytes.extend_from_slice(next?.as_ref()) - } - - Ok(bytes) - } - - /// Converts this into a byte stream - pub fn into_stream(self) -> BoxStream<'static, Result> { - match self { - Self::File(file, path) => { - tokio_util::codec::FramedRead::new(file, tokio_util::codec::BytesCodec::new()) - .map_ok(|b| b.freeze()) - .map_err(move |source| { - disk::Error::UnableToReadBytes { - source, - path: path.clone(), - } - .into() - }) - .boxed() - } - Self::Stream(s) => s, - } - } -} - -/// A specialized `Result` for object store-related errors -pub type Result = std::result::Result; - -/// A specialized `Error` for object store-related errors -#[derive(Debug, Snafu)] -#[allow(missing_docs)] -pub enum Error { - #[snafu(display("Generic {} error: {}", store, source))] - Generic { - store: &'static str, - source: Box, - }, - - #[snafu(display("Object at location {} not found: {}", path, source))] - NotFound { - path: String, - source: Box, - }, -} - -/// Convenience functions for object stores that are only appropriate to use in tests. 
Not marked -/// with cfg(test) to make this accessible to other crates. -#[async_trait] -pub trait ObjectStoreTestConvenience { - /// A convenience function for getting all results from a list operation without a prefix. Only - /// appropriate for tests because production code should handle the stream of potentially a - /// large number of returned paths. - async fn list_all(&self) -> Result>; -} - -#[async_trait] -impl ObjectStoreTestConvenience for dyn ObjectStoreApi { - async fn list_all(&self) -> Result> { - self.list(None).await?.try_collect().await - } -} - -#[cfg(test)] -mod tests { - use super::*; - - type Error = Box; - type Result = std::result::Result; - - async fn flatten_list_stream( - storage: &DynObjectStore, - prefix: Option<&Path>, - ) -> super::Result> { - storage - .list(prefix) - .await? - .map_ok(|meta| meta.location) - .try_collect::>() - .await - } - - pub(crate) async fn put_get_delete_list(storage: &DynObjectStore) -> Result<()> { - delete_fixtures(storage).await; - - let content_list = flatten_list_stream(storage, None).await?; - assert!( - content_list.is_empty(), - "Expected list to be empty; found: {:?}", - content_list - ); - - let location = Path::from_raw("test_dir/test_file.json"); - - let data = Bytes::from("arbitrary data"); - let expected_data = data.clone(); - storage.put(&location, data).await?; - - // List everything - let content_list = flatten_list_stream(storage, None).await?; - assert_eq!(content_list, &[location.clone()]); - - // List everything starting with a prefix that should return results - let prefix = Path::from_raw("test_dir"); - let content_list = flatten_list_stream(storage, Some(&prefix)).await?; - assert_eq!(content_list, &[location.clone()]); - - // List everything starting with a prefix that shouldn't return results - let prefix = Path::from_raw("something"); - let content_list = flatten_list_stream(storage, Some(&prefix)).await?; - assert!(content_list.is_empty()); - - let read_data = storage.get(&location).await?.bytes().await?; - assert_eq!(&*read_data, expected_data); - - let head = storage.head(&location).await?; - assert_eq!(head.size, expected_data.len()); - - storage.delete(&location).await?; - - let content_list = flatten_list_stream(storage, None).await?; - assert!(content_list.is_empty()); - - // Azure doesn't report semantic errors - let is_azure = storage.to_string().starts_with("MicrosoftAzure"); - - let err = storage.get(&location).await.unwrap_err(); - assert!( - matches!(err, crate::Error::NotFound { .. }) || is_azure, - "{}", - err - ); - - let err = storage.head(&location).await.unwrap_err(); - assert!( - matches!(err, crate::Error::NotFound { .. 
}) || is_azure, - "{}", - err - ); - - Ok(()) - } - - pub(crate) async fn list_uses_directories_correctly(storage: &DynObjectStore) -> Result<()> { - delete_fixtures(storage).await; - - let content_list = flatten_list_stream(storage, None).await?; - assert!( - content_list.is_empty(), - "Expected list to be empty; found: {:?}", - content_list - ); - - let location1 = Path::from_raw("foo/x.json"); - let location2 = Path::from_raw("foo.bar/y.json"); - - let data = Bytes::from("arbitrary data"); - storage.put(&location1, data.clone()).await?; - storage.put(&location2, data).await?; - - let prefix = Path::from_raw("foo"); - let content_list = flatten_list_stream(storage, Some(&prefix)).await?; - assert_eq!(content_list, &[location1.clone()]); - - let prefix = Path::from_raw("foo/x"); - let content_list = flatten_list_stream(storage, Some(&prefix)).await?; - assert_eq!(content_list, &[]); - - Ok(()) - } - - pub(crate) async fn list_with_delimiter(storage: &DynObjectStore) -> Result<()> { - delete_fixtures(storage).await; - - // ==================== check: store is empty ==================== - let content_list = flatten_list_stream(storage, None).await?; - assert!(content_list.is_empty()); - - // ==================== do: create files ==================== - let data = Bytes::from("arbitrary data"); - - let files: Vec<_> = [ - "test_file", - "mydb/wb/000/000/000.segment", - "mydb/wb/000/000/001.segment", - "mydb/wb/000/000/002.segment", - "mydb/wb/001/001/000.segment", - "mydb/wb/foo.json", - "mydb/wbwbwb/111/222/333.segment", - "mydb/data/whatevs", - ] - .iter() - .map(|&s| Path::from_raw(s)) - .collect(); - - for f in &files { - let data = data.clone(); - storage.put(f, data).await.unwrap(); - } - - // ==================== check: prefix-list `mydb/wb` (directory) ==================== - let prefix = Path::from_raw("mydb/wb"); - - let expected_000 = Path::from_raw("mydb/wb/000"); - let expected_001 = Path::from_raw("mydb/wb/001"); - let expected_location = Path::from_raw("mydb/wb/foo.json"); - - let result = storage.list_with_delimiter(&prefix).await.unwrap(); - - assert_eq!(result.common_prefixes, vec![expected_000, expected_001]); - assert_eq!(result.objects.len(), 1); - - let object = &result.objects[0]; - - assert_eq!(object.location, expected_location); - assert_eq!(object.size, data.len()); - - // ==================== check: prefix-list `mydb/wb/000/000/001` (partial filename doesn't match) ==================== - let prefix = Path::from_raw("mydb/wb/000/000/001"); - - let result = storage.list_with_delimiter(&prefix).await.unwrap(); - assert!(result.common_prefixes.is_empty()); - assert_eq!(result.objects.len(), 0); - - // ==================== check: prefix-list `not_there` (non-existing prefix) ==================== - let prefix = Path::from_raw("not_there"); - - let result = storage.list_with_delimiter(&prefix).await.unwrap(); - assert!(result.common_prefixes.is_empty()); - assert!(result.objects.is_empty()); - - // ==================== do: remove all files ==================== - for f in &files { - storage.delete(f).await.unwrap(); - } - - // ==================== check: store is empty ==================== - let content_list = flatten_list_stream(storage, None).await?; - assert!(content_list.is_empty()); - - Ok(()) - } - - pub(crate) async fn get_nonexistent_object( - storage: &DynObjectStore, - location: Option, - ) -> Result> { - let location = location.unwrap_or_else(|| Path::from_raw("this_file_should_not_exist")); - - let err = storage.head(&location).await.unwrap_err(); - 
assert!(matches!(err, crate::Error::NotFound { .. })); - - Ok(storage.get(&location).await?.bytes().await?) - } - - async fn delete_fixtures(storage: &DynObjectStore) { - let files: Vec<_> = [ - "test_file", - "test_dir/test_file.json", - "mydb/wb/000/000/000.segment", - "mydb/wb/000/000/001.segment", - "mydb/wb/000/000/002.segment", - "mydb/wb/001/001/000.segment", - "mydb/wb/foo.json", - "mydb/data/whatevs", - "mydb/wbwbwb/111/222/333.segment", - "foo/x.json", - "foo.bar/y.json", - ] - .iter() - .map(|&s| Path::from_raw(s)) - .collect(); - - for f in &files { - // don't care if it errors, should fail elsewhere - let _ = storage.delete(f).await; - } - } - - // Tests TODO: - // GET nonexisting location (in_memory/file) - // DELETE nonexisting location - // PUT overwriting -} diff --git a/object_store/src/memory.rs b/object_store/src/memory.rs deleted file mode 100644 index 3c1adf5903..0000000000 --- a/object_store/src/memory.rs +++ /dev/null @@ -1,235 +0,0 @@ -//! This module contains the IOx implementation for using memory as the object -//! store. -use crate::{path::Path, GetResult, ListResult, ObjectMeta, ObjectStoreApi, Result}; -use async_trait::async_trait; -use bytes::Bytes; -use chrono::Utc; -use futures::{stream::BoxStream, StreamExt}; -use snafu::{OptionExt, Snafu}; -use std::collections::BTreeMap; -use std::collections::BTreeSet; -use tokio::sync::RwLock; - -/// A specialized `Error` for in-memory object store-related errors -#[derive(Debug, Snafu)] -#[allow(missing_docs)] -enum Error { - #[snafu(display("No data in memory found. Location: {path}"))] - NoDataInMemory { path: String }, -} - -impl From for super::Error { - fn from(source: Error) -> Self { - match source { - Error::NoDataInMemory { ref path } => Self::NotFound { - path: path.into(), - source: source.into(), - }, - // currently "not found" is the only error that can happen with the in-memory store - } - } -} - -/// In-memory storage suitable for testing or for opting out of using a cloud -/// storage provider. 
-#[derive(Debug, Default)] -pub struct InMemory { - storage: RwLock>, -} - -impl std::fmt::Display for InMemory { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "InMemory") - } -} - -#[async_trait] -impl ObjectStoreApi for InMemory { - async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> { - self.storage.write().await.insert(location.clone(), bytes); - Ok(()) - } - - async fn get(&self, location: &Path) -> Result { - let data = self.get_bytes(location).await?; - - Ok(GetResult::Stream( - futures::stream::once(async move { Ok(data) }).boxed(), - )) - } - - async fn head(&self, location: &Path) -> Result { - let last_modified = Utc::now(); - let bytes = self.get_bytes(location).await?; - Ok(ObjectMeta { - location: location.clone(), - last_modified, - size: bytes.len(), - }) - } - - async fn delete(&self, location: &Path) -> Result<()> { - self.storage.write().await.remove(location); - Ok(()) - } - - async fn list<'a>( - &'a self, - prefix: Option<&'a Path>, - ) -> Result>> { - let last_modified = Utc::now(); - - let storage = self.storage.read().await; - let values: Vec<_> = storage - .iter() - .filter(move |(key, _)| prefix.map(|p| key.prefix_matches(p)).unwrap_or(true)) - .map(move |(key, value)| { - Ok(ObjectMeta { - location: key.clone(), - last_modified, - size: value.len(), - }) - }) - .collect(); - - Ok(futures::stream::iter(values).boxed()) - } - - /// The memory implementation returns all results, as opposed to the cloud - /// versions which limit their results to 1k or more because of API - /// limitations. - async fn list_with_delimiter(&self, prefix: &Path) -> Result { - let mut common_prefixes = BTreeSet::new(); - let last_modified = Utc::now(); - - // Only objects in this base level should be returned in the - // response. Otherwise, we just collect the common prefixes. - let mut objects = vec![]; - for (k, v) in self.storage.read().await.range((prefix)..) { - let mut parts = match k.prefix_match(prefix) { - Some(parts) => parts, - None => break, - }; - - // Pop first element - let common_prefix = match parts.next() { - Some(p) => p, - None => continue, - }; - - if parts.next().is_some() { - common_prefixes.insert(prefix.child(common_prefix)); - } else { - let object = ObjectMeta { - location: k.clone(), - last_modified, - size: v.len(), - }; - objects.push(object); - } - } - - Ok(ListResult { - objects, - common_prefixes: common_prefixes.into_iter().collect(), - next_token: None, - }) - } -} - -impl InMemory { - /// Create new in-memory storage. 
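A hedged sketch of the delimiter-aware listing this in-memory store implements; it mirrors the `list_with_delimiter` fixture elsewhere in this diff and uses only items defined in the removed crate.

    use bytes::Bytes;
    use object_store::{memory::InMemory, path::Path, ObjectStoreApi};

    async fn delimiter_sketch() -> object_store::Result<()> {
        let store = InMemory::new();
        let data = Bytes::from("arbitrary data");
        store
            .put(&Path::from_raw("mydb/wb/000/000/000.segment"), data.clone())
            .await?;
        store.put(&Path::from_raw("mydb/wb/foo.json"), data).await?;

        // One level below `mydb/wb`: the nested segment collapses into a
        // common prefix, while `foo.json` is returned as an object.
        let listing = store.list_with_delimiter(&Path::from_raw("mydb/wb")).await?;
        assert_eq!(listing.common_prefixes, vec![Path::from_raw("mydb/wb/000")]);
        assert_eq!(listing.objects.len(), 1);
        Ok(())
    }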
- pub fn new() -> Self { - Self::default() - } - - /// Creates a clone of the store - pub async fn clone(&self) -> Self { - let storage = self.storage.read().await; - let storage = storage.clone(); - - Self { - storage: RwLock::new(storage), - } - } - - async fn get_bytes(&self, location: &Path) -> Result { - let storage = self.storage.read().await; - let bytes = storage - .get(location) - .cloned() - .context(NoDataInMemorySnafu { - path: location.to_string(), - })?; - Ok(bytes) - } -} - -#[cfg(test)] -mod tests { - use super::*; - - use crate::{ - tests::{ - get_nonexistent_object, list_uses_directories_correctly, list_with_delimiter, - put_get_delete_list, - }, - Error as ObjectStoreError, ObjectStoreApi, - }; - - #[tokio::test] - async fn in_memory_test() { - let integration = InMemory::new(); - - put_get_delete_list(&integration).await.unwrap(); - list_uses_directories_correctly(&integration).await.unwrap(); - list_with_delimiter(&integration).await.unwrap(); - } - - #[tokio::test] - async fn unknown_length() { - let integration = InMemory::new(); - - let location = Path::from_raw("some_file"); - - let data = Bytes::from("arbitrary data"); - let expected_data = data.clone(); - - integration.put(&location, data).await.unwrap(); - - let read_data = integration - .get(&location) - .await - .unwrap() - .bytes() - .await - .unwrap(); - assert_eq!(&*read_data, expected_data); - } - - const NON_EXISTENT_NAME: &str = "nonexistentname"; - - #[tokio::test] - async fn nonexistent_location() { - let integration = InMemory::new(); - - let location = Path::from_raw(NON_EXISTENT_NAME); - - let err = get_nonexistent_object(&integration, Some(location)) - .await - .unwrap_err(); - if let Some(ObjectStoreError::NotFound { path, source }) = - err.downcast_ref::() - { - let source_variant = source.downcast_ref::(); - assert!( - matches!(source_variant, Some(Error::NoDataInMemory { .. }),), - "got: {:?}", - source_variant - ); - assert_eq!(path, NON_EXISTENT_NAME); - } else { - panic!("unexpected error type: {:?}", err); - } - } -} diff --git a/object_store/src/path/mod.rs b/object_store/src/path/mod.rs deleted file mode 100644 index d25ac81839..0000000000 --- a/object_store/src/path/mod.rs +++ /dev/null @@ -1,297 +0,0 @@ -//! Path abstraction for Object Storage - -use itertools::Itertools; -use std::fmt::Formatter; - -/// The delimiter to separate object namespaces, creating a directory structure. -pub const DELIMITER: &str = "/"; - -mod parts; - -pub(crate) use parts::PathPart; - -/// An object storage location suitable for passing to cloud storage APIs such -/// as AWS, GCS, and Azure. 
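To make the path semantics documented above concrete, here is a small sketch, assuming the `Path` API exactly as shown in this module (the expected encodings are taken from the `push_encodes` test below).

    use object_store::path::Path;

    fn path_sketch() {
        // Building from parts percent-encodes any delimiter inside a part.
        let location = Path::from_iter(["foo/bar", "baz%2Ftest"]);
        assert_eq!(location.to_raw(), "foo%2Fbar/baz%252Ftest");

        // `from_raw` splits on `/` instead; `child` appends one more segment.
        let dir = Path::from_raw("test_dir");
        let file = dir.child("test_file.json");
        assert_eq!(file.to_raw(), "test_dir/test_file.json");
    }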
-#[derive(Debug, Clone, Default, PartialEq, Eq, Hash, Ord, PartialOrd)] -pub struct Path { - /// The raw path with no leading or trailing delimiters - raw: String, -} - -impl Path { - /// Create a new [`Path`] from a string - pub fn from_raw(path: impl AsRef) -> Self { - let path = path.as_ref(); - Self::from_iter(path.split(DELIMITER)) - } - - /// Returns the raw encoded path in object storage - pub fn to_raw(&self) -> &str { - &self.raw - } - - /// Returns the [`PathPart`] of this [`Path`] - pub fn parts(&self) -> impl Iterator> { - match self.raw.is_empty() { - true => itertools::Either::Left(std::iter::empty()), - false => itertools::Either::Right( - self.raw - .split(DELIMITER) - .map(|s| PathPart { raw: s.into() }), - ), - } - } - - pub(crate) fn prefix_match( - &self, - prefix: &Self, - ) -> Option> + '_> { - let diff = itertools::diff_with( - self.raw.split(DELIMITER), - prefix.raw.split(DELIMITER), - |a, b| a == b, - ); - - match diff { - // Both were equal - None => Some(itertools::Either::Left(std::iter::empty())), - // Mismatch or prefix was longer => None - Some(itertools::Diff::FirstMismatch(_, _, _) | itertools::Diff::Longer(_, _)) => None, - // Match with remaining - Some(itertools::Diff::Shorter(_, back)) => Some(itertools::Either::Right( - back.map(|s| PathPart { raw: s.into() }), - )), - } - } - - pub(crate) fn prefix_matches(&self, prefix: &Self) -> bool { - self.prefix_match(prefix).is_some() - } - - /// Creates a new child of this [`Path`] - pub fn child<'a>(&self, child: impl Into>) -> Self { - let raw = match self.raw.is_empty() { - true => format!("{}", child.into().raw), - false => format!("{}{}{}", self.raw, DELIMITER, child.into().raw), - }; - - Self { raw } - } -} - -impl std::fmt::Display for Path { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - self.raw.fmt(f) - } -} - -impl<'a, I> FromIterator for Path -where - I: Into>, -{ - fn from_iter>(iter: T) -> Self { - let raw = T::into_iter(iter) - .map(|s| s.into()) - .filter(|s| !s.raw.is_empty()) - .map(|s| s.raw) - .join(DELIMITER); - - Self { raw } - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn cloud_prefix_with_trailing_delimiter() { - // Use case: files exist in object storage named `foo/bar.json` and - // `foo_test.json`. A search for the prefix `foo/` should return - // `foo/bar.json` but not `foo_test.json'. 
- let prefix = Path::from_iter(["test"]); - - let converted = prefix.to_raw(); - assert_eq!(converted, "test"); - } - - #[test] - fn push_encodes() { - let location = Path::from_iter(["foo/bar", "baz%2Ftest"]); - let converted = location.to_raw(); - assert_eq!(converted, "foo%2Fbar/baz%252Ftest"); - } - - #[test] - fn convert_raw_before_partial_eq() { - // dir and file_name - let cloud = Path::from_raw("test_dir/test_file.json"); - let built = Path::from_iter(["test_dir", "test_file.json"]); - - assert_eq!(built, cloud); - - // dir and file_name w/o dot - let cloud = Path::from_raw("test_dir/test_file"); - let built = Path::from_iter(["test_dir", "test_file"]); - - assert_eq!(built, cloud); - - // dir, no file - let cloud = Path::from_raw("test_dir/"); - let built = Path::from_iter(["test_dir"]); - assert_eq!(built, cloud); - - // file_name, no dir - let cloud = Path::from_raw("test_file.json"); - let built = Path::from_iter(["test_file.json"]); - assert_eq!(built, cloud); - - // empty - let cloud = Path::from_raw(""); - let built = Path::from_iter(["", ""]); - - assert_eq!(built, cloud); - } - - #[test] - fn parts_after_prefix_behavior() { - let existing_path = Path::from_raw("apple/bear/cow/dog/egg.json"); - - // Prefix with one directory - let prefix = Path::from_raw("apple"); - let expected_parts: Vec> = vec!["bear", "cow", "dog", "egg.json"] - .into_iter() - .map(Into::into) - .collect(); - let parts: Vec<_> = existing_path.prefix_match(&prefix).unwrap().collect(); - assert_eq!(parts, expected_parts); - - // Prefix with two directories - let prefix = Path::from_raw("apple/bear"); - let expected_parts: Vec> = vec!["cow", "dog", "egg.json"] - .into_iter() - .map(Into::into) - .collect(); - let parts: Vec<_> = existing_path.prefix_match(&prefix).unwrap().collect(); - assert_eq!(parts, expected_parts); - - // Not a prefix - let prefix = Path::from_raw("cow"); - assert!(existing_path.prefix_match(&prefix).is_none()); - - // Prefix with a partial directory - let prefix = Path::from_raw("ap"); - assert!(existing_path.prefix_match(&prefix).is_none()); - - // Prefix matches but there aren't any parts after it - let existing_path = Path::from_raw("apple/bear/cow/dog"); - - let prefix = existing_path.clone(); - assert_eq!(existing_path.prefix_match(&prefix).unwrap().count(), 0); - } - - #[test] - fn prefix_matches() { - let haystack = Path::from_iter(["foo/bar", "baz%2Ftest", "something"]); - let needle = haystack.clone(); - // self starts with self - assert!( - haystack.prefix_matches(&haystack), - "{:?} should have started with {:?}", - haystack, - haystack - ); - - // a longer prefix doesn't match - let needle = needle.child("longer now"); - assert!( - !haystack.prefix_matches(&needle), - "{:?} shouldn't have started with {:?}", - haystack, - needle - ); - - // one dir prefix matches - let needle = Path::from_iter(["foo/bar"]); - assert!( - haystack.prefix_matches(&needle), - "{:?} should have started with {:?}", - haystack, - needle - ); - - // two dir prefix matches - let needle = needle.child("baz%2Ftest"); - assert!( - haystack.prefix_matches(&needle), - "{:?} should have started with {:?}", - haystack, - needle - ); - - // partial dir prefix doesn't match - let needle = Path::from_iter(["f"]); - assert!( - !haystack.prefix_matches(&needle), - "{:?} should not have started with {:?}", - haystack, - needle - ); - - // one dir and one partial dir doesn't match - let needle = Path::from_iter(["foo/bar", "baz"]); - assert!( - !haystack.prefix_matches(&needle), - "{:?} should not have started 
with {:?}", - haystack, - needle - ); - } - - #[test] - fn prefix_matches_with_file_name() { - let haystack = Path::from_iter(["foo/bar", "baz%2Ftest", "something", "foo.segment"]); - - // All directories match and file name is a prefix - let needle = Path::from_iter(["foo/bar", "baz%2Ftest", "something", "foo"]); - - assert!( - !haystack.prefix_matches(&needle), - "{:?} should not have started with {:?}", - haystack, - needle - ); - - // All directories match but file name is not a prefix - let needle = Path::from_iter(["foo/bar", "baz%2Ftest", "something", "e"]); - - assert!( - !haystack.prefix_matches(&needle), - "{:?} should not have started with {:?}", - haystack, - needle - ); - - // Not all directories match; file name is a prefix of the next directory; this - // does not match - let needle = Path::from_iter(["foo/bar", "baz%2Ftest", "s"]); - - assert!( - !haystack.prefix_matches(&needle), - "{:?} should not have started with {:?}", - haystack, - needle - ); - - // Not all directories match; file name is NOT a prefix of the next directory; - // no match - let needle = Path::from_iter(["foo/bar", "baz%2Ftest", "p"]); - - assert!( - !haystack.prefix_matches(&needle), - "{:?} should not have started with {:?}", - haystack, - needle - ); - } -} diff --git a/object_store/src/path/parts.rs b/object_store/src/path/parts.rs deleted file mode 100644 index 7da35ca352..0000000000 --- a/object_store/src/path/parts.rs +++ /dev/null @@ -1,108 +0,0 @@ -use percent_encoding::{percent_decode_str, percent_encode, AsciiSet, CONTROLS}; -use std::borrow::Cow; - -use super::DELIMITER; - -// percent_encode's API needs this as a byte -const DELIMITER_BYTE: u8 = DELIMITER.as_bytes()[0]; - -/// The PathPart type exists to validate the directory/file names that form part -/// of a path. -/// -/// A PathPart instance is guaranteed to be non-empty and to contain no `/` -/// characters as it can only be constructed by going through the `from` impl. -#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug, Default, Hash)] -pub struct PathPart<'a> { - pub(super) raw: Cow<'a, str>, -} - -/// Characters we want to encode. -const INVALID: &AsciiSet = &CONTROLS - // The delimiter we are reserving for internal hierarchy - .add(DELIMITER_BYTE) - // Characters AWS recommends avoiding for object keys - // https://docs.aws.amazon.com/AmazonS3/latest/dev/UsingMetadata.html - .add(b'\\') - .add(b'{') - // TODO: Non-printable ASCII characters (128–255 decimal characters) - .add(b'^') - .add(b'}') - .add(b'%') - .add(b'`') - .add(b']') - .add(b'"') // " <-- my editor is confused about double quotes within single quotes - .add(b'>') - .add(b'[') - .add(b'~') - .add(b'<') - .add(b'#') - .add(b'|') - // Characters Google Cloud Storage recommends avoiding for object names - // https://cloud.google.com/storage/docs/naming-objects - .add(b'\r') - .add(b'\n') - .add(b'*') - .add(b'?'); - -impl<'a> From<&'a str> for PathPart<'a> { - fn from(v: &'a str) -> Self { - let inner = match v { - // We don't want to encode `.` generally, but we do want to disallow parts of paths - // to be equal to `.` or `..` to prevent file system traversal shenanigans. - "." => "%2E".into(), - ".." 
=> "%2E%2E".into(), - other => percent_encode(other.as_bytes(), INVALID).into(), - }; - Self { raw: inner } - } -} - -impl From for PathPart<'static> { - fn from(s: String) -> Self { - Self { - raw: Cow::Owned(PathPart::from(s.as_ref()).raw.into_owned()), - } - } -} - -impl<'a> std::fmt::Display for PathPart<'a> { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - percent_decode_str(self.raw.as_ref()) - .decode_utf8() - .expect("Valid UTF-8 that came from String") - .fmt(f) - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn path_part_delimiter_gets_encoded() { - let part: PathPart<'_> = "foo/bar".into(); - assert_eq!(part.raw, "foo%2Fbar"); - assert_eq!(part.to_string(), "foo/bar") - } - - #[test] - fn path_part_given_already_encoded_string() { - let part: PathPart<'_> = "foo%2Fbar".into(); - assert_eq!(part.raw, "foo%252Fbar"); - assert_eq!(part.to_string(), "foo%2Fbar"); - } - - #[test] - fn path_part_cant_be_one_dot() { - let part: PathPart<'_> = ".".into(); - assert_eq!(part.raw, "%2E"); - assert_eq!(part.to_string(), "."); - } - - #[test] - fn path_part_cant_be_two_dots() { - let part: PathPart<'_> = "..".into(); - assert_eq!(part.raw, "%2E%2E"); - assert_eq!(part.to_string(), ".."); - } -} diff --git a/object_store/src/throttle.rs b/object_store/src/throttle.rs deleted file mode 100644 index feddd6ea58..0000000000 --- a/object_store/src/throttle.rs +++ /dev/null @@ -1,467 +0,0 @@ -//! This module contains the IOx implementation for wrapping existing object store types into an artificial "sleep" wrapper. -use std::{ - convert::TryInto, - sync::{Arc, Mutex}, -}; - -use crate::{GetResult, ListResult, ObjectMeta, ObjectStoreApi, Path, Result}; -use async_trait::async_trait; -use bytes::Bytes; -use futures::{stream::BoxStream, StreamExt}; -use tokio::time::{sleep, Duration}; - -/// Configuration settings for throttled store -#[derive(Debug, Default, Clone, Copy)] -pub struct ThrottleConfig { - /// Sleep duration for every call to [`delete`](ThrottledStore::delete). - /// - /// Sleeping is done before the underlying store is called and independently of the success of - /// the operation. - pub wait_delete_per_call: Duration, - - /// Sleep duration for every byte received during [`get`](ThrottledStore::get). - /// - /// Sleeping is performed after the underlying store returned and only for successful gets. The - /// sleep duration is additive to [`wait_get_per_call`](Self::wait_get_per_call). - /// - /// Note that the per-byte sleep only happens as the user consumes the output bytes. Should - /// there be an intermediate failure (i.e. after partly consuming the output bytes), the - /// resulting sleep time will be partial as well. - pub wait_get_per_byte: Duration, - - /// Sleep duration for every call to [`get`](ThrottledStore::get). - /// - /// Sleeping is done before the underlying store is called and independently of the success of - /// the operation. The sleep duration is additive to - /// [`wait_get_per_byte`](Self::wait_get_per_byte). - pub wait_get_per_call: Duration, - - /// Sleep duration for every call to [`list`](ThrottledStore::list). - /// - /// Sleeping is done before the underlying store is called and independently of the success of - /// the operation. The sleep duration is additive to - /// [`wait_list_per_entry`](Self::wait_list_per_entry). - pub wait_list_per_call: Duration, - - /// Sleep duration for every entry received during [`list`](ThrottledStore::list). 
- /// - /// Sleeping is performed after the underlying store returned and only for successful lists. - /// The sleep duration is additive to [`wait_list_per_call`](Self::wait_list_per_call). - /// - /// Note that the per-entry sleep only happens as the user consumes the output entries. Should - /// there be an intermediate failure (i.e. after partly consuming the output entries), the - /// resulting sleep time will be partial as well. - pub wait_list_per_entry: Duration, - - /// Sleep duration for every call to - /// [`list_with_delimiter`](ThrottledStore::list_with_delimiter). - /// - /// Sleeping is done before the underlying store is called and independently of the success of - /// the operation. The sleep duration is additive to - /// [`wait_list_with_delimiter_per_entry`](Self::wait_list_with_delimiter_per_entry). - pub wait_list_with_delimiter_per_call: Duration, - - /// Sleep duration for every entry received during - /// [`list_with_delimiter`](ThrottledStore::list_with_delimiter). - /// - /// Sleeping is performed after the underlying store returned and only for successful gets. The - /// sleep duration is additive to - /// [`wait_list_with_delimiter_per_call`](Self::wait_list_with_delimiter_per_call). - pub wait_list_with_delimiter_per_entry: Duration, - - /// Sleep duration for every call to [`put`](ThrottledStore::put). - /// - /// Sleeping is done before the underlying store is called and independently of the success of - /// the operation. - pub wait_put_per_call: Duration, -} - -/// Store wrapper that wraps an inner store with some `sleep` calls. -/// -/// This can be used for performance testing. -/// -/// **Note that the behavior of the wrapper is deterministic and might not reflect real-world -/// conditions!** -#[derive(Debug)] -pub struct ThrottledStore { - inner: T, - config: Arc>, -} - -impl ThrottledStore { - /// Create new wrapper with zero waiting times. - pub fn new(inner: T, config: ThrottleConfig) -> Self { - Self { - inner, - config: Arc::new(Mutex::new(config)), - } - } - - /// Mutate config. - pub fn config_mut(&self, f: F) - where - F: Fn(&mut ThrottleConfig), - { - let mut guard = self.config.lock().expect("lock poissened"); - f(&mut guard) - } - - /// Return copy of current config. 
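A hedged sketch of driving the throttled wrapper described by the fields above, assuming `ThrottledStore`, `ThrottleConfig`, and the in-memory store exactly as they appear in this removed crate.

    use object_store::{
        memory::InMemory,
        throttle::{ThrottleConfig, ThrottledStore},
    };
    use std::time::Duration;

    fn throttle_sketch() {
        // Start with zero waits, then make every `put` pause for 100ms.
        let store = ThrottledStore::new(InMemory::new(), ThrottleConfig::default());
        store.config_mut(|cfg| cfg.wait_put_per_call = Duration::from_millis(100));
        assert_eq!(store.config().wait_put_per_call, Duration::from_millis(100));
    }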
- pub fn config(&self) -> ThrottleConfig { - *self.config.lock().expect("lock poissened") - } -} - -impl std::fmt::Display for ThrottledStore { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "ThrottledStore({})", self.inner) - } -} - -#[async_trait] -impl ObjectStoreApi for ThrottledStore { - async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> { - sleep(self.config().wait_put_per_call).await; - - self.inner.put(location, bytes).await - } - - async fn get(&self, location: &Path) -> Result { - sleep(self.config().wait_get_per_call).await; - - // need to copy to avoid moving / referencing `self` - let wait_get_per_byte = self.config().wait_get_per_byte; - - self.inner.get(location).await.map(|result| { - let s = match result { - GetResult::Stream(s) => s, - GetResult::File(_, _) => unimplemented!(), - }; - - GetResult::Stream( - s.then(move |bytes_result| async move { - match bytes_result { - Ok(bytes) => { - let bytes_len: u32 = usize_to_u32_saturate(bytes.len()); - sleep(wait_get_per_byte * bytes_len).await; - Ok(bytes) - } - Err(err) => Err(err), - } - }) - .boxed(), - ) - }) - } - - async fn head(&self, location: &Path) -> Result { - sleep(self.config().wait_put_per_call).await; - self.inner.head(location).await - } - - async fn delete(&self, location: &Path) -> Result<()> { - sleep(self.config().wait_delete_per_call).await; - - self.inner.delete(location).await - } - - async fn list<'a>( - &'a self, - prefix: Option<&'a Path>, - ) -> Result>> { - sleep(self.config().wait_list_per_call).await; - - // need to copy to avoid moving / referencing `self` - let wait_list_per_entry = self.config().wait_list_per_entry; - - self.inner.list(prefix).await.map(|stream| { - stream - .then(move |result| async move { - match result { - Ok(entry) => { - sleep(wait_list_per_entry).await; - Ok(entry) - } - Err(err) => Err(err), - } - }) - .boxed() - }) - } - - async fn list_with_delimiter(&self, prefix: &Path) -> Result { - sleep(self.config().wait_list_with_delimiter_per_call).await; - - match self.inner.list_with_delimiter(prefix).await { - Ok(list_result) => { - let entries_len = usize_to_u32_saturate(list_result.objects.len()); - sleep(self.config().wait_list_with_delimiter_per_entry * entries_len).await; - Ok(list_result) - } - Err(err) => Err(err), - } - } -} - -/// Saturated `usize` to `u32` cast. -fn usize_to_u32_saturate(x: usize) -> u32 { - x.try_into().unwrap_or(u32::MAX) -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::{ - memory::InMemory, - tests::{list_uses_directories_correctly, list_with_delimiter, put_get_delete_list}, - }; - use bytes::Bytes; - use futures::TryStreamExt; - use tokio::time::Duration; - use tokio::time::Instant; - - const WAIT_TIME: Duration = Duration::from_millis(100); - const ZERO: Duration = Duration::from_millis(0); // Duration::default isn't constant - - macro_rules! 
assert_bounds { - ($d:expr, $lower:expr) => { - assert_bounds!($d, $lower, $lower + 1); - }; - ($d:expr, $lower:expr, $upper:expr) => { - let d = $d; - let lower = $lower * WAIT_TIME; - let upper = $upper * WAIT_TIME; - assert!(d >= lower, "{:?} must be >= than {:?}", d, lower); - assert!(d < upper, "{:?} must be < than {:?}", d, upper); - }; - } - - #[tokio::test] - async fn throttle_test() { - let inner = InMemory::new(); - let store = ThrottledStore::new(inner, ThrottleConfig::default()); - - put_get_delete_list(&store).await.unwrap(); - list_uses_directories_correctly(&store).await.unwrap(); - list_with_delimiter(&store).await.unwrap(); - } - - #[tokio::test] - async fn delete_test() { - let inner = InMemory::new(); - let store = ThrottledStore::new(inner, ThrottleConfig::default()); - - assert_bounds!(measure_delete(&store, None).await, 0); - assert_bounds!(measure_delete(&store, Some(0)).await, 0); - assert_bounds!(measure_delete(&store, Some(10)).await, 0); - - store.config_mut(|cfg| cfg.wait_delete_per_call = WAIT_TIME); - assert_bounds!(measure_delete(&store, None).await, 1); - assert_bounds!(measure_delete(&store, Some(0)).await, 1); - assert_bounds!(measure_delete(&store, Some(10)).await, 1); - } - - #[tokio::test] - async fn get_test() { - let inner = InMemory::new(); - let store = ThrottledStore::new(inner, ThrottleConfig::default()); - - assert_bounds!(measure_get(&store, None).await, 0); - assert_bounds!(measure_get(&store, Some(0)).await, 0); - assert_bounds!(measure_get(&store, Some(10)).await, 0); - - store.config_mut(|cfg| cfg.wait_get_per_call = WAIT_TIME); - assert_bounds!(measure_get(&store, None).await, 1); - assert_bounds!(measure_get(&store, Some(0)).await, 1); - assert_bounds!(measure_get(&store, Some(10)).await, 1); - - store.config_mut(|cfg| { - cfg.wait_get_per_call = ZERO; - cfg.wait_get_per_byte = WAIT_TIME; - }); - assert_bounds!(measure_get(&store, Some(2)).await, 2); - - store.config_mut(|cfg| { - cfg.wait_get_per_call = WAIT_TIME; - cfg.wait_get_per_byte = WAIT_TIME; - }); - assert_bounds!(measure_get(&store, Some(2)).await, 3); - } - - #[tokio::test] - async fn list_test() { - let inner = InMemory::new(); - let store = ThrottledStore::new(inner, ThrottleConfig::default()); - - assert_bounds!(measure_list(&store, 0).await, 0); - assert_bounds!(measure_list(&store, 10).await, 0); - - store.config_mut(|cfg| cfg.wait_list_per_call = WAIT_TIME); - assert_bounds!(measure_list(&store, 0).await, 1); - assert_bounds!(measure_list(&store, 10).await, 1); - - store.config_mut(|cfg| { - cfg.wait_list_per_call = ZERO; - cfg.wait_list_per_entry = WAIT_TIME; - }); - assert_bounds!(measure_list(&store, 2).await, 2); - - store.config_mut(|cfg| { - cfg.wait_list_per_call = WAIT_TIME; - cfg.wait_list_per_entry = WAIT_TIME; - }); - assert_bounds!(measure_list(&store, 2).await, 3); - } - - #[tokio::test] - async fn list_with_delimiter_test() { - let inner = InMemory::new(); - let store = ThrottledStore::new(inner, ThrottleConfig::default()); - - assert_bounds!(measure_list_with_delimiter(&store, 0).await, 0); - assert_bounds!(measure_list_with_delimiter(&store, 10).await, 0); - - store.config_mut(|cfg| cfg.wait_list_with_delimiter_per_call = WAIT_TIME); - assert_bounds!(measure_list_with_delimiter(&store, 0).await, 1); - assert_bounds!(measure_list_with_delimiter(&store, 10).await, 1); - - store.config_mut(|cfg| { - cfg.wait_list_with_delimiter_per_call = ZERO; - cfg.wait_list_with_delimiter_per_entry = WAIT_TIME; - }); - assert_bounds!(measure_list_with_delimiter(&store, 
2).await, 2); - - store.config_mut(|cfg| { - cfg.wait_list_with_delimiter_per_call = WAIT_TIME; - cfg.wait_list_with_delimiter_per_entry = WAIT_TIME; - }); - assert_bounds!(measure_list_with_delimiter(&store, 2).await, 3); - } - - #[tokio::test] - async fn put_test() { - let inner = InMemory::new(); - let store = ThrottledStore::new(inner, ThrottleConfig::default()); - - assert_bounds!(measure_put(&store, 0).await, 0); - assert_bounds!(measure_put(&store, 10).await, 0); - - store.config_mut(|cfg| cfg.wait_put_per_call = WAIT_TIME); - assert_bounds!(measure_put(&store, 0).await, 1); - assert_bounds!(measure_put(&store, 10).await, 1); - - store.config_mut(|cfg| cfg.wait_put_per_call = ZERO); - assert_bounds!(measure_put(&store, 0).await, 0); - } - - async fn place_test_object(store: &ThrottledStore, n_bytes: Option) -> Path { - let path = Path::from_raw("foo"); - - if let Some(n_bytes) = n_bytes { - let data: Vec<_> = std::iter::repeat(1u8).take(n_bytes).collect(); - let bytes = Bytes::from(data); - store.put(&path, bytes).await.unwrap(); - } else { - // ensure object is absent - store.delete(&path).await.unwrap(); - } - - path - } - - async fn place_test_objects(store: &ThrottledStore, n_entries: usize) -> Path { - let prefix = Path::from_raw("foo"); - - // clean up store - let entries: Vec<_> = store - .list(Some(&prefix)) - .await - .unwrap() - .try_collect() - .await - .unwrap(); - - for entry in entries { - store.delete(&entry.location).await.unwrap(); - } - - // create new entries - for i in 0..n_entries { - let path = prefix.child(i.to_string().as_str()); - - let data = Bytes::from("bar"); - store.put(&path, data).await.unwrap(); - } - - prefix - } - - async fn measure_delete(store: &ThrottledStore, n_bytes: Option) -> Duration { - let path = place_test_object(store, n_bytes).await; - - let t0 = Instant::now(); - store.delete(&path).await.unwrap(); - - t0.elapsed() - } - - async fn measure_get(store: &ThrottledStore, n_bytes: Option) -> Duration { - let path = place_test_object(store, n_bytes).await; - - let t0 = Instant::now(); - let res = store.get(&path).await; - if n_bytes.is_some() { - // need to consume bytes to provoke sleep times - let s = match res.unwrap() { - GetResult::Stream(s) => s, - GetResult::File(_, _) => unimplemented!(), - }; - - s.map_ok(|b| bytes::BytesMut::from(&b[..])) - .try_concat() - .await - .unwrap(); - } else { - assert!(res.is_err()); - } - - t0.elapsed() - } - - async fn measure_list(store: &ThrottledStore, n_entries: usize) -> Duration { - let prefix = place_test_objects(store, n_entries).await; - - let t0 = Instant::now(); - store - .list(Some(&prefix)) - .await - .unwrap() - .try_collect::>() - .await - .unwrap(); - - t0.elapsed() - } - - async fn measure_list_with_delimiter( - store: &ThrottledStore, - n_entries: usize, - ) -> Duration { - let prefix = place_test_objects(store, n_entries).await; - - let t0 = Instant::now(); - store.list_with_delimiter(&prefix).await.unwrap(); - - t0.elapsed() - } - - async fn measure_put(store: &ThrottledStore, n_bytes: usize) -> Duration { - let data: Vec<_> = std::iter::repeat(1u8).take(n_bytes).collect(); - let bytes = Bytes::from(data); - - let t0 = Instant::now(); - store.put(&Path::from_raw("foo"), bytes).await.unwrap(); - - t0.elapsed() - } -} diff --git a/object_store_metrics/Cargo.toml b/object_store_metrics/Cargo.toml index ec21e1673b..5a0c150c5d 100644 --- a/object_store_metrics/Cargo.toml +++ b/object_store_metrics/Cargo.toml @@ -10,7 +10,7 @@ bytes = "1.1" futures = "0.3" iox_time = { version = "0.1.0", 
path = "../iox_time" } metric = { version = "0.1.0", path = "../metric" } -object_store = { path = "../object_store" } +object_store = "0.0.1" pin-project = "1.0.10" workspace-hack = { path = "../workspace-hack" } diff --git a/object_store_metrics/src/dummy.rs b/object_store_metrics/src/dummy.rs index 1f6d8eb977..2cd18feb73 100644 --- a/object_store_metrics/src/dummy.rs +++ b/object_store_metrics/src/dummy.rs @@ -5,7 +5,7 @@ use async_trait::async_trait; use bytes::Bytes; use snafu::Snafu; -use object_store::{path::Path, GetResult, ListResult, ObjectMeta, ObjectStoreApi, Result}; +use object_store::{path::Path, GetResult, ListResult, ObjectMeta, ObjectStore, Result}; /// A specialized `Error` for Azure object store-related errors #[derive(Debug, Snafu, Clone)] @@ -50,7 +50,7 @@ impl std::fmt::Display for DummyObjectStore { } #[async_trait] -impl ObjectStoreApi for DummyObjectStore { +impl ObjectStore for DummyObjectStore { async fn put(&self, _location: &Path, _bytes: Bytes) -> Result<()> { Ok(NotSupportedSnafu { name: self.name }.fail()?) } diff --git a/object_store_metrics/src/lib.rs b/object_store_metrics/src/lib.rs index c0725b56e5..8fbaa7e016 100644 --- a/object_store_metrics/src/lib.rs +++ b/object_store_metrics/src/lib.rs @@ -1,4 +1,4 @@ -//! A metric instrumentation wrapper over [`ObjectStoreApi`] implementations. +//! A metric instrumentation wrapper over [`ObjectStore`] implementations. use std::sync::Arc; use std::{ @@ -14,23 +14,23 @@ use iox_time::{SystemProvider, Time, TimeProvider}; use metric::{Metric, U64Counter, U64Histogram, U64HistogramOptions}; use pin_project::{pin_project, pinned_drop}; -use object_store::{path::Path, GetResult, ListResult, ObjectMeta, ObjectStoreApi, Result}; +use object_store::{path::Path, GetResult, ListResult, ObjectMeta, ObjectStore, Result}; #[cfg(test)] mod dummy; -/// An instrumentation decorator, wrapping an underlying [`ObjectStoreApi`] +/// An instrumentation decorator, wrapping an underlying [`ObjectStore`] /// implementation and recording bytes transferred and call latency. /// /// # Stream Duration /// -/// The [`ObjectStoreApi::get()`] call can return a [`Stream`] which is polled +/// The [`ObjectStore::get()`] call can return a [`Stream`] which is polled /// by the caller and may yield chunks of a file over a series of polls (as /// opposed to all of the file data in one go). Because the caller drives the /// polling and therefore fetching of data from the object store over the -/// lifetime of the [`Stream`], the duration of a [`ObjectStoreApi::get()`] +/// lifetime of the [`Stream`], the duration of a [`ObjectStore::get()`] /// request is measured to be the wall clock difference between the moment the -/// caller executes the [`ObjectStoreApi::get()`] call, up until the last chunk +/// caller executes the [`ObjectStore::get()`] call, up until the last chunk /// of data is yielded to the caller. /// /// This means the duration metrics measuring consumption of returned streams @@ -39,7 +39,7 @@ mod dummy; /// /// # Stream Errors /// -/// The [`ObjectStoreApi::get()`] method can return a [`Stream`] of [`Result`] +/// The [`ObjectStore::get()`] method can return a [`Stream`] of [`Result`] /// instances, and returning an error when polled is not necessarily a terminal /// state. The metric recorder allows for a caller to observe a transient error /// and subsequently go on to complete reading the stream, recording this read @@ -67,7 +67,7 @@ mod dummy; /// are not recorded. The bytes transferred metric is not affected. 
diff --git a/parquet_file/Cargo.toml b/parquet_file/Cargo.toml
index 81d26920e2..827a57d8e0 100644
--- a/parquet_file/Cargo.toml
+++ b/parquet_file/Cargo.toml
@@ -15,7 +15,7 @@ futures = "0.3"
 generated_types = { path = "../generated_types" }
 iox_time = { path = "../iox_time" }
 metric = { path = "../metric" }
-object_store = { path = "../object_store" }
+object_store = "0.0.1"
 observability_deps = { path = "../observability_deps" }
 parking_lot = "0.12"
 parquet = {version = "13", features = ["experimental"]}
diff --git a/querier/Cargo.toml b/querier/Cargo.toml
index ba380a4f92..360d73e5c1 100644
--- a/querier/Cargo.toml
+++ b/querier/Cargo.toml
@@ -17,7 +17,7 @@ generated_types = { path = "../generated_types" }
 influxdb_iox_client = { path = "../influxdb_iox_client" }
 iox_catalog = { path = "../iox_catalog" }
 metric = { path = "../metric" }
-object_store = { path = "../object_store" }
+object_store = "0.0.1"
 observability_deps = { path = "../observability_deps" }
 parking_lot = "0.12"
 parquet_file = { path = "../parquet_file" }
diff --git a/router/Cargo.toml b/router/Cargo.toml
index 8d5e1a0dfb..8953add387 100644
--- a/router/Cargo.toml
+++ b/router/Cargo.toml
@@ -20,7 +20,7 @@ metric = { path = "../metric" }
 mutable_batch = { path = "../mutable_batch" }
 mutable_batch_lp = { path = "../mutable_batch_lp" }
 mutable_batch_pb = { version = "0.1.0", path = "../mutable_batch_pb" }
-object_store = { path = "../object_store" }
+object_store = "0.0.1"
 observability_deps = { path = "../observability_deps" }
 parking_lot = "0.12"
 predicate = { path = "../predicate" }
diff --git a/service_grpc_object_store/Cargo.toml b/service_grpc_object_store/Cargo.toml
index 585d8ebd7b..8c2c418160 100644
--- a/service_grpc_object_store/Cargo.toml
+++ b/service_grpc_object_store/Cargo.toml
@@ -8,7 +8,7 @@ data_types = { path = "../data_types" }
 futures = "0.3"
 generated_types = { path = "../generated_types" }
 iox_catalog = { path = "../iox_catalog" }
-object_store = { path = "../object_store" }
+object_store = "0.0.1"
 observability_deps = { path = "../observability_deps" }
 parquet_file = { path = "../parquet_file" }
 tokio = { version = "1", features = ["rt-multi-thread", "macros"] }
diff --git a/service_grpc_object_store/src/lib.rs b/service_grpc_object_store/src/lib.rs
index 86afaa969a..e33e6a7978 100644
--- a/service_grpc_object_store/src/lib.rs
+++ b/service_grpc_object_store/src/lib.rs
@@ -98,7 +98,7 @@ mod tests {
     use data_types::{KafkaPartition, ParquetFileParams, SequenceNumber, Timestamp};
     use generated_types::influxdata::iox::object_store::v1::object_store_service_server::ObjectStoreService;
     use iox_catalog::mem::MemCatalog;
-    use object_store::{memory::InMemory, ObjectStoreApi};
+    use object_store::{memory::InMemory, ObjectStore};
     use uuid::Uuid;
 
     #[tokio::test]
diff --git a/workspace-hack/Cargo.toml b/workspace-hack/Cargo.toml
index 2ef7f2290b..c731a91525 100644
--- a/workspace-hack/Cargo.toml
+++ b/workspace-hack/Cargo.toml
@@ -42,6 +42,7 @@ nom = { version = "7", features = ["alloc", "std"] }
 num-bigint = { version = "0.4", features = ["std"] }
 num-integer = { version = "0.1", default-features = false, features = ["i128", "std"] }
 num-traits = { version = "0.2", features = ["i128", "libm", "std"] }
+object_store = { version = "0.0.1", default-features = false, features = ["aws", "azure", "azure_core", "azure_storage", "azure_storage_blobs", "cloud-storage", "gcp", "hyper", "hyper-rustls", "reqwest", "rusoto_core", "rusoto_credential", "rusoto_s3"] }
 once_cell = { version = "1", features = ["alloc", "parking_lot", "race", "std"] }
 parquet = { version = "13", features = ["arrow", "base64", "brotli", "experimental", "flate2", "lz4", "snap", "zstd"] }
 predicates = { version = "2", features = ["diff", "difflib", "float-cmp", "normalize-line-endings", "regex"] }