From 835e1c91c71a5929928269fa87a4e4b86dcedffb Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> Date: Wed, 29 Jun 2022 22:44:03 +0100 Subject: [PATCH] chore: update object_store to 0.3.0 (#4707) * chore: update object_store to 0.3.0 * chore: review feedback Co-authored-by: Andrew Lamb Co-authored-by: Andrew Lamb Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com> --- Cargo.lock | 7 ++--- clap_blocks/Cargo.toml | 2 +- clap_blocks/src/object_store.rs | 20 +++++++++---- compactor/Cargo.toml | 2 +- influxdb_iox/Cargo.toml | 2 +- ingester/Cargo.toml | 2 +- iox_tests/Cargo.toml | 2 +- ioxd_compactor/Cargo.toml | 2 +- ioxd_ingester/Cargo.toml | 2 +- ioxd_querier/Cargo.toml | 2 +- ioxd_router/Cargo.toml | 2 +- object_store_metrics/Cargo.toml | 2 +- object_store_metrics/src/dummy.rs | 23 +++++++++++---- object_store_metrics/src/lib.rs | 44 ++++++++++++++++++---------- parquet_file/Cargo.toml | 2 +- parquet_file/src/storage.rs | 3 +- querier/Cargo.toml | 2 +- router/Cargo.toml | 2 +- service_grpc_object_store/Cargo.toml | 2 +- workspace-hack/Cargo.toml | 4 +-- 20 files changed, 82 insertions(+), 47 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ad75a8e702..47250beaac 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3188,9 +3188,9 @@ dependencies = [ [[package]] name = "object_store" -version = "0.0.1" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "71fc15440e94ba9b3a8bc83c8f94a0d5a2861d72254cdb9ede72fbbd18e015db" +checksum = "857af043f5d9f36ed4f71815857f79b841412dda1cf0ca5a29608874f6f038e2" dependencies = [ "async-trait", "azure_core", @@ -3209,10 +3209,9 @@ dependencies = [ "rusoto_credential", "rusoto_s3", "snafu", - "tempfile", "tokio", - "tokio-util", "tracing", + "url", "walkdir", ] diff --git a/clap_blocks/Cargo.toml b/clap_blocks/Cargo.toml index 4e49796c79..fb4fad9a6b 100644 --- a/clap_blocks/Cargo.toml +++ b/clap_blocks/Cargo.toml @@ -10,7 +10,7 @@ humantime = "2.1.0" iox_catalog = { path = "../iox_catalog" } iox_time = { path = "../iox_time" } metric = { path = "../metric" } -object_store = "0.0.1" +object_store = "0.3.0" observability_deps = { path = "../observability_deps" } snafu = "0.7" tempfile = "3.1.0" diff --git a/clap_blocks/src/object_store.rs b/clap_blocks/src/object_store.rs index cfe4d56e43..9f9b70789c 100644 --- a/clap_blocks/src/object_store.rs +++ b/clap_blocks/src/object_store.rs @@ -19,6 +19,12 @@ pub enum ParseError { source: std::io::Error, }, + #[snafu(display("Unable to create local store {:?}: {}", path, source))] + CreateLocalFileSystem { + path: PathBuf, + source: object_store::Error, + }, + #[snafu(display( "Specified {:?} for the object store, required configuration missing for {}", object_store, @@ -388,7 +394,10 @@ pub fn make_object_store(config: &ObjectStoreConfig) -> Result { fs::create_dir_all(db_dir) .context(CreatingDatabaseDirectorySnafu { path: db_dir })?; - Ok(Arc::new(object_store::local::LocalFileSystem::new(&db_dir))) + + let store = object_store::local::LocalFileSystem::new_with_prefix(db_dir) + .context(CreateLocalFileSystemSnafu { path: db_dir })?; + Ok(Arc::new(store)) } None => MissingObjectStoreConfigSnafu { object_store: ObjectStoreType::File, @@ -578,10 +587,11 @@ mod tests { ]) .unwrap(); - let object_store = make_object_store(&config).unwrap(); - assert_eq!( - object_store.to_string(), - format!("LocalFileSystem({})", root_path) + let object_store = make_object_store(&config).unwrap().to_string(); + assert!( + object_store.starts_with("LocalFileSystem"), + "{}", + object_store ) } diff --git a/compactor/Cargo.toml b/compactor/Cargo.toml index d2f441215e..533ddd2f62 100644 --- a/compactor/Cargo.toml +++ b/compactor/Cargo.toml @@ -14,7 +14,7 @@ datafusion = { path = "../datafusion" } futures = "0.3" iox_catalog = { path = "../iox_catalog" } metric = { path = "../metric" } -object_store = "0.0.1" +object_store = "0.3.0" observability_deps = { path = "../observability_deps" } parquet_file = { path = "../parquet_file" } predicate = { path = "../predicate" } diff --git a/influxdb_iox/Cargo.toml b/influxdb_iox/Cargo.toml index 5b10ec8809..60cf4c87e3 100644 --- a/influxdb_iox/Cargo.toml +++ b/influxdb_iox/Cargo.toml @@ -22,7 +22,7 @@ ioxd_router = { path = "../ioxd_router"} ioxd_querier = { path = "../ioxd_querier"} ioxd_test = { path = "../ioxd_test"} metric = { path = "../metric" } -object_store = "0.0.1" +object_store = "0.3.0" object_store_metrics = { path = "../object_store_metrics" } observability_deps = { path = "../observability_deps" } panic_logging = { path = "../panic_logging" } diff --git a/ingester/Cargo.toml b/ingester/Cargo.toml index 8e25bcb91d..9a6c67c256 100644 --- a/ingester/Cargo.toml +++ b/ingester/Cargo.toml @@ -24,7 +24,7 @@ iox_catalog = { path = "../iox_catalog" } metric = { path = "../metric" } mutable_batch = { path = "../mutable_batch"} mutable_batch_lp = { path = "../mutable_batch_lp" } -object_store = "0.0.1" +object_store = "0.3.0" observability_deps = { path = "../observability_deps" } parking_lot = "0.12" parquet_file = { path = "../parquet_file" } diff --git a/iox_tests/Cargo.toml b/iox_tests/Cargo.toml index 4bd573c99e..ffc6cdb87c 100644 --- a/iox_tests/Cargo.toml +++ b/iox_tests/Cargo.toml @@ -14,7 +14,7 @@ iox_catalog = { path = "../iox_catalog" } iox_time = { path = "../iox_time" } metric = { path = "../metric" } mutable_batch_lp = { path = "../mutable_batch_lp" } -object_store = "0.0.1" +object_store = "0.3.0" observability_deps = { path = "../observability_deps" } parquet_file = { path = "../parquet_file" } iox_query = { path = "../iox_query" } diff --git a/ioxd_compactor/Cargo.toml b/ioxd_compactor/Cargo.toml index f919ee69f3..b7205d9152 100644 --- a/ioxd_compactor/Cargo.toml +++ b/ioxd_compactor/Cargo.toml @@ -14,7 +14,7 @@ iox_catalog = { path = "../iox_catalog" } ioxd_common = { path = "../ioxd_common" } metric = { path = "../metric" } iox_query = { path = "../iox_query" } -object_store = "0.0.1" +object_store = "0.3.0" iox_time = { path = "../iox_time" } trace = { path = "../trace" } diff --git a/ioxd_ingester/Cargo.toml b/ioxd_ingester/Cargo.toml index ddb1fe7e05..8405a52be2 100644 --- a/ioxd_ingester/Cargo.toml +++ b/ioxd_ingester/Cargo.toml @@ -11,7 +11,7 @@ ingester = { path = "../ingester" } iox_catalog = { path = "../iox_catalog" } ioxd_common = { path = "../ioxd_common" } metric = { path = "../metric" } -object_store = "0.0.1" +object_store = "0.3.0" iox_query = { path = "../iox_query" } trace = { path = "../trace" } write_buffer = { path = "../write_buffer" } diff --git a/ioxd_querier/Cargo.toml b/ioxd_querier/Cargo.toml index 47c619780e..331b2d9b5d 100644 --- a/ioxd_querier/Cargo.toml +++ b/ioxd_querier/Cargo.toml @@ -11,7 +11,7 @@ generated_types = { path = "../generated_types" } iox_catalog = { path = "../iox_catalog" } ioxd_common = { path = "../ioxd_common" } metric = { path = "../metric" } -object_store = "0.0.1" +object_store = "0.3.0" querier = { path = "../querier" } iox_query = { path = "../iox_query" } router = { path = "../router" } diff --git a/ioxd_router/Cargo.toml b/ioxd_router/Cargo.toml index 7225e44f34..833a2ade8e 100644 --- a/ioxd_router/Cargo.toml +++ b/ioxd_router/Cargo.toml @@ -11,7 +11,7 @@ iox_catalog = { path = "../iox_catalog" } ioxd_common = { path = "../ioxd_common" } metric = { path = "../metric" } mutable_batch = { path = "../mutable_batch" } -object_store = "0.0.1" +object_store = "0.3.0" observability_deps = { path = "../observability_deps" } router = { path = "../router" } sharder = { path = "../sharder" } diff --git a/object_store_metrics/Cargo.toml b/object_store_metrics/Cargo.toml index dc4a9a3718..5002a723ab 100644 --- a/object_store_metrics/Cargo.toml +++ b/object_store_metrics/Cargo.toml @@ -10,7 +10,7 @@ bytes = "1.1" futures = "0.3" iox_time = { version = "0.1.0", path = "../iox_time" } metric = { version = "0.1.0", path = "../metric" } -object_store = "0.0.1" +object_store = "0.3.0" pin-project = "1.0.10" workspace-hack = { path = "../workspace-hack" } diff --git a/object_store_metrics/src/dummy.rs b/object_store_metrics/src/dummy.rs index 2cd18feb73..2a360c6a82 100644 --- a/object_store_metrics/src/dummy.rs +++ b/object_store_metrics/src/dummy.rs @@ -4,6 +4,7 @@ use async_trait::async_trait; use bytes::Bytes; use snafu::Snafu; +use std::ops::Range; use object_store::{path::Path, GetResult, ListResult, ObjectMeta, ObjectStore, Result}; @@ -59,6 +60,10 @@ impl ObjectStore for DummyObjectStore { Ok(NotSupportedSnafu { name: self.name }.fail()?) } + async fn get_range(&self, _location: &Path, _range: Range) -> Result { + Ok(NotSupportedSnafu { name: self.name }.fail()?) + } + async fn head(&self, _location: &Path) -> Result { Ok(NotSupportedSnafu { name: self.name }.fail()?) } @@ -67,14 +72,22 @@ impl ObjectStore for DummyObjectStore { Ok(NotSupportedSnafu { name: self.name }.fail()?) } - async fn list<'a>( - &'a self, - _prefix: Option<&'a Path>, - ) -> Result>> { + async fn list( + &self, + _prefix: Option<&Path>, + ) -> Result>> { Ok(NotSupportedSnafu { name: self.name }.fail()?) } - async fn list_with_delimiter(&self, _prefix: &Path) -> Result { + async fn list_with_delimiter(&self, _prefix: Option<&Path>) -> Result { + Ok(NotSupportedSnafu { name: self.name }.fail()?) + } + + async fn copy(&self, _from: &Path, _to: &Path) -> Result<()> { + Ok(NotSupportedSnafu { name: self.name }.fail()?) + } + + async fn copy_if_not_exists(&self, _from: &Path, _to: &Path) -> Result<()> { Ok(NotSupportedSnafu { name: self.name }.fail()?) } } diff --git a/object_store_metrics/src/lib.rs b/object_store_metrics/src/lib.rs index d673bc77ae..a46b1ca259 100644 --- a/object_store_metrics/src/lib.rs +++ b/object_store_metrics/src/lib.rs @@ -1,5 +1,6 @@ //! A metric instrumentation wrapper over [`ObjectStore`] implementations. +use std::ops::Range; use std::sync::Arc; use std::{ marker::PhantomData, @@ -170,7 +171,7 @@ impl ObjectStore for ObjectStoreMetrics { match res { Ok(GetResult::File(file, path)) => { // Record the file size in bytes and time the inner call took. - if let Ok(m) = file.metadata().await { + if let Ok(m) = file.metadata() { self.get_bytes.inc(m.len()); if let Some(d) = self.time_provider.now().checked_duration_since(started_at) { self.get_success_duration.record(d) @@ -202,6 +203,11 @@ impl ObjectStore for ObjectStoreMetrics { } } + async fn get_range(&self, location: &Path, range: Range) -> Result { + // TODO: Add instrumentation of get_range requests + self.inner.get_range(location, range).await + } + async fn head(&self, location: &Path) -> Result { // TODO: Add instrumentation of head requests self.inner.head(location).await @@ -224,10 +230,7 @@ impl ObjectStore for ObjectStoreMetrics { res } - async fn list<'a>( - &'a self, - prefix: Option<&'a Path>, - ) -> Result>> { + async fn list(&self, prefix: Option<&Path>) -> Result>> { let started_at = self.time_provider.now(); let res = self.inner.list(prefix).await; @@ -257,7 +260,7 @@ impl ObjectStore for ObjectStoreMetrics { } } - async fn list_with_delimiter(&self, prefix: &Path) -> Result { + async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result { let t = self.time_provider.now(); let res = self.inner.list_with_delimiter(prefix).await; @@ -273,6 +276,16 @@ impl ObjectStore for ObjectStoreMetrics { res } + + async fn copy(&self, from: &Path, to: &Path) -> Result<()> { + // TODO: Instrument me + self.inner.copy(from, to).await + } + + async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> { + // TODO: Instrument me + self.inner.copy_if_not_exists(from, to).await + } } /// A [`MetricDelegate`] is called whenever the [`StreamMetricRecorder`] @@ -497,7 +510,7 @@ mod tests { use futures::stream; use metric::Attributes; - use tokio::io::AsyncReadExt; + use std::io::Read; use dummy::DummyObjectStore; use object_store::{local::LocalFileSystem, memory::InMemory}; @@ -544,7 +557,7 @@ mod tests { store .put( - &Path::from_raw("test"), + &Path::from("test"), Bytes::from([42_u8, 42, 42, 42, 42].as_slice()), ) .await @@ -567,7 +580,7 @@ mod tests { store .put( - &Path::from_raw("test"), + &Path::from("test"), Bytes::from([42_u8, 42, 42, 42, 42].as_slice()), ) .await @@ -621,7 +634,7 @@ mod tests { let store = ObjectStoreMetrics::new(store, time, &metrics); store - .list_with_delimiter(&Path::from_raw("test")) + .list_with_delimiter(Some(&Path::from("test"))) .await .expect("list should succeed"); @@ -641,7 +654,7 @@ mod tests { assert!( store - .list_with_delimiter(&Path::from_raw("test")) + .list_with_delimiter(Some(&Path::from("test"))) .await .is_err(), "mock configured to fail" @@ -662,7 +675,7 @@ mod tests { let store = ObjectStoreMetrics::new(store, time, &metrics); store - .get(&Path::from_raw("test")) + .get(&Path::from("test")) .await .expect_err("mock configured to fail"); @@ -676,12 +689,12 @@ mod tests { #[tokio::test] async fn test_put_get_delete_file() { let metrics = Arc::new(metric::Registry::default()); - let store = Arc::new(LocalFileSystem::new("./")); + let store = Arc::new(LocalFileSystem::new_with_prefix("./").unwrap()); let time = Arc::new(SystemProvider::new()); let store = ObjectStoreMetrics::new(store, time, &metrics); let data = [42_u8, 42, 42, 42, 42]; - let path = Path::from_raw("test"); + let path = Path::from("test"); store .put(&path, Bytes::copy_from_slice(&data)) .await @@ -692,7 +705,6 @@ mod tests { GetResult::File(mut file, _) => { let mut contents = vec![]; file.read_to_end(&mut contents) - .await .expect("failed to read file data"); assert_eq!(contents, &data); } @@ -726,7 +738,7 @@ mod tests { let store = ObjectStoreMetrics::new(store, time, &metrics); let data = [42_u8, 42, 42, 42, 42]; - let path = Path::from_raw("test"); + let path = Path::from("test"); store .put(&path, Bytes::copy_from_slice(&data)) .await diff --git a/parquet_file/Cargo.toml b/parquet_file/Cargo.toml index d86d875050..862b6fdd36 100644 --- a/parquet_file/Cargo.toml +++ b/parquet_file/Cargo.toml @@ -14,7 +14,7 @@ datafusion_util = { path = "../datafusion_util" } futures = "0.3" generated_types = { path = "../generated_types" } iox_time = { path = "../iox_time" } -object_store = "0.0.1" +object_store = "0.3.0" observability_deps = { path = "../observability_deps" } parking_lot = "0.12" parquet = {version = "16.0.0", features = ["experimental"]} diff --git a/parquet_file/src/storage.rs b/parquet_file/src/storage.rs index 6df5015362..3f865a19c5 100644 --- a/parquet_file/src/storage.rs +++ b/parquet_file/src/storage.rs @@ -269,8 +269,9 @@ async fn download_and_scan_parquet( let read_stream = object_store.get(&path).await?; let data = match read_stream { - GetResult::File(mut f, _) => { + GetResult::File(f, _) => { trace!(?path, "Using file directly"); + let mut f = tokio::fs::File::from_std(f); let l = f.metadata().await?.len(); let mut buf = Vec::with_capacity(l as usize); f.read_to_end(&mut buf).await?; diff --git a/querier/Cargo.toml b/querier/Cargo.toml index ffa83929b6..dc513b6cb1 100644 --- a/querier/Cargo.toml +++ b/querier/Cargo.toml @@ -17,7 +17,7 @@ generated_types = { path = "../generated_types" } influxdb_iox_client = { path = "../influxdb_iox_client" } iox_catalog = { path = "../iox_catalog" } metric = { path = "../metric" } -object_store = "0.0.1" +object_store = "0.3.0" observability_deps = { path = "../observability_deps" } parking_lot = "0.12" parquet_file = { path = "../parquet_file" } diff --git a/router/Cargo.toml b/router/Cargo.toml index 6b222808f9..b88badfa7e 100644 --- a/router/Cargo.toml +++ b/router/Cargo.toml @@ -20,7 +20,7 @@ metric = { path = "../metric" } mutable_batch = { path = "../mutable_batch" } mutable_batch_lp = { path = "../mutable_batch_lp" } mutable_batch_pb = { version = "0.1.0", path = "../mutable_batch_pb" } -object_store = "0.0.1" +object_store = "0.3.0" observability_deps = { path = "../observability_deps" } parking_lot = "0.12" predicate = { path = "../predicate" } diff --git a/service_grpc_object_store/Cargo.toml b/service_grpc_object_store/Cargo.toml index c7d9a6e387..87c9144a23 100644 --- a/service_grpc_object_store/Cargo.toml +++ b/service_grpc_object_store/Cargo.toml @@ -8,7 +8,7 @@ data_types = { path = "../data_types" } futures = "0.3" generated_types = { path = "../generated_types" } iox_catalog = { path = "../iox_catalog" } -object_store = "0.0.1" +object_store = "0.3.0" observability_deps = { path = "../observability_deps" } parquet_file = { path = "../parquet_file" } tokio = { version = "1", features = ["rt-multi-thread", "macros"] } diff --git a/workspace-hack/Cargo.toml b/workspace-hack/Cargo.toml index bda1e8b9eb..5809d03156 100644 --- a/workspace-hack/Cargo.toml +++ b/workspace-hack/Cargo.toml @@ -40,7 +40,7 @@ nom = { version = "7", features = ["alloc", "std"] } num-bigint = { version = "0.4", features = ["std"] } num-integer = { version = "0.1", default-features = false, features = ["i128", "std"] } num-traits = { version = "0.2", features = ["i128", "libm", "std"] } -object_store = { version = "0.0.1", default-features = false, features = ["aws", "azure", "azure_core", "azure_storage", "azure_storage_blobs", "cloud-storage", "gcp", "hyper", "hyper-rustls", "reqwest", "rusoto_core", "rusoto_credential", "rusoto_s3"] } +object_store = { version = "0.3", default-features = false, features = ["aws", "azure", "azure_core", "azure_storage", "azure_storage_blobs", "cloud-storage", "gcp", "hyper", "hyper-rustls", "reqwest", "rusoto_core", "rusoto_credential", "rusoto_s3"] } once_cell = { version = "1", features = ["alloc", "parking_lot", "parking_lot_core", "race", "std"] } parquet = { version = "16", features = ["arrow", "base64", "brotli", "experimental", "flate2", "lz4", "snap", "zstd"] } predicates = { version = "2", features = ["diff", "difflib", "float-cmp", "normalize-line-endings", "regex"] } @@ -59,7 +59,7 @@ sqlx = { version = "0.6", features = ["_rt-tokio", "json", "macros", "migrate", sqlx-core = { version = "0.6", default-features = false, features = ["_rt-tokio", "_tls-rustls", "any", "base64", "crc", "dirs", "hkdf", "hmac", "json", "md-5", "migrate", "postgres", "rand", "runtime-tokio-rustls", "rustls", "rustls-pemfile", "serde", "serde_json", "sha-1", "sha2", "tokio-stream", "uuid", "webpki-roots", "whoami"] } tokio = { version = "1", features = ["bytes", "fs", "io-std", "io-util", "libc", "macros", "memchr", "mio", "net", "num_cpus", "once_cell", "parking_lot", "rt", "rt-multi-thread", "signal", "signal-hook-registry", "socket2", "sync", "time", "tokio-macros", "tracing"] } tokio-stream = { version = "0.1", features = ["fs", "net", "time"] } -tokio-util = { version = "0.7", features = ["codec", "io", "tracing"] } +tokio-util = { version = "0.7", features = ["codec", "tracing"] } tower = { version = "0.4", features = ["__common", "balance", "buffer", "discover", "futures-core", "futures-util", "indexmap", "limit", "load", "log", "make", "pin-project", "pin-project-lite", "rand", "ready-cache", "slab", "timeout", "tokio", "tokio-util", "tracing", "util"] } tower-http = { version = "0.3", features = ["catch-panic", "map-response-body", "tower", "tracing", "util"] } tracing = { version = "0.1", features = ["attributes", "log", "max_level_trace", "release_max_level_trace", "std", "tracing-attributes"] }