chore: update object_store to 0.3.0 (#4707)
* chore: update object_store to 0.3.0 * chore: review feedback Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org> Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org> Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>pull/24376/head
parent
0cca975167
commit
835e1c91c7
|
|
@ -3188,9 +3188,9 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "object_store"
|
||||
version = "0.0.1"
|
||||
version = "0.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "71fc15440e94ba9b3a8bc83c8f94a0d5a2861d72254cdb9ede72fbbd18e015db"
|
||||
checksum = "857af043f5d9f36ed4f71815857f79b841412dda1cf0ca5a29608874f6f038e2"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"azure_core",
|
||||
|
|
@ -3209,10 +3209,9 @@ dependencies = [
|
|||
"rusoto_credential",
|
||||
"rusoto_s3",
|
||||
"snafu",
|
||||
"tempfile",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
"tracing",
|
||||
"url",
|
||||
"walkdir",
|
||||
]
|
||||
|
||||
|
|
|
|||
|
|
@ -10,7 +10,7 @@ humantime = "2.1.0"
|
|||
iox_catalog = { path = "../iox_catalog" }
|
||||
iox_time = { path = "../iox_time" }
|
||||
metric = { path = "../metric" }
|
||||
object_store = "0.0.1"
|
||||
object_store = "0.3.0"
|
||||
observability_deps = { path = "../observability_deps" }
|
||||
snafu = "0.7"
|
||||
tempfile = "3.1.0"
|
||||
|
|
|
|||
|
|
@ -19,6 +19,12 @@ pub enum ParseError {
|
|||
source: std::io::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Unable to create local store {:?}: {}", path, source))]
|
||||
CreateLocalFileSystem {
|
||||
path: PathBuf,
|
||||
source: object_store::Error,
|
||||
},
|
||||
|
||||
#[snafu(display(
|
||||
"Specified {:?} for the object store, required configuration missing for {}",
|
||||
object_store,
|
||||
|
|
@ -388,7 +394,10 @@ pub fn make_object_store(config: &ObjectStoreConfig) -> Result<Arc<DynObjectStor
|
|||
Some(db_dir) => {
|
||||
fs::create_dir_all(db_dir)
|
||||
.context(CreatingDatabaseDirectorySnafu { path: db_dir })?;
|
||||
Ok(Arc::new(object_store::local::LocalFileSystem::new(&db_dir)))
|
||||
|
||||
let store = object_store::local::LocalFileSystem::new_with_prefix(db_dir)
|
||||
.context(CreateLocalFileSystemSnafu { path: db_dir })?;
|
||||
Ok(Arc::new(store))
|
||||
}
|
||||
None => MissingObjectStoreConfigSnafu {
|
||||
object_store: ObjectStoreType::File,
|
||||
|
|
@ -578,10 +587,11 @@ mod tests {
|
|||
])
|
||||
.unwrap();
|
||||
|
||||
let object_store = make_object_store(&config).unwrap();
|
||||
assert_eq!(
|
||||
object_store.to_string(),
|
||||
format!("LocalFileSystem({})", root_path)
|
||||
let object_store = make_object_store(&config).unwrap().to_string();
|
||||
assert!(
|
||||
object_store.starts_with("LocalFileSystem"),
|
||||
"{}",
|
||||
object_store
|
||||
)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -14,7 +14,7 @@ datafusion = { path = "../datafusion" }
|
|||
futures = "0.3"
|
||||
iox_catalog = { path = "../iox_catalog" }
|
||||
metric = { path = "../metric" }
|
||||
object_store = "0.0.1"
|
||||
object_store = "0.3.0"
|
||||
observability_deps = { path = "../observability_deps" }
|
||||
parquet_file = { path = "../parquet_file" }
|
||||
predicate = { path = "../predicate" }
|
||||
|
|
|
|||
|
|
@ -22,7 +22,7 @@ ioxd_router = { path = "../ioxd_router"}
|
|||
ioxd_querier = { path = "../ioxd_querier"}
|
||||
ioxd_test = { path = "../ioxd_test"}
|
||||
metric = { path = "../metric" }
|
||||
object_store = "0.0.1"
|
||||
object_store = "0.3.0"
|
||||
object_store_metrics = { path = "../object_store_metrics" }
|
||||
observability_deps = { path = "../observability_deps" }
|
||||
panic_logging = { path = "../panic_logging" }
|
||||
|
|
|
|||
|
|
@ -24,7 +24,7 @@ iox_catalog = { path = "../iox_catalog" }
|
|||
metric = { path = "../metric" }
|
||||
mutable_batch = { path = "../mutable_batch"}
|
||||
mutable_batch_lp = { path = "../mutable_batch_lp" }
|
||||
object_store = "0.0.1"
|
||||
object_store = "0.3.0"
|
||||
observability_deps = { path = "../observability_deps" }
|
||||
parking_lot = "0.12"
|
||||
parquet_file = { path = "../parquet_file" }
|
||||
|
|
|
|||
|
|
@ -14,7 +14,7 @@ iox_catalog = { path = "../iox_catalog" }
|
|||
iox_time = { path = "../iox_time" }
|
||||
metric = { path = "../metric" }
|
||||
mutable_batch_lp = { path = "../mutable_batch_lp" }
|
||||
object_store = "0.0.1"
|
||||
object_store = "0.3.0"
|
||||
observability_deps = { path = "../observability_deps" }
|
||||
parquet_file = { path = "../parquet_file" }
|
||||
iox_query = { path = "../iox_query" }
|
||||
|
|
|
|||
|
|
@ -14,7 +14,7 @@ iox_catalog = { path = "../iox_catalog" }
|
|||
ioxd_common = { path = "../ioxd_common" }
|
||||
metric = { path = "../metric" }
|
||||
iox_query = { path = "../iox_query" }
|
||||
object_store = "0.0.1"
|
||||
object_store = "0.3.0"
|
||||
iox_time = { path = "../iox_time" }
|
||||
trace = { path = "../trace" }
|
||||
|
||||
|
|
|
|||
|
|
@ -11,7 +11,7 @@ ingester = { path = "../ingester" }
|
|||
iox_catalog = { path = "../iox_catalog" }
|
||||
ioxd_common = { path = "../ioxd_common" }
|
||||
metric = { path = "../metric" }
|
||||
object_store = "0.0.1"
|
||||
object_store = "0.3.0"
|
||||
iox_query = { path = "../iox_query" }
|
||||
trace = { path = "../trace" }
|
||||
write_buffer = { path = "../write_buffer" }
|
||||
|
|
|
|||
|
|
@ -11,7 +11,7 @@ generated_types = { path = "../generated_types" }
|
|||
iox_catalog = { path = "../iox_catalog" }
|
||||
ioxd_common = { path = "../ioxd_common" }
|
||||
metric = { path = "../metric" }
|
||||
object_store = "0.0.1"
|
||||
object_store = "0.3.0"
|
||||
querier = { path = "../querier" }
|
||||
iox_query = { path = "../iox_query" }
|
||||
router = { path = "../router" }
|
||||
|
|
|
|||
|
|
@ -11,7 +11,7 @@ iox_catalog = { path = "../iox_catalog" }
|
|||
ioxd_common = { path = "../ioxd_common" }
|
||||
metric = { path = "../metric" }
|
||||
mutable_batch = { path = "../mutable_batch" }
|
||||
object_store = "0.0.1"
|
||||
object_store = "0.3.0"
|
||||
observability_deps = { path = "../observability_deps" }
|
||||
router = { path = "../router" }
|
||||
sharder = { path = "../sharder" }
|
||||
|
|
|
|||
|
|
@ -10,7 +10,7 @@ bytes = "1.1"
|
|||
futures = "0.3"
|
||||
iox_time = { version = "0.1.0", path = "../iox_time" }
|
||||
metric = { version = "0.1.0", path = "../metric" }
|
||||
object_store = "0.0.1"
|
||||
object_store = "0.3.0"
|
||||
pin-project = "1.0.10"
|
||||
workspace-hack = { path = "../workspace-hack" }
|
||||
|
||||
|
|
|
|||
|
|
@ -4,6 +4,7 @@
|
|||
use async_trait::async_trait;
|
||||
use bytes::Bytes;
|
||||
use snafu::Snafu;
|
||||
use std::ops::Range;
|
||||
|
||||
use object_store::{path::Path, GetResult, ListResult, ObjectMeta, ObjectStore, Result};
|
||||
|
||||
|
|
@ -59,6 +60,10 @@ impl ObjectStore for DummyObjectStore {
|
|||
Ok(NotSupportedSnafu { name: self.name }.fail()?)
|
||||
}
|
||||
|
||||
async fn get_range(&self, _location: &Path, _range: Range<usize>) -> Result<Bytes> {
|
||||
Ok(NotSupportedSnafu { name: self.name }.fail()?)
|
||||
}
|
||||
|
||||
async fn head(&self, _location: &Path) -> Result<ObjectMeta> {
|
||||
Ok(NotSupportedSnafu { name: self.name }.fail()?)
|
||||
}
|
||||
|
|
@ -67,14 +72,22 @@ impl ObjectStore for DummyObjectStore {
|
|||
Ok(NotSupportedSnafu { name: self.name }.fail()?)
|
||||
}
|
||||
|
||||
async fn list<'a>(
|
||||
&'a self,
|
||||
_prefix: Option<&'a Path>,
|
||||
) -> Result<futures::stream::BoxStream<'a, Result<ObjectMeta>>> {
|
||||
async fn list(
|
||||
&self,
|
||||
_prefix: Option<&Path>,
|
||||
) -> Result<futures::stream::BoxStream<'_, Result<ObjectMeta>>> {
|
||||
Ok(NotSupportedSnafu { name: self.name }.fail()?)
|
||||
}
|
||||
|
||||
async fn list_with_delimiter(&self, _prefix: &Path) -> Result<ListResult> {
|
||||
async fn list_with_delimiter(&self, _prefix: Option<&Path>) -> Result<ListResult> {
|
||||
Ok(NotSupportedSnafu { name: self.name }.fail()?)
|
||||
}
|
||||
|
||||
async fn copy(&self, _from: &Path, _to: &Path) -> Result<()> {
|
||||
Ok(NotSupportedSnafu { name: self.name }.fail()?)
|
||||
}
|
||||
|
||||
async fn copy_if_not_exists(&self, _from: &Path, _to: &Path) -> Result<()> {
|
||||
Ok(NotSupportedSnafu { name: self.name }.fail()?)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,5 +1,6 @@
|
|||
//! A metric instrumentation wrapper over [`ObjectStore`] implementations.
|
||||
|
||||
use std::ops::Range;
|
||||
use std::sync::Arc;
|
||||
use std::{
|
||||
marker::PhantomData,
|
||||
|
|
@ -170,7 +171,7 @@ impl ObjectStore for ObjectStoreMetrics {
|
|||
match res {
|
||||
Ok(GetResult::File(file, path)) => {
|
||||
// Record the file size in bytes and time the inner call took.
|
||||
if let Ok(m) = file.metadata().await {
|
||||
if let Ok(m) = file.metadata() {
|
||||
self.get_bytes.inc(m.len());
|
||||
if let Some(d) = self.time_provider.now().checked_duration_since(started_at) {
|
||||
self.get_success_duration.record(d)
|
||||
|
|
@ -202,6 +203,11 @@ impl ObjectStore for ObjectStoreMetrics {
|
|||
}
|
||||
}
|
||||
|
||||
async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
|
||||
// TODO: Add instrumentation of get_range requests
|
||||
self.inner.get_range(location, range).await
|
||||
}
|
||||
|
||||
async fn head(&self, location: &Path) -> Result<ObjectMeta> {
|
||||
// TODO: Add instrumentation of head requests
|
||||
self.inner.head(location).await
|
||||
|
|
@ -224,10 +230,7 @@ impl ObjectStore for ObjectStoreMetrics {
|
|||
res
|
||||
}
|
||||
|
||||
async fn list<'a>(
|
||||
&'a self,
|
||||
prefix: Option<&'a Path>,
|
||||
) -> Result<BoxStream<'a, Result<ObjectMeta>>> {
|
||||
async fn list(&self, prefix: Option<&Path>) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
|
||||
let started_at = self.time_provider.now();
|
||||
|
||||
let res = self.inner.list(prefix).await;
|
||||
|
|
@ -257,7 +260,7 @@ impl ObjectStore for ObjectStoreMetrics {
|
|||
}
|
||||
}
|
||||
|
||||
async fn list_with_delimiter(&self, prefix: &Path) -> Result<ListResult> {
|
||||
async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
|
||||
let t = self.time_provider.now();
|
||||
|
||||
let res = self.inner.list_with_delimiter(prefix).await;
|
||||
|
|
@ -273,6 +276,16 @@ impl ObjectStore for ObjectStoreMetrics {
|
|||
|
||||
res
|
||||
}
|
||||
|
||||
async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
|
||||
// TODO: Instrument me
|
||||
self.inner.copy(from, to).await
|
||||
}
|
||||
|
||||
async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
|
||||
// TODO: Instrument me
|
||||
self.inner.copy_if_not_exists(from, to).await
|
||||
}
|
||||
}
|
||||
|
||||
/// A [`MetricDelegate`] is called whenever the [`StreamMetricRecorder`]
|
||||
|
|
@ -497,7 +510,7 @@ mod tests {
|
|||
|
||||
use futures::stream;
|
||||
use metric::Attributes;
|
||||
use tokio::io::AsyncReadExt;
|
||||
use std::io::Read;
|
||||
|
||||
use dummy::DummyObjectStore;
|
||||
use object_store::{local::LocalFileSystem, memory::InMemory};
|
||||
|
|
@ -544,7 +557,7 @@ mod tests {
|
|||
|
||||
store
|
||||
.put(
|
||||
&Path::from_raw("test"),
|
||||
&Path::from("test"),
|
||||
Bytes::from([42_u8, 42, 42, 42, 42].as_slice()),
|
||||
)
|
||||
.await
|
||||
|
|
@ -567,7 +580,7 @@ mod tests {
|
|||
|
||||
store
|
||||
.put(
|
||||
&Path::from_raw("test"),
|
||||
&Path::from("test"),
|
||||
Bytes::from([42_u8, 42, 42, 42, 42].as_slice()),
|
||||
)
|
||||
.await
|
||||
|
|
@ -621,7 +634,7 @@ mod tests {
|
|||
let store = ObjectStoreMetrics::new(store, time, &metrics);
|
||||
|
||||
store
|
||||
.list_with_delimiter(&Path::from_raw("test"))
|
||||
.list_with_delimiter(Some(&Path::from("test")))
|
||||
.await
|
||||
.expect("list should succeed");
|
||||
|
||||
|
|
@ -641,7 +654,7 @@ mod tests {
|
|||
|
||||
assert!(
|
||||
store
|
||||
.list_with_delimiter(&Path::from_raw("test"))
|
||||
.list_with_delimiter(Some(&Path::from("test")))
|
||||
.await
|
||||
.is_err(),
|
||||
"mock configured to fail"
|
||||
|
|
@ -662,7 +675,7 @@ mod tests {
|
|||
let store = ObjectStoreMetrics::new(store, time, &metrics);
|
||||
|
||||
store
|
||||
.get(&Path::from_raw("test"))
|
||||
.get(&Path::from("test"))
|
||||
.await
|
||||
.expect_err("mock configured to fail");
|
||||
|
||||
|
|
@ -676,12 +689,12 @@ mod tests {
|
|||
#[tokio::test]
|
||||
async fn test_put_get_delete_file() {
|
||||
let metrics = Arc::new(metric::Registry::default());
|
||||
let store = Arc::new(LocalFileSystem::new("./"));
|
||||
let store = Arc::new(LocalFileSystem::new_with_prefix("./").unwrap());
|
||||
let time = Arc::new(SystemProvider::new());
|
||||
let store = ObjectStoreMetrics::new(store, time, &metrics);
|
||||
|
||||
let data = [42_u8, 42, 42, 42, 42];
|
||||
let path = Path::from_raw("test");
|
||||
let path = Path::from("test");
|
||||
store
|
||||
.put(&path, Bytes::copy_from_slice(&data))
|
||||
.await
|
||||
|
|
@ -692,7 +705,6 @@ mod tests {
|
|||
GetResult::File(mut file, _) => {
|
||||
let mut contents = vec![];
|
||||
file.read_to_end(&mut contents)
|
||||
.await
|
||||
.expect("failed to read file data");
|
||||
assert_eq!(contents, &data);
|
||||
}
|
||||
|
|
@ -726,7 +738,7 @@ mod tests {
|
|||
let store = ObjectStoreMetrics::new(store, time, &metrics);
|
||||
|
||||
let data = [42_u8, 42, 42, 42, 42];
|
||||
let path = Path::from_raw("test");
|
||||
let path = Path::from("test");
|
||||
store
|
||||
.put(&path, Bytes::copy_from_slice(&data))
|
||||
.await
|
||||
|
|
|
|||
|
|
@ -14,7 +14,7 @@ datafusion_util = { path = "../datafusion_util" }
|
|||
futures = "0.3"
|
||||
generated_types = { path = "../generated_types" }
|
||||
iox_time = { path = "../iox_time" }
|
||||
object_store = "0.0.1"
|
||||
object_store = "0.3.0"
|
||||
observability_deps = { path = "../observability_deps" }
|
||||
parking_lot = "0.12"
|
||||
parquet = {version = "16.0.0", features = ["experimental"]}
|
||||
|
|
|
|||
|
|
@ -269,8 +269,9 @@ async fn download_and_scan_parquet(
|
|||
let read_stream = object_store.get(&path).await?;
|
||||
|
||||
let data = match read_stream {
|
||||
GetResult::File(mut f, _) => {
|
||||
GetResult::File(f, _) => {
|
||||
trace!(?path, "Using file directly");
|
||||
let mut f = tokio::fs::File::from_std(f);
|
||||
let l = f.metadata().await?.len();
|
||||
let mut buf = Vec::with_capacity(l as usize);
|
||||
f.read_to_end(&mut buf).await?;
|
||||
|
|
|
|||
|
|
@ -17,7 +17,7 @@ generated_types = { path = "../generated_types" }
|
|||
influxdb_iox_client = { path = "../influxdb_iox_client" }
|
||||
iox_catalog = { path = "../iox_catalog" }
|
||||
metric = { path = "../metric" }
|
||||
object_store = "0.0.1"
|
||||
object_store = "0.3.0"
|
||||
observability_deps = { path = "../observability_deps" }
|
||||
parking_lot = "0.12"
|
||||
parquet_file = { path = "../parquet_file" }
|
||||
|
|
|
|||
|
|
@ -20,7 +20,7 @@ metric = { path = "../metric" }
|
|||
mutable_batch = { path = "../mutable_batch" }
|
||||
mutable_batch_lp = { path = "../mutable_batch_lp" }
|
||||
mutable_batch_pb = { version = "0.1.0", path = "../mutable_batch_pb" }
|
||||
object_store = "0.0.1"
|
||||
object_store = "0.3.0"
|
||||
observability_deps = { path = "../observability_deps" }
|
||||
parking_lot = "0.12"
|
||||
predicate = { path = "../predicate" }
|
||||
|
|
|
|||
|
|
@ -8,7 +8,7 @@ data_types = { path = "../data_types" }
|
|||
futures = "0.3"
|
||||
generated_types = { path = "../generated_types" }
|
||||
iox_catalog = { path = "../iox_catalog" }
|
||||
object_store = "0.0.1"
|
||||
object_store = "0.3.0"
|
||||
observability_deps = { path = "../observability_deps" }
|
||||
parquet_file = { path = "../parquet_file" }
|
||||
tokio = { version = "1", features = ["rt-multi-thread", "macros"] }
|
||||
|
|
|
|||
|
|
@ -40,7 +40,7 @@ nom = { version = "7", features = ["alloc", "std"] }
|
|||
num-bigint = { version = "0.4", features = ["std"] }
|
||||
num-integer = { version = "0.1", default-features = false, features = ["i128", "std"] }
|
||||
num-traits = { version = "0.2", features = ["i128", "libm", "std"] }
|
||||
object_store = { version = "0.0.1", default-features = false, features = ["aws", "azure", "azure_core", "azure_storage", "azure_storage_blobs", "cloud-storage", "gcp", "hyper", "hyper-rustls", "reqwest", "rusoto_core", "rusoto_credential", "rusoto_s3"] }
|
||||
object_store = { version = "0.3", default-features = false, features = ["aws", "azure", "azure_core", "azure_storage", "azure_storage_blobs", "cloud-storage", "gcp", "hyper", "hyper-rustls", "reqwest", "rusoto_core", "rusoto_credential", "rusoto_s3"] }
|
||||
once_cell = { version = "1", features = ["alloc", "parking_lot", "parking_lot_core", "race", "std"] }
|
||||
parquet = { version = "16", features = ["arrow", "base64", "brotli", "experimental", "flate2", "lz4", "snap", "zstd"] }
|
||||
predicates = { version = "2", features = ["diff", "difflib", "float-cmp", "normalize-line-endings", "regex"] }
|
||||
|
|
@ -59,7 +59,7 @@ sqlx = { version = "0.6", features = ["_rt-tokio", "json", "macros", "migrate",
|
|||
sqlx-core = { version = "0.6", default-features = false, features = ["_rt-tokio", "_tls-rustls", "any", "base64", "crc", "dirs", "hkdf", "hmac", "json", "md-5", "migrate", "postgres", "rand", "runtime-tokio-rustls", "rustls", "rustls-pemfile", "serde", "serde_json", "sha-1", "sha2", "tokio-stream", "uuid", "webpki-roots", "whoami"] }
|
||||
tokio = { version = "1", features = ["bytes", "fs", "io-std", "io-util", "libc", "macros", "memchr", "mio", "net", "num_cpus", "once_cell", "parking_lot", "rt", "rt-multi-thread", "signal", "signal-hook-registry", "socket2", "sync", "time", "tokio-macros", "tracing"] }
|
||||
tokio-stream = { version = "0.1", features = ["fs", "net", "time"] }
|
||||
tokio-util = { version = "0.7", features = ["codec", "io", "tracing"] }
|
||||
tokio-util = { version = "0.7", features = ["codec", "tracing"] }
|
||||
tower = { version = "0.4", features = ["__common", "balance", "buffer", "discover", "futures-core", "futures-util", "indexmap", "limit", "load", "log", "make", "pin-project", "pin-project-lite", "rand", "ready-cache", "slab", "timeout", "tokio", "tokio-util", "tracing", "util"] }
|
||||
tower-http = { version = "0.3", features = ["catch-panic", "map-response-body", "tower", "tracing", "util"] }
|
||||
tracing = { version = "0.1", features = ["attributes", "log", "max_level_trace", "release_max_level_trace", "std", "tracing-attributes"] }
|
||||
|
|
|
|||
Loading…
Reference in New Issue