fix: ensure "local" ObjectStore.list returns ordered elements (#26081)

cli/allow-equal-sign-in-trigger-arguments
wayne 2025-02-28 10:46:44 -07:00 committed by GitHub
parent 208b82a32d
commit a84c4a9a8b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 197 additions and 7 deletions

1
Cargo.lock generated
View File

@ -2885,6 +2885,7 @@ name = "influxdb3_clap_blocks"
version = "0.1.0"
dependencies = [
"async-trait",
"bytes",
"clap",
"datafusion",
"futures",

View File

@ -32,6 +32,8 @@ tokio.workspace = true
trace_exporters.workspace = true
trogging.workspace = true
url.workspace = true
bytes.workspace = true
futures.workspace = true
[dev-dependencies]
tempfile.workspace = true

View File

@ -1,9 +1,12 @@
//! CLI handling for object store config (via CLI arguments and environment variables).
use async_trait::async_trait;
use bytes::Bytes;
use futures::{StreamExt, stream::BoxStream};
use non_empty_string::NonEmptyString;
use object_store::{
DynObjectStore,
DynObjectStore, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore,
PutMultipartOpts, PutOptions, PutPayload, PutResult,
local::LocalFileSystem,
memory::InMemory,
path::Path,
@ -11,7 +14,10 @@ use object_store::{
};
use observability_deps::tracing::{info, warn};
use snafu::{ResultExt, Snafu};
use std::{convert::Infallible, fs, num::NonZeroUsize, path::PathBuf, sync::Arc, time::Duration};
use std::{
cmp::Ordering, convert::Infallible, fs, num::NonZeroUsize, ops::Range, path::PathBuf,
sync::Arc, time::Duration,
};
use url::Url;
#[derive(Debug, Snafu)]
@ -97,6 +103,188 @@ impl std::str::FromStr for Endpoint {
}
}
#[derive(Debug)]
struct LocalFileSystemWithSortedListOp {
inner: Arc<LocalFileSystem>,
}
impl std::fmt::Display for LocalFileSystemWithSortedListOp {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.inner.fmt(f)
}
}
impl LocalFileSystemWithSortedListOp {
fn new_with_prefix(prefix: impl AsRef<std::path::Path>) -> Result<Self, ParseError> {
Ok(Self {
inner: Arc::new(LocalFileSystem::new_with_prefix(prefix.as_ref()).context(
CreateLocalFileSystemSnafu {
path: prefix.as_ref().to_path_buf(),
},
)?),
})
}
}
#[async_trait]
impl ObjectStore for LocalFileSystemWithSortedListOp {
async fn put(&self, location: &Path, bytes: PutPayload) -> object_store::Result<PutResult> {
self.inner.put(location, bytes).await
}
async fn put_opts(
&self,
location: &Path,
bytes: PutPayload,
opts: PutOptions,
) -> object_store::Result<PutResult> {
self.inner.put_opts(location, bytes, opts).await
}
async fn put_multipart(
&self,
location: &Path,
) -> object_store::Result<Box<dyn MultipartUpload>> {
self.inner.put_multipart(location).await
}
async fn put_multipart_opts(
&self,
location: &Path,
opts: PutMultipartOpts,
) -> object_store::Result<Box<dyn MultipartUpload>> {
self.inner.put_multipart_opts(location, opts).await
}
async fn get(&self, location: &Path) -> object_store::Result<GetResult> {
self.inner.get(location).await
}
async fn get_opts(
&self,
location: &Path,
options: GetOptions,
) -> object_store::Result<GetResult> {
self.inner.get_opts(location, options).await
}
async fn get_range(&self, location: &Path, range: Range<usize>) -> object_store::Result<Bytes> {
self.inner.get_range(location, range).await
}
async fn get_ranges(
&self,
location: &Path,
ranges: &[Range<usize>],
) -> object_store::Result<Vec<Bytes>> {
self.inner.get_ranges(location, ranges).await
}
async fn head(&self, location: &Path) -> object_store::Result<ObjectMeta> {
self.inner.head(location).await
}
async fn delete(&self, location: &Path) -> object_store::Result<()> {
self.inner.delete(location).await
}
fn delete_stream<'a>(
&'a self,
locations: BoxStream<'a, object_store::Result<Path>>,
) -> BoxStream<'a, object_store::Result<Path>> {
self.inner.delete_stream(locations)
}
/// Collect results from inner object store into a vec, sort them, then return a new boxed
/// stream that iterates over the new vec.
fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, object_store::Result<ObjectMeta>> {
if tokio::runtime::Handle::try_current().is_err() {
// We should never reach this branch, but if we do then warn and return let the
// inner implementation deal with it.
warn!("no tokio runtime started, cannot sort object store list output");
return self.inner.list(prefix);
}
let mut items: Vec<Result<ObjectMeta, _>> = futures::executor::block_on(async {
// we could use TryStreamExt.collect() here to drop all collected results and
// return the first error we encounter, but users of the ObjectStore API will
// probably expect to have to deal with errors one element at a time anyway
self.inner.list(prefix).collect().await
});
items.sort_unstable_by(|left, right| match (left, right) {
(Ok(left_meta), Ok(right_meta)) => left_meta.location.cmp(&right_meta.location),
// basically just move all the Err(E) instances to the end of the results.
(Err(_), Ok(_)) => Ordering::Less,
(Ok(_), Err(_)) => Ordering::Greater,
(Err(_), Err(_)) => Ordering::Equal,
});
futures::stream::iter(items).boxed()
}
/// Collect results from inner object store into a vec, sort them, then return a new boxed
/// stream that iterates over the new vec.
fn list_with_offset(
&self,
prefix: Option<&Path>,
offset: &Path,
) -> BoxStream<'_, object_store::Result<ObjectMeta>> {
if tokio::runtime::Handle::try_current().is_err() {
// We should never reach this branch, but if we do then warn and return let the
// inner implementation deal with it.
warn!("no tokio runtime started, cannot sort object store list output");
return self.inner.list_with_offset(prefix, offset);
}
let mut items: Vec<Result<ObjectMeta, _>> = futures::executor::block_on(async {
// we could use TryStreamExt.collect() here to drop all collected results and
// return the first error we encounter, but users of the ObjectStore API will
// probably expect to have to deal with errors one element at a time anyway
self.inner.list_with_offset(prefix, offset).collect().await
});
items.sort_unstable_by(|left, right| match (left, right) {
(Ok(left_meta), Ok(right_meta)) => left_meta.location.cmp(&right_meta.location),
// basically just move all the Err(E) instances to the end of the results.
(Err(_), Ok(_)) => Ordering::Less,
(Ok(_), Err(_)) => Ordering::Greater,
(Err(_), Err(_)) => Ordering::Equal,
});
futures::stream::iter(items).boxed()
}
/// Collect results from inner object store into a vec, sort them, then return a new boxed
/// stream that iterates over the new vec.
async fn list_with_delimiter(&self, prefix: Option<&Path>) -> object_store::Result<ListResult> {
let mut items: ListResult = self.inner.list_with_delimiter(prefix).await?;
items
.objects
.sort_unstable_by(|left, right| left.location.cmp(&right.location));
items.common_prefixes.sort();
Ok(items)
}
async fn copy(&self, from: &Path, to: &Path) -> object_store::Result<()> {
self.inner.copy(from, to).await
}
async fn rename(&self, from: &Path, to: &Path) -> object_store::Result<()> {
self.inner.rename(from, to).await
}
async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> object_store::Result<()> {
self.inner.copy_if_not_exists(from, to).await
}
async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> object_store::Result<()> {
self.inner.rename_if_not_exists(from, to).await
}
}
/// Creation of an `ObjectStoreConfig` struct for `clap` argument handling.
///
/// This allows for multiple object store configurations to be produced when
@ -702,14 +890,13 @@ macro_rules! object_store_config_inner {
Ok(remote_store)
}
fn new_local_file_system(&self) -> Result<Arc<LocalFileSystem>, ParseError> {
fn new_local_file_system(&self) -> Result<Arc<LocalFileSystemWithSortedListOp>, ParseError> {
match self.database_directory.as_ref() {
Some(db_dir) => {
info!(?db_dir, object_store_type = "Directory", "Object Store");
fs::create_dir_all(db_dir).context(CreatingDatabaseDirectorySnafu { path: db_dir })?;
let store = LocalFileSystem::new_with_prefix(db_dir)
.context(CreateLocalFileSystemSnafu { path: db_dir })?;
let store = LocalFileSystemWithSortedListOp::new_with_prefix(db_dir)?;
Ok(Arc::new(store))
}
None => MissingObjectStoreConfigSnafu {
@ -799,7 +986,7 @@ pub fn make_presigned_url_signer(
/// Again, will not work and not intended to work in production, but is useful in local testing.
#[derive(Debug)]
pub struct LocalUploadSigner {
inner: Arc<LocalFileSystem>,
inner: Arc<LocalFileSystemWithSortedListOp>,
}
impl LocalUploadSigner {
@ -818,7 +1005,7 @@ impl object_store::signer::Signer for LocalUploadSigner {
path: &Path,
_expires_in: Duration,
) -> Result<Url, object_store::Error> {
self.inner.path_to_filesystem(path).and_then(|path| {
self.inner.inner.path_to_filesystem(path).and_then(|path| {
Url::from_file_path(&path).map_err(|_| object_store::Error::InvalidPath {
source: object_store::path::Error::InvalidPath { path },
})