refactor: Always get a path to build from the object store

pull/24376/head
Carol (Nichols || Goulding) 2021-01-19 10:44:42 -05:00
parent 5e74ae2ee7
commit fdbe602e57
11 changed files with 136 additions and 64 deletions
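In short, call sites stop building ObjectStorePath values on their own (via ObjectStorePath::default() or the from_*_unchecked constructors) and instead ask the object store to hand one out. A minimal sketch of the before/after pattern, using only the new_path, push_dir, and set_file_name calls that appear in the diffs below (the store variable stands in for any ObjectStore):

    // Before: a path built with no input from the store
    let location = ObjectStorePath::from_cloud_unchecked("foo/bar.json");

    // After: the store hands out an empty path appropriate for its backend
    let mut location = store.new_path();
    location.push_dir("foo");
    location.set_file_name("bar.json");

At this stage every backend's new_path simply returns ObjectStorePath::default(), so behavior is unchanged; the indirection presumably leaves room for backend-specific path handling later.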

View File

@@ -54,6 +54,11 @@ impl AmazonS3 {
         }
     }
 
+    /// Return a new location path appropriate for this object storage
+    pub fn new_path(&self) -> ObjectStorePath {
+        ObjectStorePath::default()
+    }
+
     /// Save the provided bytes to the specified location.
     pub async fn put<S>(&self, location: &ObjectStorePath, bytes: S, length: usize) -> Result<()>
     where
@@ -309,7 +314,6 @@ impl Error {
 #[cfg(test)]
 mod tests {
     use crate::{
-        path::ObjectStorePath,
         tests::{get_nonexistent_object, list_with_delimiter, put_get_delete_list},
         AmazonS3, Error, ObjectStore,
     };
@@ -417,9 +421,10 @@ mod tests {
         let (_, bucket_name) = region_and_bucket_name()?;
         let region = rusoto_core::Region::UsWest1;
         let integration = ObjectStore::new_amazon_s3(AmazonS3::new(region, &bucket_name));
-        let location_name = ObjectStorePath::from_cloud_unchecked(NON_EXISTENT_NAME);
+        let mut location = integration.new_path();
+        location.set_file_name(NON_EXISTENT_NAME);
 
-        let err = get_nonexistent_object(&integration, Some(location_name))
+        let err = get_nonexistent_object(&integration, Some(location))
             .await
             .unwrap_err();
         if let Some(Error::UnableToListDataFromS3 { source, bucket }) =
@@ -439,9 +444,10 @@
         maybe_skip_integration!();
         let (region, bucket_name) = region_and_bucket_name()?;
         let integration = ObjectStore::new_amazon_s3(AmazonS3::new(region, &bucket_name));
-        let location_name = ObjectStorePath::from_cloud_unchecked(NON_EXISTENT_NAME);
+        let mut location = integration.new_path();
+        location.set_file_name(NON_EXISTENT_NAME);
 
-        let err = get_nonexistent_object(&integration, Some(location_name))
+        let err = get_nonexistent_object(&integration, Some(location))
             .await
             .unwrap_err();
         if let Some(Error::UnableToGetDataFromS3 {
@@ -469,9 +475,10 @@
         let (region, _) = region_and_bucket_name()?;
         let bucket_name = NON_EXISTENT_NAME;
         let integration = ObjectStore::new_amazon_s3(AmazonS3::new(region, bucket_name));
-        let location_name = ObjectStorePath::from_cloud_unchecked(NON_EXISTENT_NAME);
+        let mut location = integration.new_path();
+        location.set_file_name(NON_EXISTENT_NAME);
 
-        let err = get_nonexistent_object(&integration, Some(location_name))
+        let err = get_nonexistent_object(&integration, Some(location))
             .await
             .unwrap_err();
         if let Some(Error::UnableToListDataFromS3 { source, bucket }) =
@@ -496,13 +503,14 @@ mod tests {
         let (_, bucket_name) = region_and_bucket_name()?;
         let region = rusoto_core::Region::UsWest1;
         let integration = ObjectStore::new_amazon_s3(AmazonS3::new(region, &bucket_name));
-        let location_name = ObjectStorePath::from_cloud_unchecked(NON_EXISTENT_NAME);
+        let mut location = integration.new_path();
+        location.set_file_name(NON_EXISTENT_NAME);
         let data = Bytes::from("arbitrary data");
         let stream_data = std::io::Result::Ok(data.clone());
 
         let err = integration
             .put(
-                &location_name,
+                &location,
                 futures::stream::once(async move { stream_data }),
                 data.len(),
             )
@@ -531,13 +539,14 @@
         let (region, _) = region_and_bucket_name()?;
         let bucket_name = NON_EXISTENT_NAME;
         let integration = ObjectStore::new_amazon_s3(AmazonS3::new(region, bucket_name));
-        let location_name = ObjectStorePath::from_cloud_unchecked(NON_EXISTENT_NAME);
+        let mut location = integration.new_path();
+        location.set_file_name(NON_EXISTENT_NAME);
         let data = Bytes::from("arbitrary data");
         let stream_data = std::io::Result::Ok(data.clone());
 
         let err = integration
             .put(
-                &location_name,
+                &location,
                 futures::stream::once(async move { stream_data }),
                 data.len(),
             )
@@ -565,9 +574,10 @@
         maybe_skip_integration!();
         let (region, bucket_name) = region_and_bucket_name()?;
         let integration = ObjectStore::new_amazon_s3(AmazonS3::new(region, &bucket_name));
-        let location_name = ObjectStorePath::from_cloud_unchecked(NON_EXISTENT_NAME);
+        let mut location = integration.new_path();
+        location.set_file_name(NON_EXISTENT_NAME);
 
-        let result = integration.delete(&location_name).await;
+        let result = integration.delete(&location).await;
 
         assert!(result.is_ok());
@@ -581,9 +591,10 @@
         let (_, bucket_name) = region_and_bucket_name()?;
         let region = rusoto_core::Region::UsWest1;
         let integration = ObjectStore::new_amazon_s3(AmazonS3::new(region, &bucket_name));
-        let location_name = ObjectStorePath::from_cloud_unchecked(NON_EXISTENT_NAME);
+        let mut location = integration.new_path();
+        location.set_file_name(NON_EXISTENT_NAME);
 
-        let err = integration.delete(&location_name).await.unwrap_err();
+        let err = integration.delete(&location).await.unwrap_err();
         if let Error::UnableToDeleteDataFromS3 {
             source,
             bucket,
@@ -606,9 +617,10 @@
        let (region, _) = region_and_bucket_name()?;
         let bucket_name = NON_EXISTENT_NAME;
         let integration = ObjectStore::new_amazon_s3(AmazonS3::new(region, bucket_name));
-        let location_name = ObjectStorePath::from_cloud_unchecked(NON_EXISTENT_NAME);
+        let mut location = integration.new_path();
+        location.set_file_name(NON_EXISTENT_NAME);
 
-        let err = integration.delete(&location_name).await.unwrap_err();
+        let err = integration.delete(&location).await.unwrap_err();
         if let Error::UnableToDeleteDataFromS3 {
             source,
             bucket,

View File

@@ -65,6 +65,11 @@ impl MicrosoftAzure {
         Self::new(account, master_key, container_name)
     }
 
+    /// Return a new location path appropriate for this object storage
+    pub fn new_path(&self) -> ObjectStorePath {
+        ObjectStorePath::default()
+    }
+
     /// Save the provided bytes to the specified location.
     pub async fn put<S>(&self, location: &ObjectStorePath, bytes: S, length: usize) -> Result<()>
     where

View File

@@ -34,6 +34,11 @@ impl File {
         FileConverter::convert(&path)
     }
 
+    /// Return a new location path appropriate for this object storage
+    pub fn new_path(&self) -> ObjectStorePath {
+        ObjectStorePath::default()
+    }
+
     /// Save the provided bytes to the specified location.
     pub async fn put<S>(&self, location: &ObjectStorePath, bytes: S, length: usize) -> Result<()>
     where
@@ -161,7 +166,8 @@ mod tests {
         let integration = ObjectStore::new_file(File::new(root.path()));
         let bytes = stream::once(async { Ok(Bytes::from("hello world")) });
-        let location = ObjectStorePath::from_path_buf_unchecked("junk");
+        let mut location = integration.new_path();
+        location.set_file_name("junk");
 
         let res = integration.put(&location, bytes, 0).await;
         assert!(matches!(
@@ -178,14 +184,14 @@
     #[tokio::test]
     async fn creates_dir_if_not_present() -> Result<()> {
         let root = TempDir::new()?;
-        let storage = ObjectStore::new_file(File::new(root.path()));
+        let integration = ObjectStore::new_file(File::new(root.path()));
         let data = Bytes::from("arbitrary data");
-        let mut location = ObjectStorePath::default();
+        let mut location = integration.new_path();
         location.push_all_dirs(&["nested", "file", "test_file"]);
 
         let stream_data = std::io::Result::Ok(data.clone());
-        storage
+        integration
             .put(
                 &location,
                 futures::stream::once(async move { stream_data }),
@@ -193,7 +199,7 @@
             )
             .await?;
 
-        let read_data = storage
+        let read_data = integration
             .get(&location)
             .await?
             .map_ok(|b| bytes::BytesMut::from(&b[..]))

View File

@@ -24,6 +24,11 @@ impl GoogleCloudStorage {
         }
     }
 
+    /// Return a new location path appropriate for this object storage
+    pub fn new_path(&self) -> ObjectStorePath {
+        ObjectStorePath::default()
+    }
+
     /// Save the provided bytes to the specified location.
     pub async fn put<S>(&self, location: &ObjectStorePath, bytes: S, length: usize) -> Result<()>
     where
@@ -142,7 +147,6 @@ impl GoogleCloudStorage {
 #[cfg(test)]
 mod test {
     use crate::{
-        path::ObjectStorePath,
         tests::{get_nonexistent_object, put_get_delete_list},
         Error, GoogleCloudStorage, ObjectStore,
     };
@@ -196,11 +200,13 @@ mod test {
     async fn gcs_test_get_nonexistent_location() -> Result<()> {
         maybe_skip_integration!();
         let bucket_name = bucket_name()?;
-        let location_name = ObjectStorePath::from_cloud_unchecked(NON_EXISTENT_NAME);
         let integration =
             ObjectStore::new_google_cloud_storage(GoogleCloudStorage::new(&bucket_name));
-        let result = get_nonexistent_object(&integration, Some(location_name)).await?;
+        let mut location = integration.new_path();
+        location.set_file_name(NON_EXISTENT_NAME);
+
+        let result = get_nonexistent_object(&integration, Some(location)).await?;
 
         assert_eq!(
             result,
@@ -217,11 +223,13 @@
     async fn gcs_test_get_nonexistent_bucket() -> Result<()> {
         maybe_skip_integration!();
         let bucket_name = NON_EXISTENT_NAME;
-        let location_name = ObjectStorePath::from_cloud_unchecked(NON_EXISTENT_NAME);
         let integration =
             ObjectStore::new_google_cloud_storage(GoogleCloudStorage::new(bucket_name));
-        let result = get_nonexistent_object(&integration, Some(location_name)).await?;
+        let mut location = integration.new_path();
+        location.set_file_name(NON_EXISTENT_NAME);
+
+        let result = get_nonexistent_object(&integration, Some(location)).await?;
 
         assert_eq!(result, Bytes::from("Not Found"));
@@ -232,11 +240,13 @@
     async fn gcs_test_delete_nonexistent_location() -> Result<()> {
         maybe_skip_integration!();
         let bucket_name = bucket_name()?;
-        let location_name = ObjectStorePath::from_cloud_unchecked(NON_EXISTENT_NAME);
         let integration =
             ObjectStore::new_google_cloud_storage(GoogleCloudStorage::new(&bucket_name));
-        let err = integration.delete(&location_name).await.unwrap_err();
+        let mut location = integration.new_path();
+        location.set_file_name(NON_EXISTENT_NAME);
+
+        let err = integration.delete(&location).await.unwrap_err();
 
         if let Error::UnableToDeleteDataFromGcs {
             source,
@@ -258,11 +268,13 @@
     async fn gcs_test_delete_nonexistent_bucket() -> Result<()> {
         maybe_skip_integration!();
         let bucket_name = NON_EXISTENT_NAME;
-        let location_name = ObjectStorePath::from_cloud_unchecked(NON_EXISTENT_NAME);
         let integration =
             ObjectStore::new_google_cloud_storage(GoogleCloudStorage::new(bucket_name));
-        let err = integration.delete(&location_name).await.unwrap_err();
+        let mut location = integration.new_path();
+        location.set_file_name(NON_EXISTENT_NAME);
+
+        let err = integration.delete(&location).await.unwrap_err();
 
         if let Error::UnableToDeleteDataFromGcs {
             source,
@@ -284,15 +296,18 @@
     async fn gcs_test_put_nonexistent_bucket() -> Result<()> {
         maybe_skip_integration!();
         let bucket_name = NON_EXISTENT_NAME;
-        let location_name = ObjectStorePath::from_cloud_unchecked(NON_EXISTENT_NAME);
         let integration =
             ObjectStore::new_google_cloud_storage(GoogleCloudStorage::new(bucket_name));
+        let mut location = integration.new_path();
+        location.set_file_name(NON_EXISTENT_NAME);
         let data = Bytes::from("arbitrary data");
         let stream_data = std::io::Result::Ok(data.clone());
 
         let result = integration
             .put(
-                &location_name,
+                &location,
                 futures::stream::once(async move { stream_data }),
                 data.len(),
             )

View File

@@ -65,6 +65,18 @@ impl ObjectStore {
         Self(ObjectStoreIntegration::MicrosoftAzure(Box::new(azure)))
     }
 
+    /// Return a new location path appropriate for this object storage
+    pub fn new_path(&self) -> ObjectStorePath {
+        use ObjectStoreIntegration::*;
+        match &self.0 {
+            AmazonS3(s3) => s3.new_path(),
+            GoogleCloudStorage(gcs) => gcs.new_path(),
+            InMemory(in_mem) => in_mem.new_path(),
+            File(file) => file.new_path(),
+            MicrosoftAzure(azure) => azure.new_path(),
+        }
+    }
+
     /// Save the provided bytes to the specified location.
     pub async fn put<S>(&self, location: &ObjectStorePath, bytes: S, length: usize) -> Result<()>
     where
@@ -368,7 +380,7 @@ mod tests {
         );
 
         let data = Bytes::from("arbitrary data");
-        let mut location = ObjectStorePath::default();
+        let mut location = storage.new_path();
         location.push_dir("test_dir");
         location.set_file_name("test_file.json");
@@ -386,13 +398,13 @@
         assert_eq!(content_list, &[location.clone()]);
 
         // List everything starting with a prefix that should return results
-        let mut prefix = ObjectStorePath::default();
+        let mut prefix = storage.new_path();
         prefix.push_dir("test_dir");
         let content_list = flatten_list_stream(storage, Some(&prefix)).await?;
         assert_eq!(content_list, &[location.clone()]);
 
         // List everything starting with a prefix that shouldn't return results
-        let mut prefix = ObjectStorePath::default();
+        let mut prefix = storage.new_path();
         prefix.push_dir("something");
         let content_list = flatten_list_stream(storage, Some(&prefix)).await?;
         assert!(content_list.is_empty());
@@ -447,7 +459,7 @@
                 .unwrap();
         }
 
-        let mut prefix = ObjectStorePath::default();
+        let mut prefix = storage.new_path();
         prefix.push_all_dirs(&["mydb", "wal"]);
 
         let mut expected_000 = prefix.clone();
@@ -469,11 +481,11 @@
         assert!(object.last_modified > time_before_creation);
 
         // List with a prefix containing a partial "file name"
-        let mut prefix = ObjectStorePath::default();
+        let mut prefix = storage.new_path();
         prefix.push_all_dirs(&["mydb", "wal", "000", "000"]);
         prefix.set_file_name("001");
 
-        let mut expected_location = ObjectStorePath::default();
+        let mut expected_location = storage.new_path();
         expected_location.push_all_dirs(&["mydb", "wal", "000", "000"]);
         expected_location.set_file_name("001.segment");
@@ -499,8 +511,11 @@
         storage: &ObjectStore,
         location: Option<ObjectStorePath>,
     ) -> Result<Bytes> {
-        let location = location
-            .unwrap_or_else(|| ObjectStorePath::from_cloud_unchecked("this_file_should_not_exist"));
+        let location = location.unwrap_or_else(|| {
+            let mut loc = storage.new_path();
+            loc.set_file_name("this_file_should_not_exist");
+            loc
+        });
 
         let content_list = flatten_list_stream(storage, Some(&location)).await?;
         assert!(content_list.is_empty());
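The ObjectStore::new_path dispatch above is what keeps helpers like get_nonexistent_object backend-agnostic: the same calling code works against any variant. A hedged usage sketch, assuming the in-memory backend from this crate (the path values are illustrative):

    use object_store::{memory::InMemory, ObjectStore};

    // Build a store, then let it hand out the path to populate
    let storage = ObjectStore::new_in_memory(InMemory::new());
    let mut prefix = storage.new_path();
    prefix.push_all_dirs(&["mydb", "wal"]);
    prefix.set_file_name("000.segment");

Every call goes through the enum match in new_path, so swapping InMemory for AmazonS3 or GoogleCloudStorage leaves the calling code untouched.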

View File

@@ -36,6 +36,11 @@ impl InMemory {
         }
     }
 
+    /// Return a new location path appropriate for this object storage
+    pub fn new_path(&self) -> ObjectStorePath {
+        ObjectStorePath::default()
+    }
+
     /// Save the provided bytes to the specified location.
     pub async fn put<S>(&self, location: &ObjectStorePath, bytes: S, length: usize) -> Result<()>
     where
@@ -186,7 +191,8 @@ mod tests {
         let integration = ObjectStore::new_in_memory(InMemory::new());
 
         let bytes = stream::once(async { Ok(Bytes::from("hello world")) });
-        let location = ObjectStorePath::from_cloud_unchecked("junk");
+        let mut location = integration.new_path();
+        location.set_file_name("junk");
 
         let res = integration.put(&location, bytes, 0).await;
         assert!(matches!(

View File

@@ -362,7 +362,7 @@ impl Segment {
         store: Arc<ObjectStore>,
     ) -> Result<()> {
         let data = self.to_file_bytes(writer_id)?;
-        let location = database_object_store_path(writer_id, db_name);
+        let location = database_object_store_path(writer_id, db_name, &store);
         let location = object_store_path_for_segment(&location, self.id)?;
 
         let len = data.len();
@@ -534,8 +534,12 @@
 }
 
 // base location in object store for a given database name
-fn database_object_store_path(writer_id: u32, database_name: &DatabaseName<'_>) -> ObjectStorePath {
-    let mut path = ObjectStorePath::default();
+fn database_object_store_path(
+    writer_id: u32,
+    database_name: &DatabaseName<'_>,
+    store: &ObjectStore,
+) -> ObjectStorePath {
+    let mut path = store.new_path();
     path.push_dir(format!("{}", writer_id));
     path.push_dir(database_name.to_string());
     path
@@ -546,6 +550,7 @@ mod tests {
     use super::*;
     use data_types::{data::lines_to_replicated_write, database_rules::DatabaseRules};
     use influxdb_line_protocol::parse_lines;
+    use object_store::memory::InMemory;
 
     #[test]
     fn append_increments_current_size_and_uses_existing_segment() {
@@ -853,7 +858,8 @@
     #[test]
     fn valid_object_store_path_for_segment() {
-        let mut base_path = ObjectStorePath::default();
+        let storage = ObjectStore::new_in_memory(InMemory::new());
+        let mut base_path = storage.new_path();
         base_path.push_all_dirs(&["1", "mydb"]);
 
         let segment_path = object_store_path_for_segment(&base_path, 23).unwrap();
@@ -877,7 +883,8 @@
     #[test]
     fn object_store_path_for_segment_out_of_bounds() {
-        let mut base_path = ObjectStorePath::default();
+        let storage = ObjectStore::new_in_memory(InMemory::new());
+        let mut base_path = storage.new_path();
         base_path.push_all_dirs(&["1", "mydb"]);
 
         let segment_path = object_store_path_for_segment(&base_path, 0).err().unwrap();
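Callers of the reworked helper now thread the store through, as the updated tests above do. A minimal sketch of the new call shape (the writer id and database name are illustrative values):

    let storage = ObjectStore::new_in_memory(InMemory::new());
    let db_name = DatabaseName::new("mydb").unwrap();

    // Builds the "1/mydb/" base from a path the store handed out
    let base = database_object_store_path(1, &db_name, &storage);
    let segment_path = object_store_path_for_segment(&base, 23).unwrap();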

View File

@@ -137,6 +137,7 @@ impl<'a> Drop for CreateDatabaseHandle<'a> {
 mod test {
     use super::*;
     use object_store::path::cloud::CloudConverter;
+    use object_store::{memory::InMemory, ObjectStore};
 
     #[test]
     fn create_db() {
@@ -157,7 +158,9 @@
     #[test]
     fn object_store_path_for_database_config() {
-        let path = ObjectStorePath::from_cloud_unchecked("1");
+        let storage = ObjectStore::new_in_memory(InMemory::new());
+        let mut path = storage.new_path();
+        path.push_dir("1");
 
         let name = DatabaseName::new("foo").unwrap();
         let rules_path = super::object_store_path_for_database_config(&path, &name);
         let rules_path = CloudConverter::convert(&rules_path);
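For reference, CloudConverter::convert turns the built-up path back into a cloud-style string key, so the test can still compare against a literal. A rough sketch of what this test presumably asserts (the exact expected string is not shown in the diff and is an assumption here):

    let rules_path = CloudConverter::convert(&rules_path);
    // Assumed cloud key form: writer dir "1", db "foo", file "rules.json"
    assert_eq!(&rules_path, "1/foo/rules.json");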

View File

@@ -221,7 +221,7 @@ impl<M: ConnectionManager> Server<M> {
     fn root_path(&self) -> Result<ObjectStorePath> {
         let id = self.require_id()?;
 
-        let mut path = ObjectStorePath::default();
+        let mut path = self.store.new_path();
         path.push_dir(format!("{}", id));
         Ok(path)
     }
@@ -610,11 +610,13 @@
             .await
             .expect("failed to create database");
 
+        let mut rules_path = store.new_path();
+        rules_path.push_all_dirs(&["1", name]);
+        rules_path.set_file_name("rules.json");
+
         let read_data = server
             .store
-            .get(&ObjectStorePath::from_cloud_unchecked(
-                "1/bananas/rules.json",
-            ))
+            .get(&rules_path)
             .await
             .unwrap()
             .map_ok(|b| bytes::BytesMut::from(&b[..]))
@@ -633,10 +635,7 @@
             .await
             .expect("failed to create 2nd db");
 
-        store
-            .list_with_delimiter(&ObjectStorePath::from_cloud_unchecked(""))
-            .await
-            .unwrap();
+        store.list_with_delimiter(&store.new_path()).await.unwrap();
 
         let manager = TestConnectionManager::new();
         let server2 = Server::new(manager, store);
@@ -895,7 +894,10 @@ partition_key:
         // write lines should have caused a segment rollover and persist, wait
         tokio::task::yield_now().await;
 
-        let path = ObjectStorePath::from_cloud_unchecked("1/my_db/wal/000/000/001.segment");
+        let mut path = store.new_path();
+        path.push_all_dirs(&["1", "my_db", "wal", "000", "000"]);
+        path.set_file_name("001.segment");
+
         let data = store
             .get(&path)
             .await

View File

@@ -365,10 +365,10 @@ mem,host=A,region=west used=45 1
         let store = Arc::new(ObjectStore::new_in_memory(InMemory::new()));
         let chunk = Arc::new(chunk);
         let (tx, rx) = tokio::sync::oneshot::channel();
-        let mut metadata_path = ObjectStorePath::default();
+        let mut metadata_path = store.new_path();
         metadata_path.push_dir("meta");
-        let mut data_path = ObjectStorePath::default();
+        let mut data_path = store.new_path();
         data_path.push_dir("data");
 
         let snapshot = snapshot_chunk(
@@ -418,10 +418,10 @@ mem,host=A,region=west used=45 1
         let store = Arc::new(ObjectStore::new_in_memory(InMemory::new()));
         let chunk = Arc::new(ChunkWB::new(11));
 
-        let mut metadata_path = ObjectStorePath::default();
+        let mut metadata_path = store.new_path();
         metadata_path.push_dir("meta");
-        let mut data_path = ObjectStorePath::default();
+        let mut data_path = store.new_path();
         data_path.push_dir("data");
 
         let snapshot = Snapshot::new("testaroo", metadata_path, data_path, store, chunk, tables);

View File

@@ -17,7 +17,6 @@ use data_types::{
     DatabaseName,
 };
 use influxdb_line_protocol::parse_lines;
-use object_store::path::ObjectStorePath;
 use query::{frontend::sql::SQLQueryPlanner, Database, DatabaseStore};
 use server::{ConnectionManager, Server as AppServer};
@@ -715,7 +714,9 @@ async fn snapshot_partition<M: ConnectionManager + Send + Sync + Debug + 'static
         bucket: &snapshot.bucket,
     })?;
 
-    let mut metadata_path = ObjectStorePath::default();
+    let store = server.store.clone();
+
+    let mut metadata_path = store.new_path();
     metadata_path.push_dir(&db_name.to_string());
     let mut data_path = metadata_path.clone();
     metadata_path.push_dir("meta");
@@ -726,7 +727,7 @@
     let snapshot = server::snapshot::snapshot_chunk(
         metadata_path,
        data_path,
-        server.store.clone(),
+        store,
        partition_key,
        chunk,
        None,