Merge pull request #629 from influxdata/cn/better-osp-api
commit
0c0fa42050
|
@ -200,9 +200,8 @@ impl AmazonS3 {
|
|||
prefix: &'a ObjectStorePath,
|
||||
next_token: &Option<String>,
|
||||
) -> Result<ListResult> {
|
||||
dbg!(&prefix);
|
||||
let converted_prefix = CloudConverter::convert(prefix);
|
||||
dbg!(&converted_prefix);
|
||||
|
||||
let mut list_request = rusoto_s3::ListObjectsV2Request {
|
||||
bucket: self.bucket_name.clone(),
|
||||
prefix: Some(converted_prefix),
|
||||
|
@ -225,7 +224,7 @@ impl AmazonS3 {
|
|||
};
|
||||
|
||||
let contents = resp.contents.unwrap_or_default();
|
||||
dbg!(&contents);
|
||||
|
||||
let objects: Vec<_> = contents
|
||||
.into_iter()
|
||||
.map(|object| {
|
||||
|
@ -256,8 +255,6 @@ impl AmazonS3 {
|
|||
})
|
||||
.collect();
|
||||
|
||||
dbg!(&resp.common_prefixes);
|
||||
|
||||
let common_prefixes = resp
|
||||
.common_prefixes
|
||||
.unwrap_or_default()
|
||||
|
|
|
@ -124,7 +124,7 @@ impl File {
|
|||
async move { Ok(ObjectStorePath::from_path_buf_unchecked(file_path_buf)) }
|
||||
})
|
||||
.try_filter(move |name| {
|
||||
let matches = prefix.map_or(true, |p| name.starts_with(p));
|
||||
let matches = prefix.map_or(true, |p| name.prefix_matches(p));
|
||||
async move { matches }
|
||||
})
|
||||
.map_ok(|name| vec![name]);
|
||||
|
@ -181,7 +181,7 @@ mod tests {
|
|||
|
||||
let data = Bytes::from("arbitrary data");
|
||||
let mut location = ObjectStorePath::default();
|
||||
location.push_all(&["nested", "file", "test_file"]);
|
||||
location.push_all_dirs(&["nested", "file", "test_file"]);
|
||||
|
||||
let stream_data = std::io::Result::Ok(data.clone());
|
||||
storage
|
||||
|
|
|
@ -377,7 +377,8 @@ mod tests {
|
|||
|
||||
let data = Bytes::from("arbitrary data");
|
||||
let mut location = ObjectStorePath::default();
|
||||
location.push("test_file");
|
||||
location.push_dir("test_dir");
|
||||
location.set_file_name("test_file.json");
|
||||
|
||||
let stream_data = std::io::Result::Ok(data.clone());
|
||||
storage
|
||||
|
@ -394,13 +395,13 @@ mod tests {
|
|||
|
||||
// List everything starting with a prefix that should return results
|
||||
let mut prefix = ObjectStorePath::default();
|
||||
prefix.push("test");
|
||||
prefix.push_dir("test_dir");
|
||||
let content_list = flatten_list_stream(storage, Some(&prefix)).await?;
|
||||
assert_eq!(content_list, &[location.clone()]);
|
||||
|
||||
// List everything starting with a prefix that shouldn't return results
|
||||
let mut prefix = ObjectStorePath::default();
|
||||
prefix.push("something");
|
||||
prefix.push_dir("something");
|
||||
let content_list = flatten_list_stream(storage, Some(&prefix)).await?;
|
||||
assert!(content_list.is_empty());
|
||||
|
||||
|
@ -431,6 +432,7 @@ mod tests {
|
|||
let files: Vec<_> = [
|
||||
"mydb/wal/000/000/000.segment",
|
||||
"mydb/wal/000/000/001.segment",
|
||||
"mydb/wal/000/000/002.segment",
|
||||
"mydb/wal/001/001/000.segment",
|
||||
"mydb/wal/foo.test",
|
||||
"mydb/data/whatevs",
|
||||
|
@ -454,17 +456,15 @@ mod tests {
|
|||
}
|
||||
|
||||
let mut prefix = ObjectStorePath::default();
|
||||
prefix.push_all(&["mydb", "wal"]);
|
||||
prefix.push_all_dirs(&["mydb", "wal"]);
|
||||
|
||||
let mut expected_000 = prefix.clone();
|
||||
expected_000.push("000");
|
||||
expected_000.push_dir("000");
|
||||
let mut expected_001 = prefix.clone();
|
||||
expected_001.push("001");
|
||||
expected_001.push_dir("001");
|
||||
let mut expected_location = prefix.clone();
|
||||
expected_location.push("foo.test");
|
||||
expected_location.set_file_name("foo.test");
|
||||
|
||||
// This is needed because we want a trailing slash on the prefix in this test
|
||||
prefix.push("");
|
||||
let result = storage.list_with_delimiter(&prefix).await.unwrap();
|
||||
|
||||
assert_eq!(result.common_prefixes, vec![expected_000, expected_001]);
|
||||
|
@ -476,6 +476,23 @@ mod tests {
|
|||
assert_eq!(object.size, data.len());
|
||||
assert!(object.last_modified > time_before_creation);
|
||||
|
||||
// List with a prefix containing a partial "file name"
|
||||
let mut prefix = ObjectStorePath::default();
|
||||
prefix.push_all_dirs(&["mydb", "wal", "000", "000"]);
|
||||
prefix.set_file_name("001");
|
||||
|
||||
let mut expected_location = ObjectStorePath::default();
|
||||
expected_location.push_all_dirs(&["mydb", "wal", "000", "000"]);
|
||||
expected_location.set_file_name("001.segment");
|
||||
|
||||
let result = storage.list_with_delimiter(&prefix).await.unwrap();
|
||||
assert!(result.common_prefixes.is_empty());
|
||||
assert_eq!(result.objects.len(), 1);
|
||||
|
||||
let object = &result.objects[0];
|
||||
|
||||
assert_eq!(object.location, expected_location);
|
||||
|
||||
for f in &files {
|
||||
storage.delete(f).await.unwrap();
|
||||
}
|
||||
|
@ -510,6 +527,7 @@ mod tests {
|
|||
"test_file",
|
||||
"mydb/wal/000/000/000.segment",
|
||||
"mydb/wal/000/000/001.segment",
|
||||
"mydb/wal/000/000/002.segment",
|
||||
"mydb/wal/001/001/000.segment",
|
||||
"mydb/wal/foo.test",
|
||||
"mydb/data/whatevs",
|
||||
|
|
|
@ -1,7 +1,8 @@
|
|||
//! This module contains the IOx implementation for using memory as the object
|
||||
//! store.
|
||||
use crate::{
|
||||
path::ObjectStorePath, DataDoesNotMatchLength, ListResult, NoDataInMemory, ObjectMeta, Result,
|
||||
path::{DirsAndFileName, ObjectStorePath},
|
||||
DataDoesNotMatchLength, ListResult, NoDataInMemory, ObjectMeta, Result,
|
||||
UnableToPutDataInMemory,
|
||||
};
|
||||
use bytes::Bytes;
|
||||
|
@ -16,7 +17,7 @@ use tokio::sync::RwLock;
|
|||
/// storage provider.
|
||||
#[derive(Debug, Default)]
|
||||
pub struct InMemory {
|
||||
storage: RwLock<BTreeMap<ObjectStorePath, Bytes>>,
|
||||
storage: RwLock<BTreeMap<DirsAndFileName, Bytes>>,
|
||||
}
|
||||
|
||||
impl InMemory {
|
||||
|
@ -56,7 +57,7 @@ impl InMemory {
|
|||
|
||||
let content = content.freeze();
|
||||
|
||||
self.storage.write().await.insert(location.clone(), content);
|
||||
self.storage.write().await.insert(location.into(), content);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
@ -65,11 +66,12 @@ impl InMemory {
|
|||
&self,
|
||||
location: &ObjectStorePath,
|
||||
) -> Result<impl Stream<Item = Result<Bytes>>> {
|
||||
let location = location.into();
|
||||
let data = self
|
||||
.storage
|
||||
.read()
|
||||
.await
|
||||
.get(location)
|
||||
.get(&location)
|
||||
.cloned()
|
||||
.context(NoDataInMemory)?;
|
||||
|
||||
|
@ -78,7 +80,7 @@ impl InMemory {
|
|||
|
||||
/// Delete the object at the specified location.
|
||||
pub async fn delete(&self, location: &ObjectStorePath) -> Result<()> {
|
||||
self.storage.write().await.remove(location);
|
||||
self.storage.write().await.remove(&location.into());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
@ -87,16 +89,18 @@ impl InMemory {
|
|||
&'a self,
|
||||
prefix: Option<&'a ObjectStorePath>,
|
||||
) -> Result<impl Stream<Item = Result<Vec<ObjectStorePath>>> + 'a> {
|
||||
let list = if let Some(prefix) = prefix {
|
||||
let prefix = prefix.map(Into::into);
|
||||
|
||||
let list = if let Some(prefix) = &prefix {
|
||||
self.storage
|
||||
.read()
|
||||
.await
|
||||
.keys()
|
||||
.filter(|k| k.starts_with(prefix))
|
||||
.cloned()
|
||||
.filter(|k| k.prefix_matches(prefix))
|
||||
.map(Into::into)
|
||||
.collect()
|
||||
} else {
|
||||
self.storage.read().await.keys().cloned().collect()
|
||||
self.storage.read().await.keys().map(Into::into).collect()
|
||||
};
|
||||
|
||||
Ok(futures::stream::once(async move { Ok(list) }))
|
||||
|
@ -115,25 +119,29 @@ impl InMemory {
|
|||
let mut common_prefixes = BTreeSet::new();
|
||||
let last_modified = Utc::now();
|
||||
|
||||
// set the end prefix so we pull back everything that starts with
|
||||
// the passed in prefix
|
||||
let mut end_prefix = prefix.clone();
|
||||
end_prefix.pop();
|
||||
end_prefix.push("0");
|
||||
let prefix: DirsAndFileName = prefix.into();
|
||||
|
||||
// Only objects in this base level should be returned in the
|
||||
// response. Otherwise, we just collect the common prefixes.
|
||||
let mut objects = vec![];
|
||||
for (k, v) in self.storage.read().await.range(prefix.clone()..end_prefix) {
|
||||
let parts = k.parts_after_prefix(&prefix);
|
||||
for (k, v) in self
|
||||
.storage
|
||||
.read()
|
||||
.await
|
||||
.range((&prefix)..)
|
||||
.take_while(|(k, _)| k.prefix_matches(&prefix))
|
||||
{
|
||||
let parts = k
|
||||
.parts_after_prefix(&prefix)
|
||||
.expect("must have prefix if in range");
|
||||
|
||||
if parts.len() >= 2 {
|
||||
let mut full_prefix = prefix.clone();
|
||||
full_prefix.push_part(&parts[0]);
|
||||
full_prefix.push_part_as_dir(&parts[0]);
|
||||
common_prefixes.insert(full_prefix);
|
||||
} else {
|
||||
let object = ObjectMeta {
|
||||
location: k.clone(),
|
||||
location: k.into(),
|
||||
last_modified,
|
||||
size: v.len(),
|
||||
};
|
||||
|
@ -143,7 +151,7 @@ impl InMemory {
|
|||
|
||||
Ok(ListResult {
|
||||
objects,
|
||||
common_prefixes: common_prefixes.into_iter().collect(),
|
||||
common_prefixes: common_prefixes.into_iter().map(Into::into).collect(),
|
||||
next_token: None,
|
||||
})
|
||||
}
|
||||
|
@ -163,7 +171,6 @@ mod tests {
|
|||
use futures::stream;
|
||||
|
||||
#[tokio::test]
|
||||
#[ignore]
|
||||
async fn in_memory_test() -> Result<()> {
|
||||
let integration = ObjectStore::new_in_memory(InMemory::new());
|
||||
|
||||
|
|
|
@ -3,7 +3,7 @@
|
|||
|
||||
use itertools::Itertools;
|
||||
use percent_encoding::{percent_decode_str, percent_encode, AsciiSet, CONTROLS};
|
||||
use std::path::PathBuf;
|
||||
use std::{mem, path::PathBuf};
|
||||
|
||||
/// Universal interface for handling paths and locations for objects and
|
||||
/// directories in the object store.
|
||||
|
@ -13,69 +13,62 @@ use std::path::PathBuf;
|
|||
///
|
||||
/// Deliberately does not implement `Display` or `ToString`! Use one of the
|
||||
/// converters.
|
||||
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug, Default)]
|
||||
#[derive(Default, Clone, PartialEq, Eq, Debug)]
|
||||
pub struct ObjectStorePath {
|
||||
parts: Vec<PathPart>,
|
||||
inner: PathRepresentation,
|
||||
}
|
||||
|
||||
impl ObjectStorePath {
|
||||
/// For use when receiving a path from an object store API directly, not
|
||||
/// when building a path. Assumes DELIMITER is the separator.
|
||||
///
|
||||
/// TODO: Improve performance by implementing a CoW-type model to delay
|
||||
/// parsing until needed TODO: This should only be available to cloud
|
||||
/// storage
|
||||
/// TODO: This should only be available to cloud storage
|
||||
pub fn from_cloud_unchecked(path: impl Into<String>) -> Self {
|
||||
let path = path.into();
|
||||
Self {
|
||||
parts: path
|
||||
.split_terminator(DELIMITER)
|
||||
.map(|s| PathPart(s.to_string()))
|
||||
.collect(),
|
||||
inner: PathRepresentation::RawCloud(path),
|
||||
}
|
||||
}
|
||||
|
||||
/// For use when receiving a path from a filesystem directly, not
|
||||
/// when building a path. Uses the standard library's path splitting
|
||||
/// implementation to separate into parts.
|
||||
///
|
||||
/// TODO: This should only be available to file storage
|
||||
pub fn from_path_buf_unchecked(path: impl Into<PathBuf>) -> Self {
|
||||
let path = path.into();
|
||||
Self {
|
||||
parts: path
|
||||
.iter()
|
||||
.flat_map(|s| s.to_os_string().into_string().map(PathPart))
|
||||
.collect(),
|
||||
inner: PathRepresentation::RawPathBuf(path),
|
||||
}
|
||||
}
|
||||
|
||||
/// Add a part to the end of the path, encoding any restricted characters.
|
||||
pub fn push(&mut self, part: impl Into<String>) {
|
||||
let part = part.into();
|
||||
self.parts.push((&*part).into());
|
||||
pub fn push_dir(&mut self, part: impl Into<String>) {
|
||||
self.inner = mem::take(&mut self.inner).push_dir(part);
|
||||
}
|
||||
|
||||
/// Add a `PathPart` to the end of the path. Infallible because the
|
||||
/// `PathPart` should already have been checked for restricted
|
||||
/// characters.
|
||||
pub fn push_part(&mut self, part: &PathPart) {
|
||||
self.parts.push(part.to_owned());
|
||||
/// Add a `PathPart` to the end of the path.
|
||||
pub fn push_part_as_dir(&mut self, part: &PathPart) {
|
||||
self.inner = mem::take(&mut self.inner).push_part_as_dir(part);
|
||||
}
|
||||
|
||||
/// Set the file name of this path
|
||||
pub fn set_file_name(&mut self, part: impl Into<String>) {
|
||||
self.inner = mem::take(&mut self.inner).set_file_name(part);
|
||||
}
|
||||
|
||||
/// Add the parts of `ObjectStorePath` to the end of the path. Notably does
|
||||
/// *not* behave as `PathBuf::push` does: no existing part of `self`
|
||||
/// will be replaced as part of this call.
|
||||
/// *not* behave as `PathBuf::push` does: there is no way to replace the
|
||||
/// root. If `self` has a file name, that will be removed, then the
|
||||
/// directories of `path` will be appended, then any file name of `path`
|
||||
/// will be assigned to `self`.
|
||||
pub fn push_path(&mut self, path: &Self) {
|
||||
self.parts.extend_from_slice(&path.parts);
|
||||
self.inner = mem::take(&mut self.inner).push_path(path)
|
||||
}
|
||||
|
||||
/// Push a bunch of parts in one go.
|
||||
pub fn push_all<'a>(&mut self, parts: impl AsRef<[&'a str]>) {
|
||||
self.parts.extend(parts.as_ref().iter().map(|&v| v.into()));
|
||||
}
|
||||
|
||||
/// Return the component parts of the path.
|
||||
pub fn as_parts(&self) -> &[PathPart] {
|
||||
self.parts.as_ref()
|
||||
/// Push a bunch of parts as directories in one go.
|
||||
pub fn push_all_dirs<'a>(&mut self, parts: impl AsRef<[&'a str]>) {
|
||||
self.inner = mem::take(&mut self.inner).push_all_dirs(parts);
|
||||
}
|
||||
|
||||
/// Pops a part from the path and returns it, or `None` if it's empty.
|
||||
|
@ -83,16 +76,170 @@ impl ObjectStorePath {
|
|||
unimplemented!()
|
||||
}
|
||||
|
||||
/// Determines whether `prefix` is a prefix of `self`.
|
||||
pub fn starts_with(&self, prefix: &Self) -> bool {
|
||||
let diff = itertools::diff_with(self.parts.iter(), prefix.parts.iter(), |a, b| a == b);
|
||||
match diff {
|
||||
None => true,
|
||||
Some(itertools::Diff::Shorter(..)) => true,
|
||||
Some(itertools::Diff::FirstMismatch(_, mut remaining_self, mut remaining_prefix)) => {
|
||||
let first_prefix = remaining_prefix.next().expect("must be at least one value");
|
||||
/// Returns true if the directories in `prefix` are the same as the starting
|
||||
/// directories of `self`.
|
||||
pub fn prefix_matches(&self, prefix: &Self) -> bool {
|
||||
use PathRepresentation::*;
|
||||
match (&self.inner, &prefix.inner) {
|
||||
(Parts(self_parts), Parts(other_parts)) => self_parts.prefix_matches(&other_parts),
|
||||
(Parts(self_parts), _) => {
|
||||
let prefix_parts: DirsAndFileName = prefix.into();
|
||||
self_parts.prefix_matches(&prefix_parts)
|
||||
}
|
||||
(_, Parts(prefix_parts)) => {
|
||||
let self_parts: DirsAndFileName = self.into();
|
||||
self_parts.prefix_matches(&prefix_parts)
|
||||
}
|
||||
_ => {
|
||||
let self_parts: DirsAndFileName = self.into();
|
||||
let prefix_parts: DirsAndFileName = prefix.into();
|
||||
self_parts.prefix_matches(&prefix_parts)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// there must not be any other remaining parts in the prefix
|
||||
impl From<&'_ DirsAndFileName> for ObjectStorePath {
|
||||
fn from(other: &'_ DirsAndFileName) -> Self {
|
||||
other.clone().into()
|
||||
}
|
||||
}
|
||||
|
||||
impl From<DirsAndFileName> for ObjectStorePath {
|
||||
fn from(other: DirsAndFileName) -> Self {
|
||||
Self {
|
||||
inner: PathRepresentation::Parts(other),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Eq, Debug)]
|
||||
enum PathRepresentation {
|
||||
RawCloud(String),
|
||||
RawPathBuf(PathBuf),
|
||||
Parts(DirsAndFileName),
|
||||
}
|
||||
|
||||
impl Default for PathRepresentation {
|
||||
fn default() -> Self {
|
||||
Self::Parts(DirsAndFileName::default())
|
||||
}
|
||||
}
|
||||
|
||||
impl PathRepresentation {
|
||||
/// Add a part to the end of the path's directories, encoding any restricted
|
||||
/// characters.
|
||||
fn push_dir(self, part: impl Into<String>) -> Self {
|
||||
let mut dirs_and_file_name: DirsAndFileName = self.into();
|
||||
|
||||
dirs_and_file_name.push_dir(part);
|
||||
Self::Parts(dirs_and_file_name)
|
||||
}
|
||||
|
||||
/// Push a bunch of parts as directories in one go.
|
||||
fn push_all_dirs<'a>(self, parts: impl AsRef<[&'a str]>) -> Self {
|
||||
let mut dirs_and_file_name: DirsAndFileName = self.into();
|
||||
|
||||
dirs_and_file_name.push_all_dirs(parts);
|
||||
|
||||
Self::Parts(dirs_and_file_name)
|
||||
}
|
||||
|
||||
/// Add a `PathPart` to the end of the path's directories.
|
||||
fn push_part_as_dir(self, part: &PathPart) -> Self {
|
||||
let mut dirs_and_file_name: DirsAndFileName = self.into();
|
||||
|
||||
dirs_and_file_name.push_part_as_dir(part);
|
||||
Self::Parts(dirs_and_file_name)
|
||||
}
|
||||
|
||||
/// Add the parts of `ObjectStorePath` to the end of the path. Notably does
|
||||
/// *not* behave as `PathBuf::push` does: there is no way to replace the
|
||||
/// root. If `self` has a file name, that will be removed, then the
|
||||
/// directories of `path` will be appended, then any file name of `path`
|
||||
/// will be assigned to `self`.
|
||||
fn push_path(self, path: &ObjectStorePath) -> Self {
|
||||
let DirsAndFileName {
|
||||
directories: path_dirs,
|
||||
file_name: path_file_name,
|
||||
} = path.inner.to_owned().into();
|
||||
let mut dirs_and_file_name: DirsAndFileName = self.into();
|
||||
|
||||
dirs_and_file_name.directories.extend(path_dirs);
|
||||
dirs_and_file_name.file_name = path_file_name;
|
||||
|
||||
Self::Parts(dirs_and_file_name)
|
||||
}
|
||||
|
||||
/// Set the file name of this path
|
||||
fn set_file_name(self, part: impl Into<String>) -> Self {
|
||||
let part = part.into();
|
||||
let mut dirs_and_file_name: DirsAndFileName = self.into();
|
||||
|
||||
dirs_and_file_name.file_name = Some((&*part).into());
|
||||
Self::Parts(dirs_and_file_name)
|
||||
}
|
||||
}
|
||||
|
||||
impl PartialEq for PathRepresentation {
|
||||
fn eq(&self, other: &Self) -> bool {
|
||||
use PathRepresentation::*;
|
||||
match (self, other) {
|
||||
(Parts(self_parts), Parts(other_parts)) => self_parts == other_parts,
|
||||
(Parts(self_parts), _) => {
|
||||
let other_parts: DirsAndFileName = other.to_owned().into();
|
||||
*self_parts == other_parts
|
||||
}
|
||||
(_, Parts(other_parts)) => {
|
||||
let self_parts: DirsAndFileName = self.to_owned().into();
|
||||
self_parts == *other_parts
|
||||
}
|
||||
_ => {
|
||||
let self_parts: DirsAndFileName = self.to_owned().into();
|
||||
let other_parts: DirsAndFileName = other.to_owned().into();
|
||||
self_parts == other_parts
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug, Default)]
|
||||
pub(crate) struct DirsAndFileName {
|
||||
directories: Vec<PathPart>,
|
||||
file_name: Option<PathPart>,
|
||||
}
|
||||
|
||||
impl DirsAndFileName {
|
||||
pub(crate) fn prefix_matches(&self, prefix: &Self) -> bool {
|
||||
let diff = itertools::diff_with(
|
||||
self.directories.iter(),
|
||||
prefix.directories.iter(),
|
||||
|a, b| a == b,
|
||||
);
|
||||
|
||||
use itertools::Diff;
|
||||
match diff {
|
||||
None => match (self.file_name.as_ref(), prefix.file_name.as_ref()) {
|
||||
(Some(self_file), Some(prefix_file)) => self_file.0.starts_with(&prefix_file.0),
|
||||
(Some(_self_file), None) => true,
|
||||
(None, Some(_prefix_file)) => false,
|
||||
(None, None) => true,
|
||||
},
|
||||
Some(Diff::Shorter(_, mut remaining_self)) => {
|
||||
let next_dir = remaining_self
|
||||
.next()
|
||||
.expect("must have at least one mismatch to be in this case");
|
||||
match prefix.file_name.as_ref() {
|
||||
Some(prefix_file) => next_dir.0.starts_with(&prefix_file.0),
|
||||
None => true,
|
||||
}
|
||||
}
|
||||
Some(Diff::FirstMismatch(_, mut remaining_self, mut remaining_prefix)) => {
|
||||
let first_prefix = remaining_prefix
|
||||
.next()
|
||||
.expect("must have at least one mismatch to be in this case");
|
||||
|
||||
// There must not be any other remaining parts in the prefix
|
||||
remaining_prefix.next().is_none()
|
||||
// and the next item in self must start with the last item in the prefix
|
||||
&& remaining_self
|
||||
|
@ -105,9 +252,110 @@ impl ObjectStorePath {
|
|||
}
|
||||
}
|
||||
|
||||
/// Returns delimiter-separated parts contained in `self` after `prefix`.
|
||||
pub fn parts_after_prefix(&self, _prefix: &Self) -> &[PathPart] {
|
||||
unimplemented!()
|
||||
/// Returns all directory and file name `PathParts` in `self` after the
|
||||
/// specified `prefix`. Ignores any `file_name` part of `prefix`.
|
||||
/// Returns `None` if `self` dosen't start with `prefix`.
|
||||
pub(crate) fn parts_after_prefix(&self, prefix: &Self) -> Option<Vec<PathPart>> {
|
||||
let mut dirs_iter = self.directories.iter();
|
||||
let mut prefix_dirs_iter = prefix.directories.iter();
|
||||
|
||||
let mut parts = vec![];
|
||||
|
||||
for dir in &mut dirs_iter {
|
||||
let pre = prefix_dirs_iter.next();
|
||||
|
||||
match pre {
|
||||
None => {
|
||||
parts.push(dir.to_owned());
|
||||
break;
|
||||
}
|
||||
Some(p) if p == dir => continue,
|
||||
Some(_) => return None,
|
||||
}
|
||||
}
|
||||
|
||||
parts.extend(dirs_iter.cloned());
|
||||
|
||||
if let Some(file_name) = &self.file_name {
|
||||
parts.push(file_name.to_owned());
|
||||
}
|
||||
|
||||
Some(parts)
|
||||
}
|
||||
|
||||
/// Add a part to the end of the path's directories, encoding any restricted
|
||||
/// characters.
|
||||
fn push_dir(&mut self, part: impl Into<String>) {
|
||||
let part = part.into();
|
||||
self.directories.push((&*part).into());
|
||||
}
|
||||
|
||||
/// Push a bunch of parts as directories in one go.
|
||||
fn push_all_dirs<'a>(&mut self, parts: impl AsRef<[&'a str]>) {
|
||||
self.directories
|
||||
.extend(parts.as_ref().iter().map(|&v| v.into()));
|
||||
}
|
||||
|
||||
/// Add a `PathPart` to the end of the path's directories.
|
||||
pub(crate) fn push_part_as_dir(&mut self, part: &PathPart) {
|
||||
self.directories.push(part.to_owned());
|
||||
}
|
||||
}
|
||||
|
||||
impl From<PathRepresentation> for DirsAndFileName {
|
||||
fn from(path_rep: PathRepresentation) -> Self {
|
||||
match path_rep {
|
||||
PathRepresentation::RawCloud(path) => {
|
||||
let mut parts: Vec<_> = path
|
||||
.split_terminator(DELIMITER)
|
||||
.map(|s| PathPart(s.to_string()))
|
||||
.collect();
|
||||
let maybe_file_name = match parts.pop() {
|
||||
Some(file) if file.0.contains('.') => Some(file),
|
||||
Some(dir) => {
|
||||
parts.push(dir);
|
||||
None
|
||||
}
|
||||
None => None,
|
||||
};
|
||||
Self {
|
||||
directories: parts,
|
||||
file_name: maybe_file_name,
|
||||
}
|
||||
}
|
||||
PathRepresentation::RawPathBuf(path) => {
|
||||
let mut parts: Vec<_> = path
|
||||
.iter()
|
||||
.flat_map(|s| s.to_os_string().into_string().map(PathPart))
|
||||
.collect();
|
||||
|
||||
let maybe_file_name = match parts.pop() {
|
||||
Some(file) if file.0.contains('.') => Some(file),
|
||||
Some(dir) => {
|
||||
parts.push(dir);
|
||||
None
|
||||
}
|
||||
None => None,
|
||||
};
|
||||
Self {
|
||||
directories: parts,
|
||||
file_name: maybe_file_name,
|
||||
}
|
||||
}
|
||||
PathRepresentation::Parts(dirs_and_file_name) => dirs_and_file_name,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<&'_ ObjectStorePath> for DirsAndFileName {
|
||||
fn from(other: &'_ ObjectStorePath) -> Self {
|
||||
other.clone().into()
|
||||
}
|
||||
}
|
||||
|
||||
impl From<ObjectStorePath> for DirsAndFileName {
|
||||
fn from(other: ObjectStorePath) -> Self {
|
||||
other.inner.into()
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -124,7 +372,27 @@ impl CloudConverter {
|
|||
/// Creates a cloud storage location by joining this `ObjectStorePath`'s
|
||||
/// parts with `DELIMITER`
|
||||
pub fn convert(object_store_path: &ObjectStorePath) -> String {
|
||||
object_store_path.parts.iter().map(|p| &p.0).join(DELIMITER)
|
||||
match &object_store_path.inner {
|
||||
PathRepresentation::RawCloud(path) => path.to_owned(),
|
||||
PathRepresentation::RawPathBuf(_path) => {
|
||||
todo!("convert");
|
||||
}
|
||||
PathRepresentation::Parts(dirs_and_file_name) => {
|
||||
let mut path = dirs_and_file_name
|
||||
.directories
|
||||
.iter()
|
||||
.map(|p| &p.0)
|
||||
.join(DELIMITER);
|
||||
|
||||
if !path.is_empty() {
|
||||
path.push_str(DELIMITER);
|
||||
}
|
||||
if let Some(file_name) = &dirs_and_file_name.file_name {
|
||||
path.push_str(&file_name.0);
|
||||
}
|
||||
path
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -138,7 +406,23 @@ impl FileConverter {
|
|||
/// `PathBuf` building implementation appropriate for the current
|
||||
/// platform.
|
||||
pub fn convert(object_store_path: &ObjectStorePath) -> PathBuf {
|
||||
object_store_path.parts.iter().map(|p| &p.0).collect()
|
||||
match &object_store_path.inner {
|
||||
PathRepresentation::RawCloud(_path) => {
|
||||
todo!("convert");
|
||||
}
|
||||
PathRepresentation::RawPathBuf(path) => path.to_owned(),
|
||||
PathRepresentation::Parts(dirs_and_file_name) => {
|
||||
let mut path: PathBuf = dirs_and_file_name
|
||||
.directories
|
||||
.iter()
|
||||
.map(|p| &p.0)
|
||||
.collect();
|
||||
if let Some(file_name) = &dirs_and_file_name.file_name {
|
||||
path.set_file_name(&file_name.0);
|
||||
}
|
||||
path
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -254,12 +538,12 @@ mod tests {
|
|||
//
|
||||
|
||||
#[test]
|
||||
fn cloud_prefix_no_trailing_delimiter_or_filename() {
|
||||
fn cloud_prefix_no_trailing_delimiter_or_file_name() {
|
||||
// Use case: a file named `test_file.json` exists in object storage and it
|
||||
// should be returned for a search on prefix `test`, so the prefix path
|
||||
// should not get a trailing delimiter automatically added
|
||||
let mut prefix = ObjectStorePath::default();
|
||||
prefix.push("test");
|
||||
prefix.set_file_name("test");
|
||||
|
||||
let converted = CloudConverter::convert(&prefix);
|
||||
assert_eq!(converted, "test");
|
||||
|
@ -271,7 +555,7 @@ mod tests {
|
|||
// `foo_test.json`. A search for the prefix `foo/` should return
|
||||
// `foo/bar.json` but not `foo_test.json'.
|
||||
let mut prefix = ObjectStorePath::default();
|
||||
prefix.push_all(&["test", ""]);
|
||||
prefix.push_dir("test");
|
||||
|
||||
let converted = CloudConverter::convert(&prefix);
|
||||
assert_eq!(converted, "test/");
|
||||
|
@ -280,82 +564,266 @@ mod tests {
|
|||
#[test]
|
||||
fn push_encodes() {
|
||||
let mut location = ObjectStorePath::default();
|
||||
location.push("foo/bar");
|
||||
location.push("baz%2Ftest");
|
||||
location.push_dir("foo/bar");
|
||||
location.push_dir("baz%2Ftest");
|
||||
|
||||
let converted = CloudConverter::convert(&location);
|
||||
assert_eq!(converted, "foo%2Fbar/baz%252Ftest");
|
||||
assert_eq!(converted, "foo%2Fbar/baz%252Ftest/");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn push_all_encodes() {
|
||||
let mut location = ObjectStorePath::default();
|
||||
location.push_all(&["foo/bar", "baz%2Ftest"]);
|
||||
location.push_all_dirs(&["foo/bar", "baz%2Ftest"]);
|
||||
|
||||
let converted = CloudConverter::convert(&location);
|
||||
assert_eq!(converted, "foo%2Fbar/baz%252Ftest");
|
||||
assert_eq!(converted, "foo%2Fbar/baz%252Ftest/");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn starts_with_parts() {
|
||||
fn prefix_matches() {
|
||||
let mut haystack = ObjectStorePath::default();
|
||||
haystack.push_all(&["foo/bar", "baz%2Ftest", "something"]);
|
||||
haystack.push_all_dirs(&["foo/bar", "baz%2Ftest", "something"]);
|
||||
|
||||
// self starts with self
|
||||
assert!(
|
||||
haystack.starts_with(&haystack),
|
||||
haystack.prefix_matches(&haystack),
|
||||
"{:?} should have started with {:?}",
|
||||
haystack,
|
||||
haystack
|
||||
);
|
||||
|
||||
// a longer prefix doesn't match
|
||||
let mut needle = haystack.clone();
|
||||
needle.push("longer now");
|
||||
needle.push_dir("longer now");
|
||||
assert!(
|
||||
!haystack.starts_with(&needle),
|
||||
!haystack.prefix_matches(&needle),
|
||||
"{:?} shouldn't have started with {:?}",
|
||||
haystack,
|
||||
needle
|
||||
);
|
||||
|
||||
// one dir prefix matches
|
||||
let mut needle = ObjectStorePath::default();
|
||||
needle.push("foo/bar");
|
||||
needle.push_dir("foo/bar");
|
||||
assert!(
|
||||
haystack.starts_with(&needle),
|
||||
"{:?} should have started with {:?}",
|
||||
haystack,
|
||||
needle
|
||||
);
|
||||
needle.push("baz%2Ftest");
|
||||
assert!(
|
||||
haystack.starts_with(&needle),
|
||||
haystack.prefix_matches(&needle),
|
||||
"{:?} should have started with {:?}",
|
||||
haystack,
|
||||
needle
|
||||
);
|
||||
|
||||
let mut needle = ObjectStorePath::default();
|
||||
needle.push("f");
|
||||
// two dir prefix matches
|
||||
needle.push_dir("baz%2Ftest");
|
||||
assert!(
|
||||
haystack.starts_with(&needle),
|
||||
haystack.prefix_matches(&needle),
|
||||
"{:?} should have started with {:?}",
|
||||
haystack,
|
||||
needle
|
||||
);
|
||||
needle.push("oo/bar");
|
||||
|
||||
// partial dir prefix matches
|
||||
let mut needle = ObjectStorePath::default();
|
||||
needle.push_dir("f");
|
||||
assert!(
|
||||
!haystack.starts_with(&needle),
|
||||
"{:?} shouldn't have started with {:?}",
|
||||
haystack.prefix_matches(&needle),
|
||||
"{:?} should have started with {:?}",
|
||||
haystack,
|
||||
needle
|
||||
);
|
||||
|
||||
// one dir and one partial dir matches
|
||||
let mut needle = ObjectStorePath::default();
|
||||
needle.push_all(&["foo/bar", "baz"]);
|
||||
needle.push_all_dirs(&["foo/bar", "baz"]);
|
||||
assert!(
|
||||
haystack.starts_with(&needle),
|
||||
haystack.prefix_matches(&needle),
|
||||
"{:?} should have started with {:?}",
|
||||
haystack,
|
||||
needle
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn prefix_matches_with_file_name() {
|
||||
let mut haystack = ObjectStorePath::default();
|
||||
haystack.push_all_dirs(&["foo/bar", "baz%2Ftest", "something"]);
|
||||
|
||||
let mut needle = haystack.clone();
|
||||
|
||||
// All directories match and file name is a prefix
|
||||
haystack.set_file_name("foo.segment");
|
||||
needle.set_file_name("foo");
|
||||
|
||||
assert!(
|
||||
haystack.prefix_matches(&needle),
|
||||
"{:?} should have started with {:?}",
|
||||
haystack,
|
||||
needle
|
||||
);
|
||||
|
||||
// All directories match but file name is not a prefix
|
||||
needle.set_file_name("e");
|
||||
|
||||
assert!(
|
||||
!haystack.prefix_matches(&needle),
|
||||
"{:?} should not have started with {:?}",
|
||||
haystack,
|
||||
needle
|
||||
);
|
||||
|
||||
// Not all directories match; file name is a prefix of the next directory; this
|
||||
// matches
|
||||
let mut needle = ObjectStorePath::default();
|
||||
needle.push_all_dirs(&["foo/bar", "baz%2Ftest"]);
|
||||
needle.set_file_name("s");
|
||||
|
||||
assert!(
|
||||
haystack.prefix_matches(&needle),
|
||||
"{:?} should have started with {:?}",
|
||||
haystack,
|
||||
needle
|
||||
);
|
||||
|
||||
// Not all directories match; file name is NOT a prefix of the next directory;
|
||||
// no match
|
||||
needle.set_file_name("p");
|
||||
|
||||
assert!(
|
||||
!haystack.prefix_matches(&needle),
|
||||
"{:?} should not have started with {:?}",
|
||||
haystack,
|
||||
needle
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
fn convert_raw_before_partial_eq() {
    // Each case builds the same path two ways — parsed from a raw cloud
    // string and assembled via the builder API — and checks that equality
    // normalizes the raw form before comparing.

    // Directory plus file name.
    let mut constructed = ObjectStorePath::default();
    constructed.push_dir("test_dir");
    constructed.set_file_name("test_file.json");
    let parsed = ObjectStorePath::from_cloud_unchecked("test_dir/test_file.json");
    assert_eq!(constructed, parsed);

    // Directory only, no file name.
    let mut constructed = ObjectStorePath::default();
    constructed.push_dir("test_dir");
    let parsed = ObjectStorePath::from_cloud_unchecked("test_dir");
    assert_eq!(constructed, parsed);

    // File name only, no directory.
    let mut constructed = ObjectStorePath::default();
    constructed.set_file_name("test_file.json");
    let parsed = ObjectStorePath::from_cloud_unchecked("test_file.json");
    assert_eq!(constructed, parsed);

    // Completely empty path.
    let parsed = ObjectStorePath::from_cloud_unchecked("");
    assert_eq!(ObjectStorePath::default(), parsed);
}
|
||||
|
||||
#[test]
fn path_rep_conversions() {
    // Converting either raw representation (cloud string or PathBuf) of the
    // same path must yield identical directory/file-name parts.
    let into_parts = |rep: PathRepresentation| -> DirsAndFileName { rep.into() };

    // Dir and file name.
    let from_cloud = into_parts(PathRepresentation::RawCloud("foo/bar/blah.json".into()));
    let from_path_buf = into_parts(PathRepresentation::RawPathBuf("foo/bar/blah.json".into()));

    let mut expected = DirsAndFileName::default();
    expected.push_dir("foo");
    expected.push_dir("bar");
    expected.file_name = Some("blah.json".into());

    assert_eq!(from_cloud, expected);
    assert_eq!(from_path_buf, expected);

    // Dir, no file name: the trailing component is treated as a directory.
    let from_cloud = into_parts(PathRepresentation::RawCloud("foo/bar".into()));
    let from_path_buf = into_parts(PathRepresentation::RawPathBuf("foo/bar".into()));

    // Same directories as above, just without the file name.
    expected.file_name = None;

    assert_eq!(from_cloud, expected);
    assert_eq!(from_path_buf, expected);

    // File name only, no directories.
    let from_cloud = into_parts(PathRepresentation::RawCloud("blah.json".into()));
    let from_path_buf = into_parts(PathRepresentation::RawPathBuf("blah.json".into()));

    assert!(from_cloud.directories.is_empty());
    assert_eq!(from_cloud.file_name.unwrap().0, "blah.json");

    assert!(from_path_buf.directories.is_empty());
    assert_eq!(from_path_buf.file_name.unwrap().0, "blah.json");

    // Empty input produces an empty parts structure.
    let from_cloud = into_parts(PathRepresentation::RawCloud("".into()));
    let from_path_buf = into_parts(PathRepresentation::RawPathBuf("".into()));

    assert!(from_cloud.directories.is_empty());
    assert!(from_cloud.file_name.is_none());

    assert!(from_path_buf.directories.is_empty());
    assert!(from_path_buf.file_name.is_none());
}
|
||||
|
||||
#[test]
fn parts_after_prefix_behavior() {
    // Build "apple/bear/cow/dog/egg.json" and probe it with various prefixes.
    let mut full_path = DirsAndFileName::default();
    full_path.push_all_dirs(&["apple", "bear", "cow", "dog"]);
    full_path.file_name = Some("egg.json".into());

    // Helper: turn string literals into the expected `PathPart` list.
    let to_parts =
        |names: &[&str]| -> Vec<PathPart> { names.iter().copied().map(Into::into).collect() };

    // A single leading directory is a prefix; everything after it comes back.
    let mut prefix = DirsAndFileName::default();
    prefix.push_dir("apple");
    assert_eq!(
        full_path.parts_after_prefix(&prefix).unwrap(),
        to_parts(&["bear", "cow", "dog", "egg.json"])
    );

    // Two leading directories also form a prefix.
    let mut prefix = DirsAndFileName::default();
    prefix.push_all_dirs(&["apple", "bear"]);
    assert_eq!(
        full_path.parts_after_prefix(&prefix).unwrap(),
        to_parts(&["cow", "dog", "egg.json"])
    );

    // A directory that only appears later in the path is not a prefix.
    let mut prefix = DirsAndFileName::default();
    prefix.push_dir("cow");
    assert!(full_path.parts_after_prefix(&prefix).is_none());

    // A partial directory name does not count as a prefix either.
    let mut prefix = DirsAndFileName::default();
    prefix.push_dir("ap");
    assert!(full_path.parts_after_prefix(&prefix).is_none());

    // A path is a prefix of itself, leaving no parts after it.
    let mut dirs_only = DirsAndFileName::default();
    dirs_only.push_all_dirs(&["apple", "bear", "cow", "dog"]);
    let prefix = dirs_only.clone();
    assert!(dirs_only.parts_after_prefix(&prefix).unwrap().is_empty());
}
|
||||
}
|
||||
|
|
|
@ -498,7 +498,8 @@ impl RemoteServer for RemoteServerImpl {
|
|||
// location in the store for the configuration file
|
||||
fn config_location(id: u32) -> ObjectStorePath {
|
||||
let mut path = ObjectStorePath::default();
|
||||
path.push_all(&[&id.to_string(), "config.json"]);
|
||||
path.push_dir(id.to_string());
|
||||
path.set_file_name("config.json");
|
||||
path
|
||||
}
|
||||
|
||||
|
@ -789,7 +790,8 @@ partition_key:
|
|||
server.store_configuration().await.unwrap();
|
||||
|
||||
let mut location = ObjectStorePath::default();
|
||||
location.push_all(&["1", "config.json"]);
|
||||
location.push_dir("1");
|
||||
location.set_file_name("config.json");
|
||||
|
||||
let read_data = server
|
||||
.store
|
||||
|
|
|
@ -156,7 +156,7 @@ where
|
|||
|
||||
let mut location = self.data_path.clone();
|
||||
let file_name = format!("{}.parquet", table_name);
|
||||
location.push(&file_name);
|
||||
location.set_file_name(&file_name);
|
||||
self.write_batches(batches, &location).await?;
|
||||
self.mark_table_finished(pos);
|
||||
|
||||
|
@ -167,7 +167,7 @@ where
|
|||
|
||||
let mut partition_meta_path = self.metadata_path.clone();
|
||||
let key = format!("{}.json", &self.partition_meta.key);
|
||||
partition_meta_path.push(&key);
|
||||
partition_meta_path.set_file_name(&key);
|
||||
let json_data = serde_json::to_vec(&self.partition_meta).context(JsonGenerationError)?;
|
||||
let data = Bytes::from(json_data);
|
||||
let len = data.len();
|
||||
|
@ -363,10 +363,10 @@ mem,host=A,region=west used=45 1
|
|||
let chunk = Arc::new(chunk);
|
||||
let (tx, rx) = tokio::sync::oneshot::channel();
|
||||
let mut metadata_path = ObjectStorePath::default();
|
||||
metadata_path.push("meta");
|
||||
metadata_path.push_dir("meta");
|
||||
|
||||
let mut data_path = ObjectStorePath::default();
|
||||
data_path.push("data");
|
||||
data_path.push_dir("data");
|
||||
|
||||
let snapshot = snapshot_chunk(
|
||||
metadata_path.clone(),
|
||||
|
@ -381,7 +381,7 @@ mem,host=A,region=west used=45 1
|
|||
rx.await.unwrap();
|
||||
|
||||
let mut location = metadata_path;
|
||||
location.push("testaroo.json");
|
||||
location.set_file_name("testaroo.json");
|
||||
|
||||
let summary = store
|
||||
.get(&location)
|
||||
|
@ -416,10 +416,10 @@ mem,host=A,region=west used=45 1
|
|||
let store = Arc::new(ObjectStore::new_in_memory(InMemory::new()));
|
||||
let chunk = Arc::new(ChunkWB::new(11));
|
||||
let mut metadata_path = ObjectStorePath::default();
|
||||
metadata_path.push("meta");
|
||||
metadata_path.push_dir("meta");
|
||||
|
||||
let mut data_path = ObjectStorePath::default();
|
||||
data_path.push("data");
|
||||
data_path.push_dir("data");
|
||||
|
||||
let snapshot = Snapshot::new("testaroo", metadata_path, data_path, store, chunk, tables);
|
||||
|
||||
|
|
|
@ -685,10 +685,10 @@ async fn snapshot_partition<M: ConnectionManager + Send + Sync + Debug + 'static
|
|||
})?;
|
||||
|
||||
let mut metadata_path = ObjectStorePath::default();
|
||||
metadata_path.push(&db_name.to_string());
|
||||
metadata_path.push_dir(&db_name.to_string());
|
||||
let mut data_path = metadata_path.clone();
|
||||
metadata_path.push("meta");
|
||||
data_path.push_all(&["data", &snapshot.partition]);
|
||||
metadata_path.push_dir("meta");
|
||||
data_path.push_all_dirs(&["data", &snapshot.partition]);
|
||||
|
||||
let partition_key = &snapshot.partition;
|
||||
let chunk = db.rollover_partition(partition_key).await.unwrap();
|
||||
|
|
Loading…
Reference in New Issue