From 6d5cb9c11708cf8aac8dbfbf856ea6b61cab366a Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Thu, 12 Aug 2021 12:52:39 -0400 Subject: [PATCH] refactor: Extract a ParquetFilePath to handle paths to parquet files in a db's object store --- Cargo.lock | 4 +- iox_object_store/Cargo.toml | 2 + iox_object_store/src/lib.rs | 204 ++++++++++---- iox_object_store/src/paths.rs | 462 +++++++++++++++++++++++++++++++ parquet_file/src/catalog.rs | 265 ++++++++++-------- parquet_file/src/chunk.rs | 38 ++- parquet_file/src/cleanup.rs | 124 +++------ parquet_file/src/rebuild.rs | 38 +-- parquet_file/src/storage.rs | 67 +---- parquet_file/src/test_utils.rs | 22 +- server/src/db.rs | 91 ++---- server/src/db/chunk.rs | 4 +- server/src/db/lifecycle/drop.rs | 17 +- server/src/db/lifecycle/write.rs | 13 +- server/src/db/load.rs | 16 +- 15 files changed, 924 insertions(+), 443 deletions(-) create mode 100644 iox_object_store/src/paths.rs diff --git a/Cargo.lock b/Cargo.lock index 7841b6d3ac..de657f1731 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1445,7 +1445,7 @@ dependencies = [ [[package]] name = "heappy" version = "0.1.0" -source = "git+https://github.com/mkmik/heappy?rev=c8fe1fefbaefd536b0137435fce8d90a98a184de#c8fe1fefbaefd536b0137435fce8d90a98a184de" +source = "git+https://github.com/mkmik/heappy?rev=82a383128e484039cc2f31476e6289bed48a6701#82a383128e484039cc2f31476e6289bed48a6701" dependencies = [ "backtrace", "bytes", @@ -1823,8 +1823,10 @@ dependencies = [ "data_types", "futures", "object_store", + "snafu", "tokio", "tokio-stream", + "uuid", ] [[package]] diff --git a/iox_object_store/Cargo.toml b/iox_object_store/Cargo.toml index a606f4d543..f8c4005693 100644 --- a/iox_object_store/Cargo.toml +++ b/iox_object_store/Cargo.toml @@ -9,5 +9,7 @@ bytes = "1.0" data_types = { path = "../data_types" } futures = "0.3" object_store = { path = "../object_store" } +snafu = "0.6" tokio = { version = "1.0", features = ["macros", "time"] } tokio-stream = "0.1" +uuid = { version = "0.8", features = ["serde", "v4"] } diff --git a/iox_object_store/src/lib.rs b/iox_object_store/src/lib.rs index 069a0c85c9..2f7bb04036 100644 --- a/iox_object_store/src/lib.rs +++ b/iox_object_store/src/lib.rs @@ -16,7 +16,7 @@ use bytes::Bytes; use data_types::{server_id::ServerId, DatabaseName}; -use futures::{stream::BoxStream, Stream, StreamExt}; +use futures::{stream::BoxStream, Stream, StreamExt, TryStreamExt}; use object_store::{ path::{parsed::DirsAndFileName, ObjectStorePath, Path}, ObjectStore, ObjectStoreApi, Result, @@ -25,6 +25,10 @@ use std::{io, sync::Arc}; use tokio::sync::mpsc::channel; use tokio_stream::wrappers::ReceiverStream; +mod paths; +use paths::{DataPath, RootPath}; +pub use paths::{ParquetFilePath, ParquetFilePathParseError}; + /// Handles persistence of data for a particular database. Writes within its directory/prefix. /// /// This wrapper on top of an `ObjectStore` maps IOx specific concepts to ObjectStore locations @@ -34,6 +38,7 @@ pub struct IoxObjectStore { server_id: ServerId, database_name: String, // TODO: use data_types DatabaseName? root_path: RootPath, + data_path: DataPath, } impl IoxObjectStore { @@ -45,11 +50,13 @@ impl IoxObjectStore { database_name: &DatabaseName<'_>, ) -> Self { let root_path = RootPath::new(inner.new_path(), server_id, database_name); + let data_path = DataPath::new(&root_path); Self { inner, server_id, database_name: database_name.into(), root_path, + data_path, } } @@ -74,22 +81,6 @@ impl IoxObjectStore { path } - /// Location where parquet data goes to. 
- /// - /// Schema currently is: - /// - /// ```text - /// //data/ - /// ``` - // TODO: avoid leaking this outside this crate - pub fn data_path(&self) -> Path { - let mut path = self.inner.new_path(); - path.push_dir(self.server_id.to_string()); - path.push_dir(&self.database_name); - path.push_dir("data"); - path - } - /// Store this data in this database's object store. pub async fn put(&self, location: &Path, bytes: S, length: Option) -> Result<()> where @@ -103,6 +94,53 @@ impl IoxObjectStore { Ok(self.list(Some(&self.catalog_path())).await?.boxed()) } + /// List all parquet file paths in object storage for this database. + pub async fn parquet_files(&self) -> Result>>> { + Ok(self + .list(Some(&self.data_path.inner)) + .await? + .map_ok(move |list| { + list.into_iter() + // This `flat_map` ignores any filename in the data_path we couldn't parse as + // a ParquetFilePath + .flat_map(ParquetFilePath::from_absolute) + .collect() + }) + .boxed()) + } + + /// Get the data in this relative path in this database's object store. + pub async fn get_parquet_file( + &self, + location: &ParquetFilePath, + ) -> Result>> { + let full_path = self.data_path.join(location); + + self.inner.get(&full_path).await + } + + /// Store the data for this parquet file in this database's object store. + pub async fn put_parquet_file( + &self, + location: &ParquetFilePath, + bytes: S, + length: Option, + ) -> Result<()> + where + S: Stream> + Send + Sync + 'static, + { + let full_path = self.data_path.join(location); + + self.inner.put(&full_path, bytes, length).await + } + + /// Remove the data for this parquet file from this database's object store + pub async fn delete_parquet_file(&self, location: &ParquetFilePath) -> Result<()> { + let full_path = self.data_path.join(location); + + self.inner.delete(&full_path).await + } + /// List the relative paths in this database's object store. pub async fn list( &self, @@ -148,56 +186,122 @@ impl IoxObjectStore { } } -/// A database-specific object store path that all `IoxPath`s should be within. -#[derive(Debug, Clone)] -struct RootPath { - root: Path, -} - -impl RootPath { - /// How the root of a database is defined in object storage. - fn new(mut root: Path, server_id: ServerId, database_name: &DatabaseName<'_>) -> Self { - root.push_dir(server_id.to_string()); - root.push_dir(database_name.as_str()); - Self { root } - } -} - #[cfg(test)] mod tests { use super::*; + use data_types::chunk_metadata::ChunkAddr; + use object_store::{ObjectStore, ObjectStoreApi}; use std::num::NonZeroU32; + use uuid::Uuid; /// Creates new test server ID fn make_server_id() -> ServerId { ServerId::new(NonZeroU32::new(1).unwrap()) } - /// Creates a new in-memory object store. 
These tests rely on the `Path`s being of type - /// `DirsAndFileName` and thus using object_store::path::DELIMITER as the separator + /// Creates a new in-memory object store fn make_object_store() -> Arc { Arc::new(ObjectStore::new_in_memory()) } - #[test] - fn catalog_path_is_relative_to_db_root() { - let server_id = make_server_id(); - let database_name = DatabaseName::new("clouds").unwrap(); - let iox_object_store = IoxObjectStore::new(make_object_store(), server_id, &database_name); - assert_eq!( - iox_object_store.catalog_path().to_string(), - "mem:1/clouds/transactions/" - ); + async fn add_file(object_store: &ObjectStore, location: &Path) { + let data = Bytes::from("arbitrary data"); + let stream_data = std::io::Result::Ok(data.clone()); + + object_store + .put( + location, + futures::stream::once(async move { stream_data }), + None, + ) + .await + .unwrap(); } - #[test] - fn data_path_is_relative_to_db_root() { + async fn parquet_files(iox_object_store: &IoxObjectStore) -> Vec { + iox_object_store + .parquet_files() + .await + .unwrap() + .try_collect::>() + .await + .unwrap() + .into_iter() + .flatten() + .collect() + } + + async fn add_parquet_file(iox_object_store: &IoxObjectStore, location: &ParquetFilePath) { + let data = Bytes::from("arbitrary data"); + let stream_data = std::io::Result::Ok(data.clone()); + + iox_object_store + .put_parquet_file( + location, + futures::stream::once(async move { stream_data }), + None, + ) + .await + .unwrap(); + } + + #[tokio::test] + async fn only_lists_relevant_parquet_files() { + let object_store = make_object_store(); let server_id = make_server_id(); let database_name = DatabaseName::new("clouds").unwrap(); - let iox_object_store = IoxObjectStore::new(make_object_store(), server_id, &database_name); - assert_eq!( - iox_object_store.data_path().to_string(), - "mem:1/clouds/data/" - ); + let iox_object_store = + IoxObjectStore::new(Arc::clone(&object_store), server_id, &database_name); + let uuid = Uuid::new_v4(); + + // Put a non-database file in + let mut path = object_store.new_path(); + path.push_dir("foo"); + add_file(&object_store, &path).await; + + // Put a file for some other server in + let mut path = object_store.new_path(); + path.push_dir("12345"); + add_file(&object_store, &path).await; + + // Put a file for some other database in + let mut path = object_store.new_path(); + path.push_dir(server_id.to_string()); + path.push_dir("thunder"); + add_file(&object_store, &path).await; + + // Put a file in the database dir but not the data dir + let mut path = object_store.new_path(); + path.push_dir(server_id.to_string()); + path.push_dir(database_name.to_string()); + path.set_file_name(&format!("111.{}.parquet", uuid)); + add_file(&object_store, &path).await; + + // Put files in the data dir whose names are in the wrong format + let mut path = object_store.new_path(); + path.push_dir(server_id.to_string()); + path.push_dir(database_name.to_string()); + path.set_file_name("111.parquet"); + add_file(&object_store, &path).await; + path.set_file_name(&format!("111.{}.xls", uuid)); + add_file(&object_store, &path).await; + + // Parquet files should be empty + let pf = parquet_files(&iox_object_store).await; + assert!(pf.is_empty(), "{:?}", pf); + + // Add a real parquet file + let chunk_addr = ChunkAddr { + db_name: "clouds".into(), + table_name: "my_table".into(), + partition_key: "my_partition".into(), + chunk_id: 13, + }; + let p1 = ParquetFilePath::new(&chunk_addr); + add_parquet_file(&iox_object_store, &p1).await; + + // Only the 
real file should be returned + let pf = parquet_files(&iox_object_store).await; + assert_eq!(&pf, &[p1]); } } diff --git a/iox_object_store/src/paths.rs b/iox_object_store/src/paths.rs new file mode 100644 index 0000000000..5e535d7f3c --- /dev/null +++ b/iox_object_store/src/paths.rs @@ -0,0 +1,462 @@ +//! Paths for specific types of files within a database's object storage. + +use data_types::{chunk_metadata::ChunkAddr, server_id::ServerId, DatabaseName}; +use object_store::{ + path::{parsed::DirsAndFileName, ObjectStorePath, Path}, + Result, +}; +use snafu::{ensure, OptionExt, ResultExt, Snafu}; +use std::sync::Arc; +use uuid::Uuid; + +/// A database-specific object store path that all `IoxPath`s should be within. +/// This should not be leaked outside this crate. +#[derive(Debug, Clone)] +pub struct RootPath { + inner: Path, +} + +impl RootPath { + /// How the root of a database is defined in object storage. + pub fn new(mut root: Path, server_id: ServerId, database_name: &DatabaseName<'_>) -> Self { + root.push_dir(server_id.to_string()); + root.push_dir(database_name.as_str()); + Self { inner: root } + } + + pub fn join(&self, dir: &str) -> Path { + let mut result = self.inner.clone(); + result.push_dir(dir); + result + } +} + +/// A database-specific object store path for all data files. This should not be leaked outside +/// this crate. +#[derive(Debug, Clone)] +pub struct DataPath { + pub inner: Path, +} + +impl DataPath { + pub fn new(root_path: &RootPath) -> Self { + Self { + inner: root_path.join("data"), + } + } + + pub fn join(&self, parquet_file_path: &ParquetFilePath) -> Path { + let mut result = self.inner.clone(); + let relative = parquet_file_path.relative_dirs_and_file_name(); + for part in relative.directories { + result.push_dir(part.to_string()); + } + result.set_file_name( + relative + .file_name + .expect("Parquet file paths have filenames") + .to_string(), + ); + result + } +} + +/// Location of a Parquet file within a database's object store. +/// The exact format is an implementation detail and is subject to change. +#[derive(Debug, Clone, Eq, PartialEq, Hash, Ord, PartialOrd)] +pub struct ParquetFilePath { + table_name: Arc, + partition_key: Arc, + chunk_id: u32, + uuid: Uuid, +} + +impl ParquetFilePath { + /// Create a location for this chunk's parquet file. Calling this twice on the same `ChunkAddr` + /// will return different `ParquetFilePaths`. + pub fn new(chunk_addr: &ChunkAddr) -> Self { + Self { + table_name: Arc::clone(&chunk_addr.table_name), + partition_key: Arc::clone(&chunk_addr.partition_key), + chunk_id: chunk_addr.chunk_id, + // generate random UUID so that files are unique and never overwritten + uuid: Uuid::new_v4(), + } + } + + /// Turn this into directories and file names to be added to a root path or to be serialized + /// in protobuf. + pub fn relative_dirs_and_file_name(&self) -> DirsAndFileName { + let mut result = DirsAndFileName::default(); + result.push_all_dirs(&[self.table_name.as_ref(), self.partition_key.as_ref()]); + result.set_file_name(format!("{}.{}.parquet", self.chunk_id, self.uuid)); + result + } + + /// Create from serialized protobuf strings. + pub fn from_relative_dirs_and_file_name( + dirs_and_file_name: &DirsAndFileName, + ) -> Result { + let mut directories = dirs_and_file_name.directories.iter(); + let table_name = directories + .next() + .context(MissingTableName)? + .to_string() + .into(); + let partition_key = directories + .next() + .context(MissingPartitionKey)? 
+ .to_string() + .into(); + + ensure!(directories.next().is_none(), UnexpectedDirectory); + + let file_name = dirs_and_file_name + .file_name + .as_ref() + .context(MissingChunkId)? + .to_string(); + let mut parts = file_name.split('.'); + let chunk_id = parts + .next() + .context(MissingChunkId)? + .parse() + .context(InvalidChunkId)?; + let uuid = parts + .next() + .context(MissingUuid)? + .parse() + .context(InvalidUuid)?; + let ext = parts.next().context(MissingExtension)?; + ensure!(ext == "parquet", InvalidExtension { ext }); + ensure!(parts.next().is_none(), UnexpectedExtension); + + Ok(Self { + table_name, + partition_key, + chunk_id, + uuid, + }) + } + + // Deliberately pub(crate); this transformation should only happen within this crate + pub(crate) fn from_absolute(absolute_path: Path) -> Result { + let absolute_path: DirsAndFileName = absolute_path.into(); + + let mut absolute_dirs = absolute_path.directories.into_iter().fuse(); + + // The number of `next`s here needs to match the total number of directories in + // iox_object_store data_paths + absolute_dirs.next(); // server id + absolute_dirs.next(); // database name + absolute_dirs.next(); // "data" + + let remaining = DirsAndFileName { + directories: absolute_dirs.collect(), + file_name: absolute_path.file_name, + }; + + Self::from_relative_dirs_and_file_name(&remaining) + } +} + +impl From<&Self> for ParquetFilePath { + fn from(borrowed: &Self) -> Self { + borrowed.clone() + } +} + +#[derive(Snafu, Debug, PartialEq)] +#[allow(missing_docs)] +pub enum ParquetFilePathParseError { + #[snafu(display("Could not find required table name"))] + MissingTableName, + + #[snafu(display("Could not find required partition key"))] + MissingPartitionKey, + + #[snafu(display("Too many directories found"))] + UnexpectedDirectory, + + #[snafu(display("Could not find required chunk id"))] + MissingChunkId, + + #[snafu(display("Could not parse chunk id: {}", source))] + InvalidChunkId { source: std::num::ParseIntError }, + + #[snafu(display("Could not find required UUID"))] + MissingUuid, + + #[snafu(display("Could not parse UUID: {}", source))] + InvalidUuid { source: uuid::Error }, + + #[snafu(display("Could not find required file extension"))] + MissingExtension, + + #[snafu(display("Extension should have been `parquet`, instead found `{}`", ext))] + InvalidExtension { ext: String }, + + #[snafu(display("Too many extensions found"))] + UnexpectedExtension, +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::IoxObjectStore; + use object_store::{ObjectStore, ObjectStoreApi}; + use std::num::NonZeroU32; + + /// Creates new test server ID + fn make_server_id() -> ServerId { + ServerId::new(NonZeroU32::new(1).unwrap()) + } + + /// Creates a new in-memory object store. 
These tests rely on the `Path`s being of type + /// `DirsAndFileName` and thus using object_store::path::DELIMITER as the separator + fn make_object_store() -> Arc { + Arc::new(ObjectStore::new_in_memory()) + } + + #[test] + fn root_path_contains_server_id_and_db_name() { + let server_id = make_server_id(); + let database_name = DatabaseName::new("clouds").unwrap(); + let iox_object_store = IoxObjectStore::new(make_object_store(), server_id, &database_name); + + assert_eq!(iox_object_store.root_path.inner.to_string(), "1/clouds/") + } + + #[test] + fn root_path_join_concatenates() { + let server_id = make_server_id(); + let database_name = DatabaseName::new("clouds").unwrap(); + let iox_object_store = IoxObjectStore::new(make_object_store(), server_id, &database_name); + + let path = iox_object_store.root_path.join("foo"); + assert_eq!(path.to_string(), "1/clouds/foo/"); + } + + #[test] + fn catalog_path_is_relative_to_db_root() { + let server_id = make_server_id(); + let database_name = DatabaseName::new("clouds").unwrap(); + let iox_object_store = IoxObjectStore::new(make_object_store(), server_id, &database_name); + assert_eq!( + iox_object_store.catalog_path().to_string(), + "1/clouds/transactions/" + ); + } + + #[test] + fn data_path_is_relative_to_db_root() { + let server_id = make_server_id(); + let database_name = DatabaseName::new("clouds").unwrap(); + let iox_object_store = IoxObjectStore::new(make_object_store(), server_id, &database_name); + assert_eq!( + iox_object_store.data_path.inner.to_string(), + "1/clouds/data/" + ); + } + + #[test] + fn test_parquet_file_paths_are_unique() { + let chunk_addr = ChunkAddr { + db_name: "clouds".into(), + table_name: "my_table".into(), + partition_key: "my_partition".into(), + chunk_id: 13, + }; + + let p1 = ParquetFilePath::new(&chunk_addr); + let p2 = ParquetFilePath::new(&chunk_addr); + assert_ne!(p1, p2); + } + + #[test] + fn test_parquet_file_path_deserialization() { + // Error cases + use ParquetFilePathParseError::*; + + let mut df = DirsAndFileName::default(); + let result = ParquetFilePath::from_relative_dirs_and_file_name(&df); + assert!(matches!(result, Err(MissingTableName)), "got {:?}", result); + + df.push_dir("foo"); + let result = ParquetFilePath::from_relative_dirs_and_file_name(&df); + assert!( + matches!(result, Err(MissingPartitionKey)), + "got {:?}", + result + ); + + df.push_dir("bar"); + let result = ParquetFilePath::from_relative_dirs_and_file_name(&df); + assert!( + matches!(result, Err(MissingChunkId { .. })), + "got {:?}", + result + ); + + let mut extra = df.clone(); + extra.push_dir("nope"); + let result = ParquetFilePath::from_relative_dirs_and_file_name(&extra); + assert!( + matches!(result, Err(UnexpectedDirectory)), + "got {:?}", + result + ); + + df.set_file_name("bleh"); + let result = ParquetFilePath::from_relative_dirs_and_file_name(&df); + assert!( + matches!(result, Err(InvalidChunkId { .. })), + "got {:?}", + result + ); + + df.set_file_name("3"); + let result = ParquetFilePath::from_relative_dirs_and_file_name(&df); + assert!( + matches!(result, Err(MissingUuid { .. })), + "got {:?}", + result + ); + + df.set_file_name("3.nope"); + let result = ParquetFilePath::from_relative_dirs_and_file_name(&df); + assert!( + matches!(result, Err(InvalidUuid { .. 
})), + "got {:?}", + result + ); + + let uuid = Uuid::new_v4(); + df.set_file_name(&format!("3.{}", uuid)); + let result = ParquetFilePath::from_relative_dirs_and_file_name(&df); + assert!(matches!(result, Err(MissingExtension)), "got {:?}", result); + + df.set_file_name(&format!("3.{}.exe", uuid)); + let result = ParquetFilePath::from_relative_dirs_and_file_name(&df); + assert!( + matches!(result, Err(InvalidExtension { .. })), + "got {:?}", + result + ); + + df.set_file_name(&format!("3.{}.parquet.v6", uuid)); + let result = ParquetFilePath::from_relative_dirs_and_file_name(&df); + assert!( + matches!(result, Err(UnexpectedExtension)), + "got {:?}", + result + ); + + // Success case + df.set_file_name(&format!("3.{}.parquet", uuid)); + let result = ParquetFilePath::from_relative_dirs_and_file_name(&df).unwrap(); + assert_eq!( + result, + ParquetFilePath { + table_name: "foo".into(), + partition_key: "bar".into(), + chunk_id: 3, + uuid + } + ); + let round_trip = result.relative_dirs_and_file_name(); + assert_eq!(round_trip, df); + } + + #[test] + fn parquet_file_from_absolute() { + let object_store = make_object_store(); + + // Error cases + use ParquetFilePathParseError::*; + + let mut path = object_store.new_path(); + // incorrect directories are fine, we're assuming that list(data_path) scoped to the + // right directories so we don't check again on the way out + path.push_all_dirs(&["foo", "bar", "baz", "}*", "aoeu"]); + path.set_file_name("rules.pb"); + let result = ParquetFilePath::from_absolute(path); + assert!( + matches!(result, Err(InvalidChunkId { .. })), + "got: {:?}", + result + ); + + let mut path = object_store.new_path(); + path.push_all_dirs(&["foo", "bar", "baz", "}*", "aoeu"]); + // missing file name + let result = ParquetFilePath::from_absolute(path); + assert!(matches!(result, Err(MissingChunkId)), "got: {:?}", result); + + // Success case + let uuid = Uuid::new_v4(); + let mut path = object_store.new_path(); + path.push_all_dirs(&["foo", "bar", "baz", "}*", "aoeu"]); + path.set_file_name(&format!("10.{}.parquet", uuid)); + let result = ParquetFilePath::from_absolute(path); + assert_eq!( + result.unwrap(), + ParquetFilePath { + table_name: "}*".into(), + partition_key: "aoeu".into(), + chunk_id: 10, + uuid + } + ); + } + + #[test] + fn parquet_file_relative_dirs_and_file_path() { + let uuid = Uuid::new_v4(); + let pfp = ParquetFilePath { + table_name: "}*".into(), + partition_key: "aoeu".into(), + chunk_id: 10, + uuid, + }; + let dirs_and_file_name = pfp.relative_dirs_and_file_name(); + assert_eq!( + dirs_and_file_name.to_string(), + format!("%7D%2A/aoeu/10.{}.parquet", uuid) + ); + let round_trip = + ParquetFilePath::from_relative_dirs_and_file_name(&dirs_and_file_name).unwrap(); + assert_eq!(pfp, round_trip); + } + + #[test] + fn data_path_join_with_parquet_file_path() { + let server_id = make_server_id(); + let database_name = DatabaseName::new("clouds").unwrap(); + let object_store = make_object_store(); + let iox_object_store = + IoxObjectStore::new(Arc::clone(&object_store), server_id, &database_name); + + let uuid = Uuid::new_v4(); + let pfp = ParquetFilePath { + table_name: "}*".into(), + partition_key: "aoeu".into(), + chunk_id: 10, + uuid, + }; + + let path = iox_object_store.data_path.join(&pfp); + + let mut expected_path = object_store.new_path(); + expected_path.push_all_dirs(&[ + &server_id.to_string(), + database_name.as_str(), + "data", + "}*", + "aoeu", + ]); + expected_path.set_file_name(&format!("10.{}.parquet", uuid)); + + assert_eq!(path, 
expected_path); + } +} diff --git a/parquet_file/src/catalog.rs b/parquet_file/src/catalog.rs index b4662f69f2..e72a943be2 100644 --- a/parquet_file/src/catalog.rs +++ b/parquet_file/src/catalog.rs @@ -4,7 +4,7 @@ use bytes::Bytes; use chrono::{DateTime, Utc}; use futures::TryStreamExt; use generated_types::influxdata::iox::catalog::v1 as proto; -use iox_object_store::IoxObjectStore; +use iox_object_store::{IoxObjectStore, ParquetFilePath, ParquetFilePathParseError}; use object_store::{ path::{parsed::DirsAndFileName, parts::PathPart, ObjectStorePath, Path}, ObjectStore, ObjectStoreApi, @@ -41,10 +41,14 @@ pub const CHECKPOINT_FILE_SUFFIX: &str = "ckpt"; #[derive(Debug, Snafu)] pub enum Error { #[snafu(display("Error during serialization: {}", source))] - Serialization { source: EncodeError }, + Serialization { + source: EncodeError, + }, #[snafu(display("Error during deserialization: {}", source))] - Deserialization { source: DecodeError }, + Deserialization { + source: DecodeError, + }, #[snafu(display("Error during store write operation: {}", source))] Write { @@ -57,14 +61,19 @@ pub enum Error { }, #[snafu(display("Missing transaction: {}", revision_counter))] - MissingTransaction { revision_counter: u64 }, + MissingTransaction { + revision_counter: u64, + }, #[snafu(display( "Wrong revision counter in transaction file: expected {} but found {}", expected, actual ))] - WrongTransactionRevision { expected: u64, actual: u64 }, + WrongTransactionRevision { + expected: u64, + actual: u64, + }, #[snafu(display( "Wrong UUID for transaction file (revision: {}): expected {} but found {}", @@ -91,7 +100,9 @@ pub enum Error { }, #[snafu(display("Cannot parse UUID: {}", source))] - UuidParse { source: uuid::Error }, + UuidParse { + source: uuid::Error, + }, #[snafu(display("UUID required but not provided"))] UuidRequired {}, @@ -119,19 +130,29 @@ pub enum Error { }, #[snafu(display("Upgrade path not implemented/supported: {}", format))] - UnsupportedUpgrade { format: String }, + UnsupportedUpgrade { + format: String, + }, #[snafu(display("Parquet already exists in catalog: {:?}", path))] - ParquetFileAlreadyExists { path: DirsAndFileName }, + ParquetFileAlreadyExists { + path: ParquetFilePath, + }, #[snafu(display("Parquet does not exist in catalog: {:?}", path))] - ParquetFileDoesNotExist { path: DirsAndFileName }, + ParquetFileDoesNotExist { + path: ParquetFilePath, + }, #[snafu(display("Cannot encode parquet metadata: {}", source))] - MetadataEncodingFailed { source: crate::metadata::Error }, + MetadataEncodingFailed { + source: crate::metadata::Error, + }, #[snafu(display("Cannot decode parquet metadata: {}", source))] - MetadataDecodingFailed { source: crate::metadata::Error }, + MetadataDecodingFailed { + source: crate::metadata::Error, + }, #[snafu( display("Cannot extract metadata from {:?}: {}", path, source), @@ -139,7 +160,7 @@ pub enum Error { )] MetadataExtractFailed { source: crate::metadata::Error, - path: DirsAndFileName, + path: ParquetFilePath, }, #[snafu( @@ -148,7 +169,7 @@ pub enum Error { )] SchemaError { source: Box, - path: DirsAndFileName, + path: ParquetFilePath, }, #[snafu( @@ -161,7 +182,7 @@ pub enum Error { )] ReplayPlanError { source: Box, - path: DirsAndFileName, + path: ParquetFilePath, }, #[snafu( @@ -170,7 +191,7 @@ pub enum Error { )] ChunkCreationFailed { source: crate::chunk::Error, - path: DirsAndFileName, + path: ParquetFilePath, }, #[snafu(display("Catalog already exists"))] @@ -180,13 +201,17 @@ pub enum Error { DateTimeRequired {}, 
#[snafu(display("Internal: Cannot parse datetime in serialized catalog: {}", source))] - DateTimeParseError { source: TryFromIntError }, + DateTimeParseError { + source: TryFromIntError, + }, #[snafu(display( "Internal: Cannot parse encoding in serialized catalog: {} is not a valid, specified variant", data ))] - EncodingParseError { data: i32 }, + EncodingParseError { + data: i32, + }, #[snafu(display( "Internal: Found wrong encoding in serialized catalog file: Expected {:?} but got {:?}", @@ -199,7 +224,13 @@ pub enum Error { }, #[snafu(display("Cannot commit transaction: {}", source))] - CommitError { source: Box }, + CommitError { + source: Box, + }, + + InvalidParquetFilePath { + source: ParquetFilePathParseError, + }, } pub type Result = std::result::Result; @@ -208,7 +239,7 @@ pub type Result = std::result::Result; #[derive(Debug, Clone)] pub struct CatalogParquetInfo { /// Path within this database. - pub path: DirsAndFileName, + pub path: ParquetFilePath, /// Size of the parquet file, in bytes pub file_size_bytes: usize, @@ -235,7 +266,7 @@ pub trait CatalogState { ) -> Result<()>; /// Remove parquet file from state. - fn remove(&mut self, path: DirsAndFileName) -> Result<()>; + fn remove(&mut self, path: &ParquetFilePath) -> Result<()>; } /// In-memory view of the preserved catalog. @@ -314,8 +345,8 @@ impl PreservedCatalog { /// Create new catalog w/o any data. /// - /// An empty transaction will be used to mark the catalog start so that concurrent open but still-empty catalogs can - /// easily be detected. + /// An empty transaction will be used to mark the catalog start so that concurrent open but + /// still-empty catalogs can easily be detected. pub async fn new_empty( iox_object_store: Arc, state_data: S::EmptyInput, @@ -688,22 +719,24 @@ fn parse_uuid_required(s: &str) -> Result { parse_uuid(s)?.context(UuidRequired {}) } -/// Parse [`DirsAndFilename`](object_store::path::parsed::DirsAndFileName) from protobuf. -fn parse_dirs_and_filename(proto: &Option) -> Result { - let proto = proto.as_ref().context(PathRequired)?; - - Ok(DirsAndFileName { +/// Parse [`ParquetFilePath`](iox_object_store::ParquetFilePath) from protobuf. +fn parse_dirs_and_filename(proto: &proto::Path) -> Result { + let dirs_and_file_name = DirsAndFileName { directories: proto .directories .iter() .map(|s| PathPart::from(&s[..])) .collect(), file_name: Some(PathPart::from(&proto.file_name[..])), - }) + }; + + ParquetFilePath::from_relative_dirs_and_file_name(&dirs_and_file_name) + .context(InvalidParquetFilePath) } -/// Store [`DirsAndFilename`](object_store::path::parsed::DirsAndFileName) as protobuf. -fn unparse_dirs_and_filename(path: &DirsAndFileName) -> proto::Path { +/// Store [`ParquetFilePath`](iox_object_store::ParquetFilePath) as protobuf. +fn unparse_dirs_and_filename(path: &ParquetFilePath) -> proto::Path { + let path = path.relative_dirs_and_file_name(); proto::Path { directories: path .directories @@ -787,11 +820,11 @@ impl OpenTransaction { /// Handle the given action and populate data to the catalog state. /// - /// The deserializes the action state and passes it to the correct method in [`CatalogState`]. + /// This deserializes the action state and passes it to the correct method in [`CatalogState`]. /// - /// Note that this method is primarily for replaying transactions and will NOT append the given action to the - /// current transaction. If you want to store the given action (e.g. during an in-progress transaction), use - /// [`record_action`](Self::record_action). 
+ /// Note that this method is primarily for replaying transactions and will NOT append the given + /// action to the current transaction. If you want to store the given action (e.g. during an + /// in-progress transaction), use [`record_action`](Self::record_action). fn handle_action( state: &mut S, action: &proto::transaction::action::Action, @@ -808,7 +841,7 @@ impl OpenTransaction { .fail()?; } proto::transaction::action::Action::AddParquet(a) => { - let path = parse_dirs_and_filename(&a.path)?; + let path = parse_dirs_and_filename(a.path.as_ref().context(PathRequired)?)?; let file_size_bytes = a.file_size_bytes as usize; let metadata = @@ -825,8 +858,8 @@ impl OpenTransaction { )?; } proto::transaction::action::Action::RemoveParquet(a) => { - let path = parse_dirs_and_filename(&a.path)?; - state.remove(path)?; + let path = parse_dirs_and_filename(a.path.as_ref().context(PathRequired)?)?; + state.remove(&path)?; } }; Ok(()) @@ -932,14 +965,16 @@ impl OpenTransaction { /// Structure that holds all information required to create a checkpoint. /// -/// Note that while checkpoint are addressed using the same schema as we use for transaction (revision counter, UUID), -/// they contain the changes at the end (aka including) the transaction they refer. +/// Note that while checkpoint are addressed using the same schema as we use for transaction +/// (revision counter, UUID), they contain the changes at the end (aka including) the transaction +/// they refer. #[derive(Debug)] pub struct CheckpointData { - /// List of all Parquet files that are currently (i.e. by the current version) tracked by the catalog. + /// List of all Parquet files that are currently (i.e. by the current version) tracked by the + /// catalog. /// /// If a file was once added but later removed it MUST NOT appear in the result. - pub files: HashMap, + pub files: HashMap, } /// Handle for an open uncommitted transaction. @@ -1082,7 +1117,7 @@ impl<'c> TransactionHandle<'c> { /// Remove a parquet file from the catalog. /// /// Removing files that do not exist or were already removed will result in an error. - pub fn remove_parquet(&mut self, path: &DirsAndFileName) { + pub fn remove_parquet(&mut self, path: &ParquetFilePath) { self.transaction .as_mut() .expect("transaction handle w/o transaction?!") @@ -1208,14 +1243,15 @@ impl<'c> Debug for CheckpointHandle<'c> { pub mod test_helpers { use super::*; - use crate::test_utils::{chunk_addr, make_iox_object_store, make_metadata}; - use object_store::parsed_path; + use crate::test_utils::{ + chunk_addr, make_iox_object_store, make_metadata, make_parquet_file_path, + }; /// In-memory catalog state, for testing. #[derive(Clone, Debug)] pub struct TestCatalogState { /// Map of all parquet files that are currently pregistered. 
- pub parquet_files: HashMap, + pub parquet_files: HashMap, } impl TestCatalogState { @@ -1255,8 +1291,8 @@ pub mod test_helpers { Ok(()) } - fn remove(&mut self, path: DirsAndFileName) -> Result<()> { - match self.parquet_files.entry(path) { + fn remove(&mut self, path: &ParquetFilePath) -> Result<()> { + match self.parquet_files.entry(path.clone()) { Occupied(o) => { o.remove(); } @@ -1302,15 +1338,14 @@ pub mod test_helpers { PreservedCatalog::new_empty::(Arc::clone(&iox_object_store), state_data) .await .unwrap(); - let mut expected = HashMap::new(); + let mut expected: HashMap = HashMap::new(); assert_checkpoint(&state, &f, &expected); // add files let mut chunk_id_watermark = 5; { for chunk_id in 0..chunk_id_watermark { - let path = parsed_path!(format!("chunk_{}", chunk_id).as_ref()); - let (_, metadata) = + let (path, metadata) = make_metadata(&iox_object_store, "ok", chunk_addr(chunk_id)).await; state .add( @@ -1329,16 +1364,15 @@ pub mod test_helpers { // remove files { - let path = parsed_path!("chunk_1"); - state.remove(path.clone()).unwrap(); + let path = expected.keys().next().unwrap().clone(); + state.remove(&path).unwrap(); expected.remove(&path); } assert_checkpoint(&state, &f, &expected); // add and remove in the same transaction { - let path = parsed_path!(format!("chunk_{}", chunk_id_watermark).as_ref()); - let (_, metadata) = + let (path, metadata) = make_metadata(&iox_object_store, "ok", chunk_addr(chunk_id_watermark)).await; state .add( @@ -1350,23 +1384,23 @@ pub mod test_helpers { }, ) .unwrap(); - state.remove(path.clone()).unwrap(); + state.remove(&path).unwrap(); chunk_id_watermark += 1; } assert_checkpoint(&state, &f, &expected); // remove and add in the same transaction { - let path = parsed_path!("chunk_2"); - let (_, metadata) = make_metadata(&iox_object_store, "ok", chunk_addr(2)).await; - state.remove(path.clone()).unwrap(); + let path = expected.keys().next().unwrap().clone(); + let metadata = expected.get(&path).unwrap(); + state.remove(&path).unwrap(); state .add( Arc::clone(&iox_object_store), CatalogParquetInfo { path: path.clone(), file_size_bytes: 33, - metadata: Arc::new(metadata), + metadata: Arc::clone(metadata), }, ) .unwrap(); @@ -1375,8 +1409,7 @@ pub mod test_helpers { // add, remove, add in the same transaction { - let path = parsed_path!(format!("chunk_{}", chunk_id_watermark).as_ref()); - let (_, metadata) = + let (path, metadata) = make_metadata(&iox_object_store, "ok", chunk_addr(chunk_id_watermark)).await; state .add( @@ -1388,7 +1421,7 @@ pub mod test_helpers { }, ) .unwrap(); - state.remove(path.clone()).unwrap(); + state.remove(&path).unwrap(); state .add( Arc::clone(&iox_object_store), @@ -1406,20 +1439,20 @@ pub mod test_helpers { // remove, add, remove in same transaction { - let path = parsed_path!("chunk_2"); - let (_, metadata) = make_metadata(&iox_object_store, "ok", chunk_addr(2)).await; - state.remove(path.clone()).unwrap(); + let path = expected.keys().next().unwrap().clone(); + let metadata = expected.get(&path).unwrap(); + state.remove(&path).unwrap(); state .add( Arc::clone(&iox_object_store), CatalogParquetInfo { path: path.clone(), file_size_bytes: 33, - metadata: Arc::new(metadata), + metadata: Arc::clone(metadata), }, ) .unwrap(); - state.remove(path.clone()).unwrap(); + state.remove(&path).unwrap(); expected.remove(&path); } assert_checkpoint(&state, &f, &expected); @@ -1427,7 +1460,7 @@ pub mod test_helpers { // error handling, no real opt { // already exists (should also not change the metadata) - let path = 
parsed_path!("chunk_0"); + let path = expected.keys().next().unwrap(); let (_, metadata) = make_metadata(&iox_object_store, "fail", chunk_addr(0)).await; let err = state .add( @@ -1442,8 +1475,8 @@ pub mod test_helpers { assert!(matches!(err, Error::ParquetFileAlreadyExists { .. })); // does not exist - let path = parsed_path!(format!("chunk_{}", chunk_id_watermark).as_ref()); - let err = state.remove(path).unwrap_err(); + let path = make_parquet_file_path(); + let err = state.remove(&path).unwrap_err(); assert!(matches!(err, Error::ParquetFileDoesNotExist { .. })); chunk_id_watermark += 1; } @@ -1452,7 +1485,7 @@ pub mod test_helpers { // error handling, still something works { // already exists (should also not change the metadata) - let path = parsed_path!("chunk_0"); + let path = expected.keys().next().unwrap(); let (_, metadata) = make_metadata(&iox_object_store, "fail", chunk_addr(0)).await; let err = state .add( @@ -1467,8 +1500,7 @@ pub mod test_helpers { assert!(matches!(err, Error::ParquetFileAlreadyExists { .. })); // this transaction will still work - let path = parsed_path!(format!("chunk_{}", chunk_id_watermark).as_ref()); - let (_, metadata) = + let (path, metadata) = make_metadata(&iox_object_store, "ok", chunk_addr(chunk_id_watermark)).await; state .add( @@ -1488,7 +1520,7 @@ pub mod test_helpers { .add( Arc::clone(&iox_object_store), CatalogParquetInfo { - path: path.clone(), + path, file_size_bytes: 33, metadata: Arc::new(metadata), }, @@ -1497,18 +1529,18 @@ pub mod test_helpers { assert!(matches!(err, Error::ParquetFileAlreadyExists { .. })); // does not exist - let path = parsed_path!(format!("chunk_{}", chunk_id_watermark).as_ref()); - let err = state.remove(path).unwrap_err(); + let path = make_parquet_file_path(); + let err = state.remove(&path).unwrap_err(); assert!(matches!(err, Error::ParquetFileDoesNotExist { .. })); chunk_id_watermark += 1; // this still works - let path = parsed_path!("chunk_3"); - state.remove(path.clone()).unwrap(); + let path = expected.keys().next().unwrap().clone(); + state.remove(&path).unwrap(); expected.remove(&path); // recently removed - let err = state.remove(path).unwrap_err(); + let err = state.remove(&path).unwrap_err(); assert!(matches!(err, Error::ParquetFileDoesNotExist { .. 
})); } assert_checkpoint(&state, &f, &expected); @@ -1521,7 +1553,7 @@ pub mod test_helpers { fn assert_checkpoint( state: &S, f: &F, - expected_files: &HashMap>, + expected_files: &HashMap>, ) where F: Fn(&S) -> CheckpointData, { @@ -1568,8 +1600,9 @@ mod tests { }, *, }; - use crate::test_utils::{chunk_addr, make_iox_object_store, make_metadata}; - use object_store::parsed_path; + use crate::test_utils::{ + chunk_addr, make_iox_object_store, make_metadata, make_parquet_file_path, + }; #[tokio::test] async fn test_create_empty() { @@ -2226,7 +2259,7 @@ mod tests { // create another transaction on-top that adds a file (this transaction will be required to load the full state) { let mut transaction = catalog.open_transaction().await; - let path = parsed_path!("last_one"); + let path = make_parquet_file_path(); let info = CatalogParquetInfo { path, file_size_bytes: 33, @@ -2271,11 +2304,13 @@ mod tests { } /// Get sorted list of catalog files from state - fn get_catalog_parquet_files(state: &TestCatalogState) -> Vec<(String, IoxParquetMetaData)> { - let mut files: Vec<(String, IoxParquetMetaData)> = state + fn get_catalog_parquet_files( + state: &TestCatalogState, + ) -> Vec<(ParquetFilePath, IoxParquetMetaData)> { + let mut files: Vec<(ParquetFilePath, IoxParquetMetaData)> = state .parquet_files .values() - .map(|info| (info.path.to_string(), info.metadata.as_ref().clone())) + .map(|info| (info.path.clone(), info.metadata.as_ref().clone())) .collect(); files.sort_by_key(|(path, _)| path.clone()); files @@ -2284,9 +2319,12 @@ mod tests { /// Assert that set of parquet files tracked by a catalog are identical to the given sorted list. fn assert_catalog_parquet_files( state: &TestCatalogState, - expected: &[(String, IoxParquetMetaData)], + expected: &[(ParquetFilePath, IoxParquetMetaData)], ) { let actual = get_catalog_parquet_files(state); + let mut expected = expected.to_vec(); + expected.sort_by_key(|(path, _)| path.clone()); + for ((actual_path, actual_md), (expected_path, expected_md)) in actual.iter().zip(expected.iter()) { @@ -2388,39 +2426,39 @@ mod tests { trace.record(&catalog, &state, false); // fill catalog with examples + let test1 = make_parquet_file_path(); + let sub1_test1 = make_parquet_file_path(); + let sub1_test2 = make_parquet_file_path(); + let sub2_test1 = make_parquet_file_path(); { let mut t = catalog.open_transaction().await; - let path = parsed_path!("test1"); let info = CatalogParquetInfo { - path, + path: test1.clone(), file_size_bytes: 33, metadata: Arc::clone(&metadata1), }; state.parquet_files.insert(info.path.clone(), info.clone()); t.add_parquet(&info).unwrap(); - let path = parsed_path!(["sub1"], "test1"); let info = CatalogParquetInfo { - path, + path: sub1_test1.clone(), file_size_bytes: 33, metadata: Arc::clone(&metadata2), }; state.parquet_files.insert(info.path.clone(), info.clone()); t.add_parquet(&info).unwrap(); - let path = parsed_path!(["sub1"], "test2"); let info = CatalogParquetInfo { - path, + path: sub1_test2.clone(), file_size_bytes: 33, metadata: Arc::clone(&metadata2), }; state.parquet_files.insert(info.path.clone(), info.clone()); t.add_parquet(&info).unwrap(); - let path = parsed_path!(["sub2"], "test1"); let info = CatalogParquetInfo { - path, + path: sub2_test1.clone(), file_size_bytes: 33, metadata: Arc::clone(&metadata1), }; @@ -2433,31 +2471,30 @@ mod tests { assert_catalog_parquet_files( &state, &[ - ("sub1/test1".to_string(), metadata2.as_ref().clone()), - ("sub1/test2".to_string(), metadata2.as_ref().clone()), - 
("sub2/test1".to_string(), metadata1.as_ref().clone()), - ("test1".to_string(), metadata1.as_ref().clone()), + (sub1_test1.clone(), metadata2.as_ref().clone()), + (sub1_test2.clone(), metadata2.as_ref().clone()), + (sub2_test1.clone(), metadata1.as_ref().clone()), + (test1.clone(), metadata1.as_ref().clone()), ], ); trace.record(&catalog, &state, false); // modify catalog with examples + let test4 = make_parquet_file_path(); { let mut t = catalog.open_transaction().await; // "real" modifications - let path = parsed_path!("test4"); let info = CatalogParquetInfo { - path, + path: test4.clone(), file_size_bytes: 33, metadata: Arc::clone(&metadata1), }; state.parquet_files.insert(info.path.clone(), info.clone()); t.add_parquet(&info).unwrap(); - let path = parsed_path!("test1"); - state.parquet_files.remove(&path); - t.remove_parquet(&path); + state.parquet_files.remove(&test1); + t.remove_parquet(&test1); t.commit().await.unwrap(); } @@ -2465,10 +2502,10 @@ mod tests { assert_catalog_parquet_files( &state, &[ - ("sub1/test1".to_string(), metadata2.as_ref().clone()), - ("sub1/test2".to_string(), metadata2.as_ref().clone()), - ("sub2/test1".to_string(), metadata1.as_ref().clone()), - ("test4".to_string(), metadata1.as_ref().clone()), + (sub1_test1.clone(), metadata2.as_ref().clone()), + (sub1_test2.clone(), metadata2.as_ref().clone()), + (sub2_test1.clone(), metadata1.as_ref().clone()), + (test4.clone(), metadata1.as_ref().clone()), ], ); trace.record(&catalog, &state, false); @@ -2478,13 +2515,13 @@ mod tests { let mut t = catalog.open_transaction().await; let info = CatalogParquetInfo { - path: parsed_path!("test5"), + path: make_parquet_file_path(), file_size_bytes: 33, metadata: Arc::clone(&metadata1), }; t.add_parquet(&info).unwrap(); - t.remove_parquet(&parsed_path!(["sub1"], "test2")); + t.remove_parquet(&sub1_test2); // NO commit here! } @@ -2492,10 +2529,10 @@ mod tests { assert_catalog_parquet_files( &state, &[ - ("sub1/test1".to_string(), metadata2.as_ref().clone()), - ("sub1/test2".to_string(), metadata2.as_ref().clone()), - ("sub2/test1".to_string(), metadata1.as_ref().clone()), - ("test4".to_string(), metadata1.as_ref().clone()), + (sub1_test1.clone(), metadata2.as_ref().clone()), + (sub1_test2.clone(), metadata2.as_ref().clone()), + (sub2_test1.clone(), metadata1.as_ref().clone()), + (test4.clone(), metadata1.as_ref().clone()), ], ); trace.record(&catalog, &state, true); diff --git a/parquet_file/src/chunk.rs b/parquet_file/src/chunk.rs index 8fdea47ba1..f1c6b556d8 100644 --- a/parquet_file/src/chunk.rs +++ b/parquet_file/src/chunk.rs @@ -8,7 +8,7 @@ use internal_types::{ schema::{Schema, TIME_COLUMN_NAME}, selection::Selection, }; -use iox_object_store::IoxObjectStore; +use iox_object_store::{IoxObjectStore, ParquetFilePath}; use object_store::path::Path; use query::predicate::Predicate; use snafu::{ResultExt, Snafu}; @@ -42,7 +42,7 @@ pub enum Error { )] SchemaReadFailed { source: crate::metadata::Error, - path: Path, + path: ParquetFilePath, }, #[snafu( @@ -51,7 +51,7 @@ pub enum Error { )] StatisticsReadFailed { source: crate::metadata::Error, - path: Path, + path: ParquetFilePath, }, } @@ -95,10 +95,8 @@ pub struct ParquetChunk { /// Persists the parquet file within a database's relative path iox_object_store: Arc, - /// Path in the object store. Format: - /// //data///.parquet - object_store_path: Path, + /// Path in the database's object store. 
+ path: ParquetFilePath, /// Size of the data, in object store file_size_bytes: usize, @@ -112,7 +110,7 @@ pub struct ParquetChunk { impl ParquetChunk { /// Creates new chunk from given parquet metadata. pub fn new( - file_location: Path, + path: &ParquetFilePath, iox_object_store: Arc, file_size_bytes: usize, parquet_metadata: Arc, @@ -120,14 +118,12 @@ impl ParquetChunk { partition_key: Arc, metrics: ChunkMetrics, ) -> Result { - let schema = parquet_metadata.read_schema().context(SchemaReadFailed { - path: &file_location, - })?; + let schema = parquet_metadata + .read_schema() + .context(SchemaReadFailed { path })?; let columns = parquet_metadata .read_statistics(&schema) - .context(StatisticsReadFailed { - path: &file_location, - })?; + .context(StatisticsReadFailed { path })?; let table_summary = TableSummary { name: table_name.to_string(), columns, @@ -137,7 +133,7 @@ impl ParquetChunk { partition_key, Arc::new(table_summary), schema, - file_location, + path, iox_object_store, file_size_bytes, parquet_metadata, @@ -152,7 +148,7 @@ impl ParquetChunk { partition_key: Arc, table_summary: Arc, schema: Arc, - file_location: Path, + path: &ParquetFilePath, iox_object_store: Arc, file_size_bytes: usize, parquet_metadata: Arc, @@ -166,7 +162,7 @@ impl ParquetChunk { schema, timestamp_range, iox_object_store, - object_store_path: file_location, + path: path.into(), file_size_bytes, parquet_metadata, metrics, @@ -179,8 +175,8 @@ impl ParquetChunk { } /// Return object store path for this chunk - pub fn path(&self) -> Path { - self.object_store_path.clone() + pub fn path(&self) -> &ParquetFilePath { + &self.path } /// Returns the summary statistics for this chunk @@ -200,7 +196,7 @@ impl ParquetChunk { + self.partition_key.len() + self.table_summary.size() + mem::size_of_val(&self.schema.as_ref()) - + mem::size_of_val(&self.object_store_path) + + mem::size_of_val(&self.path) + mem::size_of_val(&self.parquet_metadata) } @@ -247,7 +243,7 @@ impl ParquetChunk { predicate, selection, Arc::clone(&self.schema.as_arrow()), - self.object_store_path.clone(), + self.path.clone(), Arc::clone(&self.iox_object_store), ) .context(ReadParquet) diff --git a/parquet_file/src/cleanup.rs b/parquet_file/src/cleanup.rs index 88610c299f..a24a91791d 100644 --- a/parquet_file/src/cleanup.rs +++ b/parquet_file/src/cleanup.rs @@ -3,11 +3,8 @@ use std::{collections::HashSet, sync::Arc}; use crate::catalog::{CatalogParquetInfo, CatalogState, PreservedCatalog}; use futures::TryStreamExt; -use iox_object_store::IoxObjectStore; -use object_store::{ - path::{parsed::DirsAndFileName, Path}, - ObjectStore, ObjectStoreApi, -}; +use iox_object_store::{IoxObjectStore, ParquetFilePath}; +use object_store::{ObjectStore, ObjectStoreApi}; use observability_deps::tracing::info; use parking_lot::Mutex; use snafu::{ResultExt, Snafu}; @@ -35,20 +32,23 @@ pub type Result = std::result::Result; /// The resulting vector is in no particular order. It may be passed to [`delete_files`]. /// /// # Locking / Concurrent Actions -/// While this method is running you MUST NOT create any new parquet files or modify the preserved catalog in any other -/// way. Hence this method needs exclusive access to the preserved catalog and the parquet file. Otherwise this method -/// may report files for deletion that you are about to write to the catalog! +/// +/// While this method is running you MUST NOT create any new parquet files or modify the preserved +/// catalog in any other way. 
Hence this method needs exclusive access to the preserved catalog and +/// the parquet file. Otherwise this method may report files for deletion that you are about to +/// write to the catalog! /// /// **This method does NOT acquire the transaction lock!** /// -/// To limit the time the exclusive access is required use `max_files` which will limit the number of files to be -/// detected in this cleanup round. +/// To limit the time the exclusive access is required use `max_files` which will limit the number +/// of files to be detected in this cleanup round. /// -/// The exclusive access can be dropped after this method returned and before calling [`delete_files`]. +/// The exclusive access can be dropped after this method returned and before calling +/// [`delete_files`]. pub async fn get_unreferenced_parquet_files( catalog: &PreservedCatalog, max_files: usize, -) -> Result> { +) -> Result> { let iox_object_store = catalog.iox_object_store(); let all_known = { // replay catalog transactions to track ALL (even dropped) files that are referenced @@ -61,14 +61,10 @@ pub async fn get_unreferenced_parquet_files( state.files.into_inner() }; - let prefix = iox_object_store.data_path(); - - // gather a list of "files to remove" eagerly so we do not block transactions on the catalog for too long + // gather a list of "files to remove" eagerly so we do not block transactions on the catalog + // for too long let mut to_remove = vec![]; - let mut stream = iox_object_store - .list(Some(&prefix)) - .await - .context(ReadError)?; + let mut stream = iox_object_store.parquet_files().await.context(ReadError)?; 'outer: while let Some(paths) = stream.try_next().await.context(ReadError)? { for path in paths { @@ -76,18 +72,9 @@ pub async fn get_unreferenced_parquet_files( info!(%max_files, "reached limit of number of files to cleanup in one go"); break 'outer; } - let path_parsed: DirsAndFileName = path.clone().into(); - // only delete if all of the following conditions are met: - // - filename ends with `.parquet` - // - file is not tracked by the catalog - if path_parsed - .file_name - .as_ref() - .map(|part| part.encoded().ends_with(".parquet")) - .unwrap_or(false) - && !all_known.contains(&path_parsed) - { + // only delete if file is not tracked by the catalog + if !all_known.contains(&path) { to_remove.push(path); } } @@ -100,17 +87,18 @@ pub async fn get_unreferenced_parquet_files( /// Delete all `files` from the store linked to the preserved catalog. /// -/// A file might already be deleted (or entirely absent) when this method is called. This will NOT result in an error. +/// A file might already be deleted (or entirely absent) when this method is called. This will NOT +/// result in an error. /// /// # Locking / Concurrent Actions /// File creation and catalog modifications can be done while calling this method. Even /// [`get_unreferenced_parquet_files`] can be called while is method is in-progress. 
-pub async fn delete_files(catalog: &PreservedCatalog, files: &[Path]) -> Result<()> { +pub async fn delete_files(catalog: &PreservedCatalog, files: &[ParquetFilePath]) -> Result<()> { let store = catalog.iox_object_store(); for path in files { - info!(%path, "Delete file"); - store.delete(path).await.context(WriteError)?; + info!(?path, "Delete file"); + store.delete_parquet_file(path).await.context(WriteError)?; } info!(n_files = files.len(), "Finished deletion, removed files."); @@ -120,7 +108,7 @@ pub async fn delete_files(catalog: &PreservedCatalog, files: &[Path]) -> Result< /// Catalog state that traces all used parquet files. struct TracerCatalogState { - files: Mutex>, + files: Mutex>, } impl CatalogState for TracerCatalogState { @@ -141,7 +129,7 @@ impl CatalogState for TracerCatalogState { Ok(()) } - fn remove(&mut self, _path: DirsAndFileName) -> crate::catalog::Result<()> { + fn remove(&mut self, _path: &ParquetFilePath) -> crate::catalog::Result<()> { // Do NOT remove the file since we still need it for time travel Ok(()) } @@ -149,17 +137,13 @@ impl CatalogState for TracerCatalogState { #[cfg(test)] mod tests { - use std::{collections::HashSet, sync::Arc}; - - use bytes::Bytes; - use object_store::path::Path; - use tokio::sync::RwLock; - use super::*; use crate::{ catalog::test_helpers::TestCatalogState, test_utils::{chunk_addr, make_iox_object_store, make_metadata}, }; + use std::{collections::HashSet, sync::Arc}; + use tokio::sync::RwLock; #[tokio::test] async fn test_cleanup_empty() { @@ -195,8 +179,6 @@ mod tests { // an ordinary tracked parquet file => keep let (path, metadata) = make_metadata(&iox_object_store, "foo", chunk_addr(1)).await; let metadata = Arc::new(metadata); - paths_keep.push(path.to_string()); - let path = path.into(); let info = CatalogParquetInfo { path, file_size_bytes: 33, @@ -204,11 +186,12 @@ mod tests { }; transaction.add_parquet(&info).unwrap(); + paths_keep.push(info.path); - // another ordinary tracked parquet file that was added and removed => keep (for time travel) + // another ordinary tracked parquet file that was added and removed => keep (for time + // travel) let (path, metadata) = make_metadata(&iox_object_store, "foo", chunk_addr(2)).await; let metadata = Arc::new(metadata); - let path = path.into(); let info = CatalogParquetInfo { path, file_size_bytes: 33, @@ -216,21 +199,11 @@ mod tests { }; transaction.add_parquet(&info).unwrap(); transaction.remove_parquet(&info.path); - let path_string = iox_object_store - .path_from_dirs_and_filename(info.path.clone()) - .to_string(); - paths_keep.push(path_string); - - // not a parquet file => keep - let mut path = info.path; - path.file_name = Some("foo.txt".into()); - let path = iox_object_store.path_from_dirs_and_filename(path); - create_empty_file(&iox_object_store, &path).await; - paths_keep.push(path.to_string()); + paths_keep.push(info.path); // an untracked parquet file => delete let (path, _md) = make_metadata(&iox_object_store, "foo", chunk_addr(3)).await; - paths_delete.push(path.to_string()); + paths_delete.push(path); transaction.commit().await.unwrap(); } @@ -266,8 +239,9 @@ mod tests { // try multiple times to provoke a conflict for i in 0..100 { - // Every so often try to create a file with the same ChunkAddr beforehand. This should not trick the cleanup - // logic to remove the actual file because file paths contains a UUIDv4 part. + // Every so often try to create a file with the same ChunkAddr beforehand. 
This should + // not trick the cleanup logic to remove the actual file because file paths contains a + // UUIDv4 part. if i % 2 == 0 { make_metadata(&iox_object_store, "foo", chunk_addr(i)).await; } @@ -278,7 +252,6 @@ mod tests { let (path, md) = make_metadata(&iox_object_store, "foo", chunk_addr(i)).await; let metadata = Arc::new(md); - let path = path.into(); let info = CatalogParquetInfo { path, file_size_bytes: 33, @@ -291,9 +264,7 @@ mod tests { drop(guard); - iox_object_store - .path_from_dirs_and_filename(info.path) - .to_string() + info.path }, async { let guard = lock.write().await; @@ -321,10 +292,10 @@ mod tests { .unwrap(); // create some files - let mut to_remove: HashSet = Default::default(); + let mut to_remove = HashSet::default(); for chunk_id in 0..3 { let (path, _md) = make_metadata(&iox_object_store, "foo", chunk_addr(chunk_id)).await; - to_remove.insert(path.to_string()); + to_remove.insert(path); } // run clean-up @@ -348,30 +319,15 @@ mod tests { assert_eq!(leftover.len(), 0); } - async fn create_empty_file(iox_object_store: &IoxObjectStore, path: &Path) { - let data = Bytes::default(); - let len = data.len(); - + async fn list_all_files(iox_object_store: &IoxObjectStore) -> HashSet { iox_object_store - .put( - path, - futures::stream::once(async move { Ok(data) }), - Some(len), - ) - .await - .unwrap(); - } - - async fn list_all_files(iox_object_store: &IoxObjectStore) -> HashSet { - iox_object_store - .list(None) + .parquet_files() .await .unwrap() .try_concat() .await .unwrap() - .iter() - .map(|p| p.to_string()) + .into_iter() .collect() } } diff --git a/parquet_file/src/rebuild.rs b/parquet_file/src/rebuild.rs index f82a76ef2e..351e2a431b 100644 --- a/parquet_file/src/rebuild.rs +++ b/parquet_file/src/rebuild.rs @@ -2,8 +2,7 @@ use std::{fmt::Debug, sync::Arc}; use futures::TryStreamExt; -use iox_object_store::IoxObjectStore; -use object_store::path::{parsed::DirsAndFileName, Path}; +use iox_object_store::{IoxObjectStore, ParquetFilePath}; use observability_deps::tracing::error; use snafu::{ResultExt, Snafu}; @@ -22,7 +21,7 @@ pub enum Error { #[snafu(display("Cannot read IOx metadata from parquet file ({:?}): {}", path, source))] MetadataReadFailure { source: crate::metadata::Error, - path: Path, + path: ParquetFilePath, }, #[snafu(display("Cannot add file to transaction: {}", source))] @@ -103,15 +102,17 @@ async fn collect_files( iox_object_store: &IoxObjectStore, ignore_metadata_read_failure: bool, ) -> Result> { - let mut stream = iox_object_store.list(None).await.context(ReadFailure)?; + let mut stream = iox_object_store + .parquet_files() + .await + .context(ReadFailure)?; let mut files = vec![]; while let Some(paths) = stream.try_next().await.context(ReadFailure)? { - for path in paths.into_iter().filter(is_parquet) { + for path in paths { match read_parquet(iox_object_store, &path).await { Ok((file_size_bytes, metadata)) => { - let path = path.into(); files.push(CatalogParquetInfo { path, file_size_bytes, @@ -130,23 +131,13 @@ async fn collect_files( Ok(files) } -/// Checks if the given path is (likely) a parquet file. -fn is_parquet(path: &Path) -> bool { - let path: DirsAndFileName = path.clone().into(); - if let Some(filename) = path.file_name { - filename.encoded().ends_with(".parquet") - } else { - false - } -} - /// Read Parquet and IOx metadata from given path. 
async fn read_parquet( iox_object_store: &IoxObjectStore, - path: &Path, + path: &ParquetFilePath, ) -> Result<(usize, Arc)> { let data = iox_object_store - .get(path) + .get_parquet_file(path) .await .context(ReadFailure)? .map_ok(|bytes| bytes.to_vec()) @@ -186,6 +177,7 @@ mod tests { storage::Storage, test_utils::{make_iox_object_store, make_record_batch}, }; + use object_store::path::parsed::DirsAndFileName; #[tokio::test] async fn test_rebuild_successfull() { @@ -397,7 +389,7 @@ mod tests { .unwrap(); CatalogParquetInfo { - path: path.into(), + path, file_size_bytes, metadata: Arc::new(metadata), } @@ -406,7 +398,7 @@ mod tests { pub async fn create_parquet_file_without_metadata( iox_object_store: &Arc, chunk_id: u32, - ) -> (DirsAndFileName, IoxParquetMetaData) { + ) -> (ParquetFilePath, IoxParquetMetaData) { let (record_batches, schema, _column_summaries, _num_rows) = make_record_batch("foo"); let mut stream: SendableRecordBatchStream = Box::pin(MemoryStream::new(record_batches)); @@ -424,15 +416,15 @@ mod tests { let data = mem_writer.into_inner().unwrap(); let md = IoxParquetMetaData::from_file_bytes(data.clone()).unwrap(); let storage = Storage::new(Arc::clone(iox_object_store)); - let path = storage.location(&ChunkAddr { + let chunk_addr = ChunkAddr { db_name: Arc::from(iox_object_store.database_name()), table_name: Arc::from("table1"), partition_key: Arc::from("part1"), chunk_id, - }); + }; + let path = ParquetFilePath::new(&chunk_addr); storage.to_object_store(data, &path).await.unwrap(); - let path: DirsAndFileName = path.into(); (path, md) } } diff --git a/parquet_file/src/storage.rs b/parquet_file/src/storage.rs index 0b1d8bae89..933015559a 100644 --- a/parquet_file/src/storage.rs +++ b/parquet_file/src/storage.rs @@ -13,8 +13,8 @@ use datafusion::{ }; use futures::StreamExt; use internal_types::selection::Selection; -use iox_object_store::IoxObjectStore; -use object_store::path::{parsed::DirsAndFileName, ObjectStorePath, Path}; +use iox_object_store::{IoxObjectStore, ParquetFilePath}; +use object_store::path::parsed::DirsAndFileName; use observability_deps::tracing::debug; use parking_lot::Mutex; use parquet::{ @@ -29,7 +29,6 @@ use std::{ io::{Cursor, Seek, SeekFrom, Write}, sync::Arc, }; -use uuid::Uuid; use crate::metadata::{IoxMetadata, IoxParquetMetaData, METADATA_KEY}; @@ -137,29 +136,6 @@ impl Storage { Self { iox_object_store } } - /// Return full path including filename in the object store to save a chunk - /// table file. - /// - /// Paths generated by this method are unique and calling the method twice with the same `addr` will yield different - /// outputs. - /// - /// **Important: The resulting path should be treated as a black box. It might vary over time and is an - /// implementation detail. 
Do NOT attempt to parse it.** - pub fn location(&self, chunk_addr: &ChunkAddr) -> object_store::path::Path { - // generate random UUID so that files are unique and never overwritten - let uuid = Uuid::new_v4(); - - let mut path = self.iox_object_store.data_path(); - path.push_dir(chunk_addr.table_name.as_ref()); - path.push_dir(chunk_addr.partition_key.as_ref()); - path.set_file_name(format!( - "{}.{}.parquet", - chunk_addr.chunk_id, - uuid.to_string() - )); - path - } - /// Write the given stream of data of a specified table of /// a specified partitioned chunk to a parquet file of this storage /// @@ -170,9 +146,9 @@ impl Storage { chunk_addr: ChunkAddr, stream: SendableRecordBatchStream, metadata: IoxMetadata, - ) -> Result<(Path, usize, IoxParquetMetaData)> { + ) -> Result<(ParquetFilePath, usize, IoxParquetMetaData)> { // Create full path location of this file in object store - let path = self.location(&chunk_addr); + let path = ParquetFilePath::new(&chunk_addr); let schema = stream.schema(); let data = Self::parquet_stream_to_bytes(stream, schema, metadata).await?; @@ -220,18 +196,14 @@ impl Storage { } /// Put the given vector of bytes to the specified location - pub async fn to_object_store( - &self, - data: Vec, - file_name: &object_store::path::Path, - ) -> Result<()> { + pub async fn to_object_store(&self, data: Vec, path: &ParquetFilePath) -> Result<()> { let len = data.len(); let data = Bytes::from(data); let stream_data = Result::Ok(data); self.iox_object_store - .put( - file_name, + .put_parquet_file( + path, futures::stream::once(async move { stream_data }), Some(len), ) @@ -266,7 +238,7 @@ impl Storage { async fn download_and_scan_parquet( predicate: Option, projection: Vec, - path: Path, + path: ParquetFilePath, store: Arc, tx: tokio::sync::mpsc::Sender>, ) -> Result<()> { @@ -288,7 +260,10 @@ impl Storage { .context(OpenTempFile)?; debug!(?path, ?temp_file, "Beginning to read parquet to temp file"); - let mut read_stream = store.get(&path).await.context(ReadingObjectStore)?; + let mut read_stream = store + .get_parquet_file(&path) + .await + .context(ReadingObjectStore)?; while let Some(bytes) = read_stream.next().await { let bytes = bytes.context(ReadingObjectStore)?; @@ -345,7 +320,7 @@ impl Storage { predicate: &Predicate, selection: Selection<'_>, schema: SchemaRef, - path: Path, + path: ParquetFilePath, store: Arc, ) -> Result { // fire up a async task that will fetch the parquet file @@ -568,22 +543,6 @@ mod tests { assert_batches_eq!(&expected, &read_batches); } - #[test] - fn test_locations_are_unique() { - let iox_object_store = make_iox_object_store(); - let storage = Storage::new(Arc::clone(&iox_object_store)); - let chunk_addr = ChunkAddr { - db_name: iox_object_store.database_name().into(), - table_name: Arc::from("my_table"), - partition_key: Arc::from("my_partition"), - chunk_id: 13, - }; - - let l1 = storage.location(&chunk_addr); - let l2 = storage.location(&chunk_addr); - assert_ne!(l1, l2); - } - #[test] fn test_props_have_compression() { // should be writing with compression diff --git a/parquet_file/src/test_utils.rs b/parquet_file/src/test_utils.rs index 3d5cbf5199..2fc120ff11 100644 --- a/parquet_file/src/test_utils.rs +++ b/parquet_file/src/test_utils.rs @@ -25,8 +25,8 @@ use internal_types::{ schema::{builder::SchemaBuilder, Schema, TIME_COLUMN_NAME}, selection::Selection, }; -use iox_object_store::IoxObjectStore; -use object_store::{path::Path, ObjectStore}; +use iox_object_store::{IoxObjectStore, ParquetFilePath}; +use 
object_store::ObjectStore; use parquet::{ arrow::{ArrowReader, ParquetFileArrowReader}, file::serialized_reader::{SerializedFileReader, SliceableCursor}, @@ -65,15 +65,15 @@ pub async fn load_parquet_from_store_for_chunk( store: Arc, ) -> Result> { let path = chunk.path(); - Ok(load_parquet_from_store_for_path(&path, store).await?) + Ok(load_parquet_from_store_for_path(path, store).await?) } pub async fn load_parquet_from_store_for_path( - path: &Path, + path: &ParquetFilePath, store: Arc, ) -> Result> { let parquet_data = store - .get(path) + .get_parquet_file(path) .await .context(GettingDataFromObjectStore)? .map_ok(|bytes| bytes.to_vec()) @@ -173,7 +173,7 @@ pub async fn make_chunk_given_record_batch( addr.partition_key, Arc::new(table_summary), Arc::new(schema), - path, + &path, Arc::clone(&iox_object_store), file_size_bytes, Arc::new(parquet_metadata), @@ -764,6 +764,12 @@ pub fn read_data_from_parquet_data(schema: SchemaRef, parquet_data: Vec) -> record_batches } +/// Create an arbitrary ParquetFilePath +pub fn make_parquet_file_path() -> ParquetFilePath { + let chunk_addr = chunk_addr(3); + ParquetFilePath::new(&chunk_addr) +} + /// Create test metadata by creating a parquet file and reading it back into memory. /// /// See [`make_chunk`] for details. @@ -771,13 +777,13 @@ pub async fn make_metadata( iox_object_store: &Arc, column_prefix: &str, addr: ChunkAddr, -) -> (Path, IoxParquetMetaData) { +) -> (ParquetFilePath, IoxParquetMetaData) { let chunk = make_chunk(Arc::clone(iox_object_store), column_prefix, addr).await; let parquet_data = load_parquet_from_store(&chunk, Arc::clone(iox_object_store)) .await .unwrap(); ( - chunk.path(), + chunk.path().clone(), IoxParquetMetaData::from_file_bytes(parquet_data).unwrap(), ) } diff --git a/server/src/db.rs b/server/src/db.rs index 454b24df60..f308261764 100644 --- a/server/src/db.rs +++ b/server/src/db.rs @@ -31,7 +31,6 @@ use internal_types::schema::Schema; use iox_object_store::IoxObjectStore; use metrics::KeyValue; use mutable_buffer::chunk::{ChunkMetrics as MutableBufferChunkMetrics, MBChunk}; -use object_store::path::parsed::DirsAndFileName; use observability_deps::tracing::{debug, error, info}; use parking_lot::{Mutex, RwLock}; use parquet_file::{ @@ -1407,7 +1406,7 @@ pub(crate) fn checkpoint_data_from_catalog(catalog: &Catalog) -> CheckpointData for chunk in catalog.chunks() { let guard = chunk.read(); if let ChunkStage::Persisted { parquet, .. 
} = guard.stage() { - let path: DirsAndFileName = parquet.path().into(); + let path = parquet.path().clone(); let m = CatalogParquetInfo { path: path.clone(), @@ -1510,7 +1509,7 @@ mod tests { use bytes::Bytes; use chrono::{DateTime, TimeZone}; use data_types::{ - chunk_metadata::ChunkStorage, + chunk_metadata::{ChunkAddr, ChunkStorage}, database_rules::{LifecycleRules, PartitionTemplate, TemplatePart}, partition_metadata::{ColumnSummary, InfluxDbType, StatValues, Statistics, TableSummary}, write_summary::TimestampSummary, @@ -1518,10 +1517,8 @@ mod tests { use entry::{test_helpers::lp_to_entry, Sequence}; use futures::{stream, StreamExt, TryStreamExt}; use internal_types::{schema::Schema, selection::Selection}; - use object_store::{ - path::{parts::PathPart, Path}, - ObjectStore, ObjectStoreApi, - }; + use iox_object_store::ParquetFilePath; + use object_store::{path::parsed::DirsAndFileName, ObjectStore, ObjectStoreApi}; use parquet_file::{ catalog::test_helpers::TestCatalogState, metadata::IoxParquetMetaData, @@ -1530,7 +1527,7 @@ mod tests { use persistence_windows::min_max_sequence::MinMaxSequence; use query::{frontend::sql::SqlQueryPlanner, QueryChunk, QueryDatabase}; use std::{ - collections::{BTreeMap, HashSet}, + collections::BTreeMap, convert::TryFrom, iter::Iterator, num::{NonZeroU32, NonZeroU64, NonZeroUsize}, @@ -2591,12 +2588,9 @@ mod tests { ); } - async fn flatten_list_stream( - storage: Arc, - prefix: Option<&Path>, - ) -> Result> { - storage - .list(prefix) + async fn parquet_files(iox_storage: &IoxObjectStore) -> Result> { + iox_storage + .parquet_files() .await? .map_ok(|v| stream::iter(v).map(Ok)) .try_flatten() @@ -2679,11 +2673,9 @@ mod tests { let path = pq_chunk.object_store_path().unwrap(); // Check that the path must exist in the object store - let path_list = flatten_list_stream(Arc::clone(&object_store), Some(&path)) - .await - .unwrap(); + let path_list = parquet_files(&db.iox_object_store).await.unwrap(); assert_eq!(path_list.len(), 1); - assert_eq!(path_list[0], path); + assert_eq!(&path_list[0], path); // Now read data from that path let parquet_data = @@ -2821,12 +2813,10 @@ mod tests { let path = pq_chunk.object_store_path().unwrap(); // Check that the path must exist in the object store - let path_list = flatten_list_stream(Arc::clone(&object_store), Some(&path)) - .await - .unwrap(); + let path_list = parquet_files(&db.iox_object_store).await.unwrap(); println!("path_list: {:#?}", path_list); assert_eq!(path_list.len(), 1); - assert_eq!(path_list[0], path); + assert_eq!(&path_list[0], path); // Now read data from that path let parquet_data = @@ -3913,7 +3903,7 @@ mod tests { let chunk = db.chunk(table_name, partition_key, *chunk_id).unwrap(); let chunk = chunk.read(); if let ChunkStage::Persisted { parquet, .. } = chunk.stage() { - paths_expected.push(parquet.path().to_string()); + paths_expected.push(parquet.path().clone()); } else { panic!("Wrong chunk state."); } @@ -3925,15 +3915,7 @@ mod tests { .unwrap() .unwrap(); let paths_actual = { - let mut tmp: Vec = catalog - .parquet_files - .keys() - .map(|p| { - object_store - .path_from_dirs_and_filename(p.clone()) - .to_string() - }) - .collect(); + let mut tmp: Vec<_> = catalog.parquet_files.keys().cloned().collect(); tmp.sort(); tmp }; @@ -4010,7 +3992,7 @@ mod tests { let chunk = db.chunk(&table_name, &partition_key, chunk_id).unwrap(); let chunk = chunk.read(); if let ChunkStage::Persisted { parquet, .. 
} = chunk.stage() { - paths_keep.push(parquet.path()); + paths_keep.push(parquet.path().clone()); } else { panic!("Wrong chunk state."); } @@ -4030,18 +4012,18 @@ mod tests { } // ==================== do: create garbage ==================== - let mut path: DirsAndFileName = paths_keep[0].clone().into(); - path.file_name = Some(PathPart::from( - format!("prefix_{}", path.file_name.unwrap().encoded()).as_ref(), - )); - let path_delete = object_store.path_from_dirs_and_filename(path); - create_empty_file(&object_store, &path_delete).await; - let path_delete = path_delete.to_string(); + let path_delete = ParquetFilePath::new(&ChunkAddr { + table_name: "cpu".into(), + partition_key: "123".into(), + chunk_id: 3, + db_name: "not used".into(), + }); + create_empty_file(&db.iox_object_store, &path_delete).await; // ==================== check: all files are there ==================== - let all_files = get_object_store_files(&object_store).await; + let all_files = parquet_files(&db.iox_object_store).await.unwrap(); for path in &paths_keep { - assert!(all_files.contains(&path.to_string())); + assert!(all_files.contains(path)); } // ==================== do: start background task loop ==================== @@ -4054,7 +4036,7 @@ mod tests { // ==================== check: after a while the dropped file should be gone ==================== let t_0 = Instant::now(); loop { - let all_files = get_object_store_files(&object_store).await; + let all_files = parquet_files(&db.iox_object_store).await.unwrap(); if !all_files.contains(&path_delete) { break; } @@ -4067,10 +4049,10 @@ mod tests { join_handle.await.unwrap(); // ==================== check: some files are there ==================== - let all_files = get_object_store_files(&object_store).await; + let all_files = parquet_files(&db.iox_object_store).await.unwrap(); assert!(!all_files.contains(&path_delete)); for path in &paths_keep { - assert!(all_files.contains(&path.to_string())); + assert!(all_files.contains(path)); } } @@ -4359,25 +4341,12 @@ mod tests { (table_name.to_string(), partition_key.to_string(), chunk_id) } - async fn get_object_store_files(object_store: &ObjectStore) -> HashSet { - object_store - .list(None) - .await - .unwrap() - .try_concat() - .await - .unwrap() - .iter() - .map(|p| p.to_string()) - .collect() - } - - async fn create_empty_file(object_store: &ObjectStore, path: &Path) { + async fn create_empty_file(iox_object_store: &IoxObjectStore, path: &ParquetFilePath) { let data = Bytes::default(); let len = data.len(); - object_store - .put( + iox_object_store + .put_parquet_file( path, futures::stream::once(async move { Ok(data) }), Some(len), diff --git a/server/src/db/chunk.rs b/server/src/db/chunk.rs index c1526b25a8..f9d9cc6734 100644 --- a/server/src/db/chunk.rs +++ b/server/src/db/chunk.rs @@ -10,8 +10,8 @@ use internal_types::{ schema::{sort::SortKey, Schema}, selection::Selection, }; +use iox_object_store::ParquetFilePath; use mutable_buffer::chunk::snapshot::ChunkSnapshot; -use object_store::path::Path; use observability_deps::tracing::debug; use parquet_file::chunk::ParquetChunk; use partition_metadata::TableSummary; @@ -198,7 +198,7 @@ impl DbChunk { /// Return the Path in ObjectStorage where this chunk is /// persisted, if any - pub fn object_store_path(&self) -> Option { + pub fn object_store_path(&self) -> Option<&ParquetFilePath> { match &self.state { State::ParquetFile { chunk } => Some(chunk.path()), _ => None, diff --git a/server/src/db/lifecycle/drop.rs b/server/src/db/lifecycle/drop.rs index b832ac4fda..7c203a75a9 
100644 --- a/server/src/db/lifecycle/drop.rs +++ b/server/src/db/lifecycle/drop.rs @@ -3,7 +3,6 @@ use std::sync::Arc; use data_types::job::Job; use futures::Future; use lifecycle::{LifecycleWriteGuard, LockableChunk}; -use object_store::path::parsed::DirsAndFileName; use observability_deps::tracing::debug; use snafu::ResultExt; use tracker::{TaskTracker, TrackedFuture, TrackedFutureExt}; @@ -58,10 +57,12 @@ pub fn drop_chunk( let chunk_read = chunk.read(); if let ChunkStage::Persisted { parquet, .. } = chunk_read.stage() { - let path: DirsAndFileName = parquet.path().into(); - Some(path) + Some(parquet.path().clone()) } else if lifecycle_persist { - unreachable!("Unpersisted chunks in a persisted DB should be ruled out before doing any work.") + unreachable!( + "Unpersisted chunks in a persisted DB should be ruled out \ + before doing any work." + ) } else { None } @@ -168,10 +169,12 @@ pub fn drop_partition( let chunk_read = chunk.read(); if let ChunkStage::Persisted { parquet, .. } = chunk_read.stage() { - let path: DirsAndFileName = parquet.path().into(); - paths.push(path); + paths.push(parquet.path().clone()); } else if lifecycle_persist { - unreachable!("Unpersisted chunks in a persisted DB should be ruled out before doing any work.") + unreachable!( + "Unpersisted chunks in a persisted DB should be ruled out \ + before doing any work." + ) } } diff --git a/server/src/db/lifecycle/write.rs b/server/src/db/lifecycle/write.rs index 7ecbe3cd8a..5ab4004134 100644 --- a/server/src/db/lifecycle/write.rs +++ b/server/src/db/lifecycle/write.rs @@ -15,7 +15,6 @@ use ::lifecycle::LifecycleWriteGuard; use chrono::Utc; use data_types::{chunk_metadata::ChunkLifecycleAction, job::Job}; use internal_types::selection::Selection; -use object_store::path::parsed::DirsAndFileName; use observability_deps::tracing::{debug, warn}; use parquet_file::{ catalog::CatalogParquetInfo, @@ -139,7 +138,7 @@ pub(super) fn write_chunk_to_object_store( let metrics = ParquetChunkMetrics::new(&metrics); let parquet_chunk = Arc::new( ParquetChunk::new( - path.clone(), + &path, Arc::clone(&db.iox_object_store), file_size_bytes, Arc::clone(&parquet_metadata), @@ -150,12 +149,10 @@ pub(super) fn write_chunk_to_object_store( .context(ParquetChunkError)?, ); - let path: DirsAndFileName = path.into(); - - // IMPORTANT: Start transaction AFTER writing the actual parquet file so we do not hold the - // transaction lock (that is part of the PreservedCatalog) for too long. By using the - // cleanup lock (see above) it is ensured that the file that we have written is not deleted - // in between. + // IMPORTANT: Start transaction AFTER writing the actual parquet file so we do not hold + // the transaction lock (that is part of the PreservedCatalog) for too long. + // By using the cleanup lock (see above) it is ensured that the file that we + // have written is not deleted in between. 
let mut transaction = db.preserved_catalog.open_transaction().await; let info = CatalogParquetInfo { path, diff --git a/server/src/db/load.rs b/server/src/db/load.rs index ffc7d9c97b..7c54d22a51 100644 --- a/server/src/db/load.rs +++ b/server/src/db/load.rs @@ -3,9 +3,8 @@ use super::catalog::{chunk::ChunkStage, table::TableSchemaUpsertHandle, Catalog}; use data_types::server_id::ServerId; -use iox_object_store::IoxObjectStore; +use iox_object_store::{IoxObjectStore, ParquetFilePath}; use metrics::{KeyValue, MetricRegistry}; -use object_store::path::parsed::DirsAndFileName; use observability_deps::tracing::{error, info}; use parquet_file::{ catalog::{CatalogParquetInfo, CatalogState, ChunkCreationFailed, PreservedCatalog}, @@ -232,7 +231,7 @@ impl CatalogState for Loader { let metrics = ParquetChunkMetrics::new(&metrics); let parquet_chunk = ParquetChunk::new( - iox_object_store.path_from_dirs_and_filename(info.path.clone()), + &info.path, iox_object_store, info.file_size_bytes, info.metadata, @@ -240,9 +239,7 @@ impl CatalogState for Loader { Arc::clone(&iox_md.partition_key), metrics, ) - .context(ChunkCreationFailed { - path: info.path.clone(), - })?; + .context(ChunkCreationFailed { path: &info.path })?; let parquet_chunk = Arc::new(parquet_chunk); // Get partition from the catalog @@ -268,7 +265,7 @@ impl CatalogState for Loader { Ok(()) } - fn remove(&mut self, path: DirsAndFileName) -> parquet_file::catalog::Result<()> { + fn remove(&mut self, path: &ParquetFilePath) -> parquet_file::catalog::Result<()> { let mut removed_any = false; for partition in self.catalog.partitions() { @@ -278,8 +275,7 @@ impl CatalogState for Loader { for chunk in partition.chunks() { let chunk = chunk.read(); if let ChunkStage::Persisted { parquet, .. } = chunk.stage() { - let chunk_path: DirsAndFileName = parquet.path().into(); - if path == chunk_path { + if path == parquet.path() { to_remove.push(chunk.id()); } } @@ -296,7 +292,7 @@ impl CatalogState for Loader { if removed_any { Ok(()) } else { - Err(parquet_file::catalog::Error::ParquetFileDoesNotExist { path }) + Err(parquet_file::catalog::Error::ParquetFileDoesNotExist { path: path.clone() }) } } }
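
For reviewers who would like the shape of the new type without opening iox_object_store/src/paths.rs, here is a minimal sketch of what ParquetFilePath appears to look like, inferred only from the call sites in this patch (ParquetFilePath::new(&ChunkAddr), clone/equality/hash use in HashSets, Debug logging, and the comment about the UUIDv4 component). The field names, derive list, and the relative_path_string helper are assumptions for illustration, not the actual definition added by this change.

// Hypothetical sketch; the real ParquetFilePath is defined in iox_object_store/src/paths.rs.
use data_types::chunk_metadata::ChunkAddr;
use std::sync::Arc;
use uuid::Uuid;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ParquetFilePath {
    table_name: Arc<str>,
    partition_key: Arc<str>,
    chunk_id: u32,
    // Random component: two calls to `new` with the same ChunkAddr yield distinct
    // paths, so re-persisting a chunk can never silently overwrite an older file.
    uuid: Uuid,
}

impl ParquetFilePath {
    /// Build a new, unique path for the given chunk address.
    /// `chunk_addr.db_name` is not needed here: the database is implied by the
    /// IoxObjectStore the path is later used with (see the "not used" test above).
    pub fn new(chunk_addr: &ChunkAddr) -> Self {
        Self {
            table_name: Arc::clone(&chunk_addr.table_name),
            partition_key: Arc::clone(&chunk_addr.partition_key),
            chunk_id: chunk_addr.chunk_id,
            uuid: Uuid::new_v4(),
        }
    }

    /// Render the path relative to the database's data directory, e.g.
    /// "<table>/<partition>/<chunk_id>.<uuid>.parquet" (exact format is an assumption).
    pub fn relative_path_string(&self) -> String {
        format!(
            "{}/{}/{}.{}.parquet",
            self.table_name, self.partition_key, self.chunk_id, self.uuid
        )
    }
}

Because the UUID is generated inside `new`, creating a second file for the same ChunkAddr (as the cleanup conflict test does) cannot trick the garbage collector into deleting the tracked file; callers such as Storage::to_object_store simply hand the ParquetFilePath to IoxObjectStore::put_parquet_file, which resolves it against the database's data directory.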