refactor: Extract a ParquetFilePath to handle paths to parquet files in a db's object store

pull/24376/head
Carol (Nichols || Goulding) 2021-08-12 12:52:39 -04:00
parent 5e1cb244f7
commit 6d5cb9c117
15 changed files with 924 additions and 443 deletions
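For orientation: after this change a chunk's parquet file lives at `<server_id>/<db_name>/data/<table_name>/<partition_key>/<chunk_id>.<uuid>.parquet`, and callers work with the new `ParquetFilePath` type instead of assembling raw object store paths by hand. A minimal sketch of the intended usage, based on the code added below; the chunk address values are illustrative:

```rust
use data_types::chunk_metadata::ChunkAddr;
use iox_object_store::ParquetFilePath;

fn main() {
    // Illustrative chunk address; the field values are made up.
    let chunk_addr = ChunkAddr {
        db_name: "clouds".into(),
        table_name: "my_table".into(),
        partition_key: "my_partition".into(),
        chunk_id: 13,
    };

    // Each call embeds a fresh UUID, so the same chunk address never maps to
    // the same object store file twice (files are never overwritten).
    let path = ParquetFilePath::new(&chunk_addr);

    // Relative form, e.g. "my_table/my_partition/13.<uuid>.parquet"; the
    // database's IoxObjectStore prefixes it with "<server_id>/<db_name>/data/".
    println!("{}", path.relative_dirs_and_file_name().to_string());
}
```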

Cargo.lock (generated)

@ -1445,7 +1445,7 @@ dependencies = [
[[package]] [[package]]
name = "heappy" name = "heappy"
version = "0.1.0" version = "0.1.0"
source = "git+https://github.com/mkmik/heappy?rev=c8fe1fefbaefd536b0137435fce8d90a98a184de#c8fe1fefbaefd536b0137435fce8d90a98a184de" source = "git+https://github.com/mkmik/heappy?rev=82a383128e484039cc2f31476e6289bed48a6701#82a383128e484039cc2f31476e6289bed48a6701"
dependencies = [ dependencies = [
"backtrace", "backtrace",
"bytes", "bytes",
@ -1823,8 +1823,10 @@ dependencies = [
"data_types", "data_types",
"futures", "futures",
"object_store", "object_store",
"snafu",
"tokio", "tokio",
"tokio-stream", "tokio-stream",
"uuid",
] ]
[[package]] [[package]]


@ -9,5 +9,7 @@ bytes = "1.0"
data_types = { path = "../data_types" } data_types = { path = "../data_types" }
futures = "0.3" futures = "0.3"
object_store = { path = "../object_store" } object_store = { path = "../object_store" }
snafu = "0.6"
tokio = { version = "1.0", features = ["macros", "time"] } tokio = { version = "1.0", features = ["macros", "time"] }
tokio-stream = "0.1" tokio-stream = "0.1"
uuid = { version = "0.8", features = ["serde", "v4"] }


@ -16,7 +16,7 @@
use bytes::Bytes; use bytes::Bytes;
use data_types::{server_id::ServerId, DatabaseName}; use data_types::{server_id::ServerId, DatabaseName};
use futures::{stream::BoxStream, Stream, StreamExt}; use futures::{stream::BoxStream, Stream, StreamExt, TryStreamExt};
use object_store::{ use object_store::{
path::{parsed::DirsAndFileName, ObjectStorePath, Path}, path::{parsed::DirsAndFileName, ObjectStorePath, Path},
ObjectStore, ObjectStoreApi, Result, ObjectStore, ObjectStoreApi, Result,
@ -25,6 +25,10 @@ use std::{io, sync::Arc};
use tokio::sync::mpsc::channel; use tokio::sync::mpsc::channel;
use tokio_stream::wrappers::ReceiverStream; use tokio_stream::wrappers::ReceiverStream;
mod paths;
use paths::{DataPath, RootPath};
pub use paths::{ParquetFilePath, ParquetFilePathParseError};
/// Handles persistence of data for a particular database. Writes within its directory/prefix. /// Handles persistence of data for a particular database. Writes within its directory/prefix.
/// ///
/// This wrapper on top of an `ObjectStore` maps IOx specific concepts to ObjectStore locations /// This wrapper on top of an `ObjectStore` maps IOx specific concepts to ObjectStore locations
@ -34,6 +38,7 @@ pub struct IoxObjectStore {
server_id: ServerId, server_id: ServerId,
database_name: String, // TODO: use data_types DatabaseName? database_name: String, // TODO: use data_types DatabaseName?
root_path: RootPath, root_path: RootPath,
data_path: DataPath,
} }
impl IoxObjectStore { impl IoxObjectStore {
@ -45,11 +50,13 @@ impl IoxObjectStore {
database_name: &DatabaseName<'_>, database_name: &DatabaseName<'_>,
) -> Self { ) -> Self {
let root_path = RootPath::new(inner.new_path(), server_id, database_name); let root_path = RootPath::new(inner.new_path(), server_id, database_name);
let data_path = DataPath::new(&root_path);
Self { Self {
inner, inner,
server_id, server_id,
database_name: database_name.into(), database_name: database_name.into(),
root_path, root_path,
data_path,
} }
} }
@ -74,22 +81,6 @@ impl IoxObjectStore {
path path
} }
/// Location where parquet data goes to.
///
/// Schema currently is:
///
/// ```text
/// <server_id>/<db_name>/data/
/// ```
// TODO: avoid leaking this outside this crate
pub fn data_path(&self) -> Path {
let mut path = self.inner.new_path();
path.push_dir(self.server_id.to_string());
path.push_dir(&self.database_name);
path.push_dir("data");
path
}
/// Store this data in this database's object store. /// Store this data in this database's object store.
pub async fn put<S>(&self, location: &Path, bytes: S, length: Option<usize>) -> Result<()> pub async fn put<S>(&self, location: &Path, bytes: S, length: Option<usize>) -> Result<()>
where where
@ -103,6 +94,53 @@ impl IoxObjectStore {
Ok(self.list(Some(&self.catalog_path())).await?.boxed()) Ok(self.list(Some(&self.catalog_path())).await?.boxed())
} }
/// List all parquet file paths in object storage for this database.
pub async fn parquet_files(&self) -> Result<BoxStream<'static, Result<Vec<ParquetFilePath>>>> {
Ok(self
.list(Some(&self.data_path.inner))
.await?
.map_ok(move |list| {
list.into_iter()
// This `flat_map` ignores any filename in the data_path we couldn't parse as
// a ParquetFilePath
.flat_map(ParquetFilePath::from_absolute)
.collect()
})
.boxed())
}
/// Get the data in this relative path in this database's object store.
pub async fn get_parquet_file(
&self,
location: &ParquetFilePath,
) -> Result<BoxStream<'static, Result<Bytes>>> {
let full_path = self.data_path.join(location);
self.inner.get(&full_path).await
}
/// Store the data for this parquet file in this database's object store.
pub async fn put_parquet_file<S>(
&self,
location: &ParquetFilePath,
bytes: S,
length: Option<usize>,
) -> Result<()>
where
S: Stream<Item = io::Result<Bytes>> + Send + Sync + 'static,
{
let full_path = self.data_path.join(location);
self.inner.put(&full_path, bytes, length).await
}
/// Remove the data for this parquet file from this database's object store
pub async fn delete_parquet_file(&self, location: &ParquetFilePath) -> Result<()> {
let full_path = self.data_path.join(location);
self.inner.delete(&full_path).await
}
/// List the relative paths in this database's object store. /// List the relative paths in this database's object store.
pub async fn list( pub async fn list(
&self, &self,
@ -148,56 +186,122 @@ impl IoxObjectStore {
} }
} }
/// A database-specific object store path that all `IoxPath`s should be within.
#[derive(Debug, Clone)]
struct RootPath {
root: Path,
}
impl RootPath {
/// How the root of a database is defined in object storage.
fn new(mut root: Path, server_id: ServerId, database_name: &DatabaseName<'_>) -> Self {
root.push_dir(server_id.to_string());
root.push_dir(database_name.as_str());
Self { root }
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use data_types::chunk_metadata::ChunkAddr;
use object_store::{ObjectStore, ObjectStoreApi};
use std::num::NonZeroU32; use std::num::NonZeroU32;
use uuid::Uuid;
/// Creates new test server ID /// Creates new test server ID
fn make_server_id() -> ServerId { fn make_server_id() -> ServerId {
ServerId::new(NonZeroU32::new(1).unwrap()) ServerId::new(NonZeroU32::new(1).unwrap())
} }
/// Creates a new in-memory object store. These tests rely on the `Path`s being of type /// Creates a new in-memory object store
/// `DirsAndFileName` and thus using object_store::path::DELIMITER as the separator
fn make_object_store() -> Arc<ObjectStore> { fn make_object_store() -> Arc<ObjectStore> {
Arc::new(ObjectStore::new_in_memory()) Arc::new(ObjectStore::new_in_memory())
} }
#[test] async fn add_file(object_store: &ObjectStore, location: &Path) {
fn catalog_path_is_relative_to_db_root() { let data = Bytes::from("arbitrary data");
let server_id = make_server_id(); let stream_data = std::io::Result::Ok(data.clone());
let database_name = DatabaseName::new("clouds").unwrap();
let iox_object_store = IoxObjectStore::new(make_object_store(), server_id, &database_name); object_store
assert_eq!( .put(
iox_object_store.catalog_path().to_string(), location,
"mem:1/clouds/transactions/" futures::stream::once(async move { stream_data }),
); None,
)
.await
.unwrap();
} }
#[test] async fn parquet_files(iox_object_store: &IoxObjectStore) -> Vec<ParquetFilePath> {
fn data_path_is_relative_to_db_root() { iox_object_store
.parquet_files()
.await
.unwrap()
.try_collect::<Vec<_>>()
.await
.unwrap()
.into_iter()
.flatten()
.collect()
}
async fn add_parquet_file(iox_object_store: &IoxObjectStore, location: &ParquetFilePath) {
let data = Bytes::from("arbitrary data");
let stream_data = std::io::Result::Ok(data.clone());
iox_object_store
.put_parquet_file(
location,
futures::stream::once(async move { stream_data }),
None,
)
.await
.unwrap();
}
#[tokio::test]
async fn only_lists_relevant_parquet_files() {
let object_store = make_object_store();
let server_id = make_server_id(); let server_id = make_server_id();
let database_name = DatabaseName::new("clouds").unwrap(); let database_name = DatabaseName::new("clouds").unwrap();
let iox_object_store = IoxObjectStore::new(make_object_store(), server_id, &database_name); let iox_object_store =
assert_eq!( IoxObjectStore::new(Arc::clone(&object_store), server_id, &database_name);
iox_object_store.data_path().to_string(), let uuid = Uuid::new_v4();
"mem:1/clouds/data/"
); // Put a non-database file in
let mut path = object_store.new_path();
path.push_dir("foo");
add_file(&object_store, &path).await;
// Put a file for some other server in
let mut path = object_store.new_path();
path.push_dir("12345");
add_file(&object_store, &path).await;
// Put a file for some other database in
let mut path = object_store.new_path();
path.push_dir(server_id.to_string());
path.push_dir("thunder");
add_file(&object_store, &path).await;
// Put a file in the database dir but not the data dir
let mut path = object_store.new_path();
path.push_dir(server_id.to_string());
path.push_dir(database_name.to_string());
path.set_file_name(&format!("111.{}.parquet", uuid));
add_file(&object_store, &path).await;
// Put files in the data dir whose names are in the wrong format
let mut path = object_store.new_path();
path.push_dir(server_id.to_string());
path.push_dir(database_name.to_string());
path.set_file_name("111.parquet");
add_file(&object_store, &path).await;
path.set_file_name(&format!("111.{}.xls", uuid));
add_file(&object_store, &path).await;
// Parquet files should be empty
let pf = parquet_files(&iox_object_store).await;
assert!(pf.is_empty(), "{:?}", pf);
// Add a real parquet file
let chunk_addr = ChunkAddr {
db_name: "clouds".into(),
table_name: "my_table".into(),
partition_key: "my_partition".into(),
chunk_id: 13,
};
let p1 = ParquetFilePath::new(&chunk_addr);
add_parquet_file(&iox_object_store, &p1).await;
// Only the real file should be returned
let pf = parquet_files(&iox_object_store).await;
assert_eq!(&pf, &[p1]);
} }
} }
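A rough sketch of the call pattern the new methods above enable, mirroring the test helpers in this file; the function name is made up and the store and path are assumed to be constructed elsewhere:

```rust
use std::io;

use bytes::Bytes;
use futures::{stream, TryStreamExt};
use iox_object_store::{IoxObjectStore, ParquetFilePath};

/// Sketch: write one parquet file and read it back without ever touching a raw
/// object store `Path`; the store scopes everything to `<server_id>/<db_name>/data/`.
async fn roundtrip(
    iox_object_store: &IoxObjectStore,
    location: &ParquetFilePath,
) -> object_store::Result<Vec<Bytes>> {
    let data = io::Result::Ok(Bytes::from("arbitrary data"));

    iox_object_store
        .put_parquet_file(location, stream::once(async move { data }), None)
        .await?;

    iox_object_store
        .get_parquet_file(location)
        .await?
        .try_collect()
        .await
}
```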


@ -0,0 +1,462 @@
//! Paths for specific types of files within a database's object storage.
use data_types::{chunk_metadata::ChunkAddr, server_id::ServerId, DatabaseName};
use object_store::{
path::{parsed::DirsAndFileName, ObjectStorePath, Path},
Result,
};
use snafu::{ensure, OptionExt, ResultExt, Snafu};
use std::sync::Arc;
use uuid::Uuid;
/// A database-specific object store path that all `IoxPath`s should be within.
/// This should not be leaked outside this crate.
#[derive(Debug, Clone)]
pub struct RootPath {
inner: Path,
}
impl RootPath {
/// How the root of a database is defined in object storage.
pub fn new(mut root: Path, server_id: ServerId, database_name: &DatabaseName<'_>) -> Self {
root.push_dir(server_id.to_string());
root.push_dir(database_name.as_str());
Self { inner: root }
}
pub fn join(&self, dir: &str) -> Path {
let mut result = self.inner.clone();
result.push_dir(dir);
result
}
}
/// A database-specific object store path for all data files. This should not be leaked outside
/// this crate.
#[derive(Debug, Clone)]
pub struct DataPath {
pub inner: Path,
}
impl DataPath {
pub fn new(root_path: &RootPath) -> Self {
Self {
inner: root_path.join("data"),
}
}
pub fn join(&self, parquet_file_path: &ParquetFilePath) -> Path {
let mut result = self.inner.clone();
let relative = parquet_file_path.relative_dirs_and_file_name();
for part in relative.directories {
result.push_dir(part.to_string());
}
result.set_file_name(
relative
.file_name
.expect("Parquet file paths have filenames")
.to_string(),
);
result
}
}
/// Location of a Parquet file within a database's object store.
/// The exact format is an implementation detail and is subject to change.
#[derive(Debug, Clone, Eq, PartialEq, Hash, Ord, PartialOrd)]
pub struct ParquetFilePath {
table_name: Arc<str>,
partition_key: Arc<str>,
chunk_id: u32,
uuid: Uuid,
}
impl ParquetFilePath {
/// Create a location for this chunk's parquet file. Calling this twice on the same `ChunkAddr`
/// will return different `ParquetFilePaths`.
pub fn new(chunk_addr: &ChunkAddr) -> Self {
Self {
table_name: Arc::clone(&chunk_addr.table_name),
partition_key: Arc::clone(&chunk_addr.partition_key),
chunk_id: chunk_addr.chunk_id,
// generate random UUID so that files are unique and never overwritten
uuid: Uuid::new_v4(),
}
}
/// Turn this into directories and file names to be added to a root path or to be serialized
/// in protobuf.
pub fn relative_dirs_and_file_name(&self) -> DirsAndFileName {
let mut result = DirsAndFileName::default();
result.push_all_dirs(&[self.table_name.as_ref(), self.partition_key.as_ref()]);
result.set_file_name(format!("{}.{}.parquet", self.chunk_id, self.uuid));
result
}
/// Create from serialized protobuf strings.
pub fn from_relative_dirs_and_file_name(
dirs_and_file_name: &DirsAndFileName,
) -> Result<Self, ParquetFilePathParseError> {
let mut directories = dirs_and_file_name.directories.iter();
let table_name = directories
.next()
.context(MissingTableName)?
.to_string()
.into();
let partition_key = directories
.next()
.context(MissingPartitionKey)?
.to_string()
.into();
ensure!(directories.next().is_none(), UnexpectedDirectory);
let file_name = dirs_and_file_name
.file_name
.as_ref()
.context(MissingChunkId)?
.to_string();
let mut parts = file_name.split('.');
let chunk_id = parts
.next()
.context(MissingChunkId)?
.parse()
.context(InvalidChunkId)?;
let uuid = parts
.next()
.context(MissingUuid)?
.parse()
.context(InvalidUuid)?;
let ext = parts.next().context(MissingExtension)?;
ensure!(ext == "parquet", InvalidExtension { ext });
ensure!(parts.next().is_none(), UnexpectedExtension);
Ok(Self {
table_name,
partition_key,
chunk_id,
uuid,
})
}
// Deliberately pub(crate); this transformation should only happen within this crate
pub(crate) fn from_absolute(absolute_path: Path) -> Result<Self, ParquetFilePathParseError> {
let absolute_path: DirsAndFileName = absolute_path.into();
let mut absolute_dirs = absolute_path.directories.into_iter().fuse();
// The number of `next`s here needs to match the total number of directories in
// iox_object_store data_paths
absolute_dirs.next(); // server id
absolute_dirs.next(); // database name
absolute_dirs.next(); // "data"
let remaining = DirsAndFileName {
directories: absolute_dirs.collect(),
file_name: absolute_path.file_name,
};
Self::from_relative_dirs_and_file_name(&remaining)
}
}
impl From<&Self> for ParquetFilePath {
fn from(borrowed: &Self) -> Self {
borrowed.clone()
}
}
#[derive(Snafu, Debug, PartialEq)]
#[allow(missing_docs)]
pub enum ParquetFilePathParseError {
#[snafu(display("Could not find required table name"))]
MissingTableName,
#[snafu(display("Could not find required partition key"))]
MissingPartitionKey,
#[snafu(display("Too many directories found"))]
UnexpectedDirectory,
#[snafu(display("Could not find required chunk id"))]
MissingChunkId,
#[snafu(display("Could not parse chunk id: {}", source))]
InvalidChunkId { source: std::num::ParseIntError },
#[snafu(display("Could not find required UUID"))]
MissingUuid,
#[snafu(display("Could not parse UUID: {}", source))]
InvalidUuid { source: uuid::Error },
#[snafu(display("Could not find required file extension"))]
MissingExtension,
#[snafu(display("Extension should have been `parquet`, instead found `{}`", ext))]
InvalidExtension { ext: String },
#[snafu(display("Too many extensions found"))]
UnexpectedExtension,
}
#[cfg(test)]
mod tests {
use super::*;
use crate::IoxObjectStore;
use object_store::{ObjectStore, ObjectStoreApi};
use std::num::NonZeroU32;
/// Creates new test server ID
fn make_server_id() -> ServerId {
ServerId::new(NonZeroU32::new(1).unwrap())
}
/// Creates a new in-memory object store. These tests rely on the `Path`s being of type
/// `DirsAndFileName` and thus using object_store::path::DELIMITER as the separator
fn make_object_store() -> Arc<ObjectStore> {
Arc::new(ObjectStore::new_in_memory())
}
#[test]
fn root_path_contains_server_id_and_db_name() {
let server_id = make_server_id();
let database_name = DatabaseName::new("clouds").unwrap();
let iox_object_store = IoxObjectStore::new(make_object_store(), server_id, &database_name);
assert_eq!(iox_object_store.root_path.inner.to_string(), "1/clouds/")
}
#[test]
fn root_path_join_concatenates() {
let server_id = make_server_id();
let database_name = DatabaseName::new("clouds").unwrap();
let iox_object_store = IoxObjectStore::new(make_object_store(), server_id, &database_name);
let path = iox_object_store.root_path.join("foo");
assert_eq!(path.to_string(), "1/clouds/foo/");
}
#[test]
fn catalog_path_is_relative_to_db_root() {
let server_id = make_server_id();
let database_name = DatabaseName::new("clouds").unwrap();
let iox_object_store = IoxObjectStore::new(make_object_store(), server_id, &database_name);
assert_eq!(
iox_object_store.catalog_path().to_string(),
"1/clouds/transactions/"
);
}
#[test]
fn data_path_is_relative_to_db_root() {
let server_id = make_server_id();
let database_name = DatabaseName::new("clouds").unwrap();
let iox_object_store = IoxObjectStore::new(make_object_store(), server_id, &database_name);
assert_eq!(
iox_object_store.data_path.inner.to_string(),
"1/clouds/data/"
);
}
#[test]
fn test_parquet_file_paths_are_unique() {
let chunk_addr = ChunkAddr {
db_name: "clouds".into(),
table_name: "my_table".into(),
partition_key: "my_partition".into(),
chunk_id: 13,
};
let p1 = ParquetFilePath::new(&chunk_addr);
let p2 = ParquetFilePath::new(&chunk_addr);
assert_ne!(p1, p2);
}
#[test]
fn test_parquet_file_path_deserialization() {
// Error cases
use ParquetFilePathParseError::*;
let mut df = DirsAndFileName::default();
let result = ParquetFilePath::from_relative_dirs_and_file_name(&df);
assert!(matches!(result, Err(MissingTableName)), "got {:?}", result);
df.push_dir("foo");
let result = ParquetFilePath::from_relative_dirs_and_file_name(&df);
assert!(
matches!(result, Err(MissingPartitionKey)),
"got {:?}",
result
);
df.push_dir("bar");
let result = ParquetFilePath::from_relative_dirs_and_file_name(&df);
assert!(
matches!(result, Err(MissingChunkId { .. })),
"got {:?}",
result
);
let mut extra = df.clone();
extra.push_dir("nope");
let result = ParquetFilePath::from_relative_dirs_and_file_name(&extra);
assert!(
matches!(result, Err(UnexpectedDirectory)),
"got {:?}",
result
);
df.set_file_name("bleh");
let result = ParquetFilePath::from_relative_dirs_and_file_name(&df);
assert!(
matches!(result, Err(InvalidChunkId { .. })),
"got {:?}",
result
);
df.set_file_name("3");
let result = ParquetFilePath::from_relative_dirs_and_file_name(&df);
assert!(
matches!(result, Err(MissingUuid { .. })),
"got {:?}",
result
);
df.set_file_name("3.nope");
let result = ParquetFilePath::from_relative_dirs_and_file_name(&df);
assert!(
matches!(result, Err(InvalidUuid { .. })),
"got {:?}",
result
);
let uuid = Uuid::new_v4();
df.set_file_name(&format!("3.{}", uuid));
let result = ParquetFilePath::from_relative_dirs_and_file_name(&df);
assert!(matches!(result, Err(MissingExtension)), "got {:?}", result);
df.set_file_name(&format!("3.{}.exe", uuid));
let result = ParquetFilePath::from_relative_dirs_and_file_name(&df);
assert!(
matches!(result, Err(InvalidExtension { .. })),
"got {:?}",
result
);
df.set_file_name(&format!("3.{}.parquet.v6", uuid));
let result = ParquetFilePath::from_relative_dirs_and_file_name(&df);
assert!(
matches!(result, Err(UnexpectedExtension)),
"got {:?}",
result
);
// Success case
df.set_file_name(&format!("3.{}.parquet", uuid));
let result = ParquetFilePath::from_relative_dirs_and_file_name(&df).unwrap();
assert_eq!(
result,
ParquetFilePath {
table_name: "foo".into(),
partition_key: "bar".into(),
chunk_id: 3,
uuid
}
);
let round_trip = result.relative_dirs_and_file_name();
assert_eq!(round_trip, df);
}
#[test]
fn parquet_file_from_absolute() {
let object_store = make_object_store();
// Error cases
use ParquetFilePathParseError::*;
let mut path = object_store.new_path();
// incorrect directories are fine, we're assuming that list(data_path) scoped to the
// right directories so we don't check again on the way out
path.push_all_dirs(&["foo", "bar", "baz", "}*", "aoeu"]);
path.set_file_name("rules.pb");
let result = ParquetFilePath::from_absolute(path);
assert!(
matches!(result, Err(InvalidChunkId { .. })),
"got: {:?}",
result
);
let mut path = object_store.new_path();
path.push_all_dirs(&["foo", "bar", "baz", "}*", "aoeu"]);
// missing file name
let result = ParquetFilePath::from_absolute(path);
assert!(matches!(result, Err(MissingChunkId)), "got: {:?}", result);
// Success case
let uuid = Uuid::new_v4();
let mut path = object_store.new_path();
path.push_all_dirs(&["foo", "bar", "baz", "}*", "aoeu"]);
path.set_file_name(&format!("10.{}.parquet", uuid));
let result = ParquetFilePath::from_absolute(path);
assert_eq!(
result.unwrap(),
ParquetFilePath {
table_name: "}*".into(),
partition_key: "aoeu".into(),
chunk_id: 10,
uuid
}
);
}
#[test]
fn parquet_file_relative_dirs_and_file_path() {
let uuid = Uuid::new_v4();
let pfp = ParquetFilePath {
table_name: "}*".into(),
partition_key: "aoeu".into(),
chunk_id: 10,
uuid,
};
let dirs_and_file_name = pfp.relative_dirs_and_file_name();
assert_eq!(
dirs_and_file_name.to_string(),
format!("%7D%2A/aoeu/10.{}.parquet", uuid)
);
let round_trip =
ParquetFilePath::from_relative_dirs_and_file_name(&dirs_and_file_name).unwrap();
assert_eq!(pfp, round_trip);
}
#[test]
fn data_path_join_with_parquet_file_path() {
let server_id = make_server_id();
let database_name = DatabaseName::new("clouds").unwrap();
let object_store = make_object_store();
let iox_object_store =
IoxObjectStore::new(Arc::clone(&object_store), server_id, &database_name);
let uuid = Uuid::new_v4();
let pfp = ParquetFilePath {
table_name: "}*".into(),
partition_key: "aoeu".into(),
chunk_id: 10,
uuid,
};
let path = iox_object_store.data_path.join(&pfp);
let mut expected_path = object_store.new_path();
expected_path.push_all_dirs(&[
&server_id.to_string(),
database_name.as_str(),
"data",
"}*",
"aoeu",
]);
expected_path.set_file_name(&format!("10.{}.parquet", uuid));
assert_eq!(path, expected_path);
}
}
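The relative `<table>/<partition>/<chunk_id>.<uuid>.parquet` form is also what the preserved catalog serializes to protobuf; a small sketch of parsing it back and surfacing the structured errors (the helper name is illustrative):

```rust
use iox_object_store::{ParquetFilePath, ParquetFilePathParseError};
use object_store::path::parsed::DirsAndFileName;

/// Illustrative helper: turn a relative `<table>/<partition>/<chunk_id>.<uuid>.parquet`
/// path back into a `ParquetFilePath`, distinguishing the structured failure modes.
fn parse_relative(relative: &DirsAndFileName) -> Result<ParquetFilePath, String> {
    match ParquetFilePath::from_relative_dirs_and_file_name(relative) {
        Ok(path) => Ok(path),
        Err(ParquetFilePathParseError::InvalidExtension { ext }) => {
            Err(format!("expected a `.parquet` file, found `.{}`", ext))
        }
        Err(other) => Err(format!("not a parquet file path: {}", other)),
    }
}
```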


@ -4,7 +4,7 @@ use bytes::Bytes;
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use futures::TryStreamExt; use futures::TryStreamExt;
use generated_types::influxdata::iox::catalog::v1 as proto; use generated_types::influxdata::iox::catalog::v1 as proto;
use iox_object_store::IoxObjectStore; use iox_object_store::{IoxObjectStore, ParquetFilePath, ParquetFilePathParseError};
use object_store::{ use object_store::{
path::{parsed::DirsAndFileName, parts::PathPart, ObjectStorePath, Path}, path::{parsed::DirsAndFileName, parts::PathPart, ObjectStorePath, Path},
ObjectStore, ObjectStoreApi, ObjectStore, ObjectStoreApi,
@ -41,10 +41,14 @@ pub const CHECKPOINT_FILE_SUFFIX: &str = "ckpt";
#[derive(Debug, Snafu)] #[derive(Debug, Snafu)]
pub enum Error { pub enum Error {
#[snafu(display("Error during serialization: {}", source))] #[snafu(display("Error during serialization: {}", source))]
Serialization { source: EncodeError }, Serialization {
source: EncodeError,
},
#[snafu(display("Error during deserialization: {}", source))] #[snafu(display("Error during deserialization: {}", source))]
Deserialization { source: DecodeError }, Deserialization {
source: DecodeError,
},
#[snafu(display("Error during store write operation: {}", source))] #[snafu(display("Error during store write operation: {}", source))]
Write { Write {
@ -57,14 +61,19 @@ pub enum Error {
}, },
#[snafu(display("Missing transaction: {}", revision_counter))] #[snafu(display("Missing transaction: {}", revision_counter))]
MissingTransaction { revision_counter: u64 }, MissingTransaction {
revision_counter: u64,
},
#[snafu(display( #[snafu(display(
"Wrong revision counter in transaction file: expected {} but found {}", "Wrong revision counter in transaction file: expected {} but found {}",
expected, expected,
actual actual
))] ))]
WrongTransactionRevision { expected: u64, actual: u64 }, WrongTransactionRevision {
expected: u64,
actual: u64,
},
#[snafu(display( #[snafu(display(
"Wrong UUID for transaction file (revision: {}): expected {} but found {}", "Wrong UUID for transaction file (revision: {}): expected {} but found {}",
@ -91,7 +100,9 @@ pub enum Error {
}, },
#[snafu(display("Cannot parse UUID: {}", source))] #[snafu(display("Cannot parse UUID: {}", source))]
UuidParse { source: uuid::Error }, UuidParse {
source: uuid::Error,
},
#[snafu(display("UUID required but not provided"))] #[snafu(display("UUID required but not provided"))]
UuidRequired {}, UuidRequired {},
@ -119,19 +130,29 @@ pub enum Error {
}, },
#[snafu(display("Upgrade path not implemented/supported: {}", format))] #[snafu(display("Upgrade path not implemented/supported: {}", format))]
UnsupportedUpgrade { format: String }, UnsupportedUpgrade {
format: String,
},
#[snafu(display("Parquet already exists in catalog: {:?}", path))] #[snafu(display("Parquet already exists in catalog: {:?}", path))]
ParquetFileAlreadyExists { path: DirsAndFileName }, ParquetFileAlreadyExists {
path: ParquetFilePath,
},
#[snafu(display("Parquet does not exist in catalog: {:?}", path))] #[snafu(display("Parquet does not exist in catalog: {:?}", path))]
ParquetFileDoesNotExist { path: DirsAndFileName }, ParquetFileDoesNotExist {
path: ParquetFilePath,
},
#[snafu(display("Cannot encode parquet metadata: {}", source))] #[snafu(display("Cannot encode parquet metadata: {}", source))]
MetadataEncodingFailed { source: crate::metadata::Error }, MetadataEncodingFailed {
source: crate::metadata::Error,
},
#[snafu(display("Cannot decode parquet metadata: {}", source))] #[snafu(display("Cannot decode parquet metadata: {}", source))]
MetadataDecodingFailed { source: crate::metadata::Error }, MetadataDecodingFailed {
source: crate::metadata::Error,
},
#[snafu( #[snafu(
display("Cannot extract metadata from {:?}: {}", path, source), display("Cannot extract metadata from {:?}: {}", path, source),
@ -139,7 +160,7 @@ pub enum Error {
)] )]
MetadataExtractFailed { MetadataExtractFailed {
source: crate::metadata::Error, source: crate::metadata::Error,
path: DirsAndFileName, path: ParquetFilePath,
}, },
#[snafu( #[snafu(
@ -148,7 +169,7 @@ pub enum Error {
)] )]
SchemaError { SchemaError {
source: Box<dyn std::error::Error + Send + Sync>, source: Box<dyn std::error::Error + Send + Sync>,
path: DirsAndFileName, path: ParquetFilePath,
}, },
#[snafu( #[snafu(
@ -161,7 +182,7 @@ pub enum Error {
)] )]
ReplayPlanError { ReplayPlanError {
source: Box<dyn std::error::Error + Send + Sync>, source: Box<dyn std::error::Error + Send + Sync>,
path: DirsAndFileName, path: ParquetFilePath,
}, },
#[snafu( #[snafu(
@ -170,7 +191,7 @@ pub enum Error {
)] )]
ChunkCreationFailed { ChunkCreationFailed {
source: crate::chunk::Error, source: crate::chunk::Error,
path: DirsAndFileName, path: ParquetFilePath,
}, },
#[snafu(display("Catalog already exists"))] #[snafu(display("Catalog already exists"))]
@ -180,13 +201,17 @@ pub enum Error {
DateTimeRequired {}, DateTimeRequired {},
#[snafu(display("Internal: Cannot parse datetime in serialized catalog: {}", source))] #[snafu(display("Internal: Cannot parse datetime in serialized catalog: {}", source))]
DateTimeParseError { source: TryFromIntError }, DateTimeParseError {
source: TryFromIntError,
},
#[snafu(display( #[snafu(display(
"Internal: Cannot parse encoding in serialized catalog: {} is not a valid, specified variant", "Internal: Cannot parse encoding in serialized catalog: {} is not a valid, specified variant",
data data
))] ))]
EncodingParseError { data: i32 }, EncodingParseError {
data: i32,
},
#[snafu(display( #[snafu(display(
"Internal: Found wrong encoding in serialized catalog file: Expected {:?} but got {:?}", "Internal: Found wrong encoding in serialized catalog file: Expected {:?} but got {:?}",
@ -199,7 +224,13 @@ pub enum Error {
}, },
#[snafu(display("Cannot commit transaction: {}", source))] #[snafu(display("Cannot commit transaction: {}", source))]
CommitError { source: Box<Error> }, CommitError {
source: Box<Error>,
},
InvalidParquetFilePath {
source: ParquetFilePathParseError,
},
} }
pub type Result<T, E = Error> = std::result::Result<T, E>; pub type Result<T, E = Error> = std::result::Result<T, E>;
@ -208,7 +239,7 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct CatalogParquetInfo { pub struct CatalogParquetInfo {
/// Path within this database. /// Path within this database.
pub path: DirsAndFileName, pub path: ParquetFilePath,
/// Size of the parquet file, in bytes /// Size of the parquet file, in bytes
pub file_size_bytes: usize, pub file_size_bytes: usize,
@ -235,7 +266,7 @@ pub trait CatalogState {
) -> Result<()>; ) -> Result<()>;
/// Remove parquet file from state. /// Remove parquet file from state.
fn remove(&mut self, path: DirsAndFileName) -> Result<()>; fn remove(&mut self, path: &ParquetFilePath) -> Result<()>;
} }
/// In-memory view of the preserved catalog. /// In-memory view of the preserved catalog.
@ -314,8 +345,8 @@ impl PreservedCatalog {
/// Create new catalog w/o any data. /// Create new catalog w/o any data.
/// ///
/// An empty transaction will be used to mark the catalog start so that concurrent open but still-empty catalogs can /// An empty transaction will be used to mark the catalog start so that concurrent open but
/// easily be detected. /// still-empty catalogs can easily be detected.
pub async fn new_empty<S>( pub async fn new_empty<S>(
iox_object_store: Arc<IoxObjectStore>, iox_object_store: Arc<IoxObjectStore>,
state_data: S::EmptyInput, state_data: S::EmptyInput,
@ -688,22 +719,24 @@ fn parse_uuid_required(s: &str) -> Result<Uuid> {
parse_uuid(s)?.context(UuidRequired {}) parse_uuid(s)?.context(UuidRequired {})
} }
/// Parse [`DirsAndFilename`](object_store::path::parsed::DirsAndFileName) from protobuf. /// Parse [`ParquetFilePath`](iox_object_store::ParquetFilePath) from protobuf.
fn parse_dirs_and_filename(proto: &Option<proto::Path>) -> Result<DirsAndFileName> { fn parse_dirs_and_filename(proto: &proto::Path) -> Result<ParquetFilePath> {
let proto = proto.as_ref().context(PathRequired)?; let dirs_and_file_name = DirsAndFileName {
Ok(DirsAndFileName {
directories: proto directories: proto
.directories .directories
.iter() .iter()
.map(|s| PathPart::from(&s[..])) .map(|s| PathPart::from(&s[..]))
.collect(), .collect(),
file_name: Some(PathPart::from(&proto.file_name[..])), file_name: Some(PathPart::from(&proto.file_name[..])),
}) };
ParquetFilePath::from_relative_dirs_and_file_name(&dirs_and_file_name)
.context(InvalidParquetFilePath)
} }
/// Store [`DirsAndFilename`](object_store::path::parsed::DirsAndFileName) as protobuf. /// Store [`ParquetFilePath`](iox_object_store::ParquetFilePath) as protobuf.
fn unparse_dirs_and_filename(path: &DirsAndFileName) -> proto::Path { fn unparse_dirs_and_filename(path: &ParquetFilePath) -> proto::Path {
let path = path.relative_dirs_and_file_name();
proto::Path { proto::Path {
directories: path directories: path
.directories .directories
@ -787,11 +820,11 @@ impl OpenTransaction {
/// Handle the given action and populate data to the catalog state. /// Handle the given action and populate data to the catalog state.
/// ///
/// The deserializes the action state and passes it to the correct method in [`CatalogState`]. /// This deserializes the action state and passes it to the correct method in [`CatalogState`].
/// ///
/// Note that this method is primarily for replaying transactions and will NOT append the given action to the /// Note that this method is primarily for replaying transactions and will NOT append the given
/// current transaction. If you want to store the given action (e.g. during an in-progress transaction), use /// action to the current transaction. If you want to store the given action (e.g. during an
/// [`record_action`](Self::record_action). /// in-progress transaction), use [`record_action`](Self::record_action).
fn handle_action<S>( fn handle_action<S>(
state: &mut S, state: &mut S,
action: &proto::transaction::action::Action, action: &proto::transaction::action::Action,
@ -808,7 +841,7 @@ impl OpenTransaction {
.fail()?; .fail()?;
} }
proto::transaction::action::Action::AddParquet(a) => { proto::transaction::action::Action::AddParquet(a) => {
let path = parse_dirs_and_filename(&a.path)?; let path = parse_dirs_and_filename(a.path.as_ref().context(PathRequired)?)?;
let file_size_bytes = a.file_size_bytes as usize; let file_size_bytes = a.file_size_bytes as usize;
let metadata = let metadata =
@ -825,8 +858,8 @@ impl OpenTransaction {
)?; )?;
} }
proto::transaction::action::Action::RemoveParquet(a) => { proto::transaction::action::Action::RemoveParquet(a) => {
let path = parse_dirs_and_filename(&a.path)?; let path = parse_dirs_and_filename(a.path.as_ref().context(PathRequired)?)?;
state.remove(path)?; state.remove(&path)?;
} }
}; };
Ok(()) Ok(())
@ -932,14 +965,16 @@ impl OpenTransaction {
/// Structure that holds all information required to create a checkpoint. /// Structure that holds all information required to create a checkpoint.
/// ///
/// Note that while checkpoint are addressed using the same schema as we use for transaction (revision counter, UUID), /// Note that while checkpoint are addressed using the same schema as we use for transaction
/// they contain the changes at the end (aka including) the transaction they refer. /// (revision counter, UUID), they contain the changes at the end (aka including) the transaction
/// they refer.
#[derive(Debug)] #[derive(Debug)]
pub struct CheckpointData { pub struct CheckpointData {
/// List of all Parquet files that are currently (i.e. by the current version) tracked by the catalog. /// List of all Parquet files that are currently (i.e. by the current version) tracked by the
/// catalog.
/// ///
/// If a file was once added but later removed it MUST NOT appear in the result. /// If a file was once added but later removed it MUST NOT appear in the result.
pub files: HashMap<DirsAndFileName, CatalogParquetInfo>, pub files: HashMap<ParquetFilePath, CatalogParquetInfo>,
} }
/// Handle for an open uncommitted transaction. /// Handle for an open uncommitted transaction.
@ -1082,7 +1117,7 @@ impl<'c> TransactionHandle<'c> {
/// Remove a parquet file from the catalog. /// Remove a parquet file from the catalog.
/// ///
/// Removing files that do not exist or were already removed will result in an error. /// Removing files that do not exist or were already removed will result in an error.
pub fn remove_parquet(&mut self, path: &DirsAndFileName) { pub fn remove_parquet(&mut self, path: &ParquetFilePath) {
self.transaction self.transaction
.as_mut() .as_mut()
.expect("transaction handle w/o transaction?!") .expect("transaction handle w/o transaction?!")
@ -1208,14 +1243,15 @@ impl<'c> Debug for CheckpointHandle<'c> {
pub mod test_helpers { pub mod test_helpers {
use super::*; use super::*;
use crate::test_utils::{chunk_addr, make_iox_object_store, make_metadata}; use crate::test_utils::{
use object_store::parsed_path; chunk_addr, make_iox_object_store, make_metadata, make_parquet_file_path,
};
/// In-memory catalog state, for testing. /// In-memory catalog state, for testing.
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct TestCatalogState { pub struct TestCatalogState {
/// Map of all parquet files that are currently pregistered. /// Map of all parquet files that are currently pregistered.
pub parquet_files: HashMap<DirsAndFileName, CatalogParquetInfo>, pub parquet_files: HashMap<ParquetFilePath, CatalogParquetInfo>,
} }
impl TestCatalogState { impl TestCatalogState {
@ -1255,8 +1291,8 @@ pub mod test_helpers {
Ok(()) Ok(())
} }
fn remove(&mut self, path: DirsAndFileName) -> Result<()> { fn remove(&mut self, path: &ParquetFilePath) -> Result<()> {
match self.parquet_files.entry(path) { match self.parquet_files.entry(path.clone()) {
Occupied(o) => { Occupied(o) => {
o.remove(); o.remove();
} }
@ -1302,15 +1338,14 @@ pub mod test_helpers {
PreservedCatalog::new_empty::<S>(Arc::clone(&iox_object_store), state_data) PreservedCatalog::new_empty::<S>(Arc::clone(&iox_object_store), state_data)
.await .await
.unwrap(); .unwrap();
let mut expected = HashMap::new(); let mut expected: HashMap<ParquetFilePath, _> = HashMap::new();
assert_checkpoint(&state, &f, &expected); assert_checkpoint(&state, &f, &expected);
// add files // add files
let mut chunk_id_watermark = 5; let mut chunk_id_watermark = 5;
{ {
for chunk_id in 0..chunk_id_watermark { for chunk_id in 0..chunk_id_watermark {
let path = parsed_path!(format!("chunk_{}", chunk_id).as_ref()); let (path, metadata) =
let (_, metadata) =
make_metadata(&iox_object_store, "ok", chunk_addr(chunk_id)).await; make_metadata(&iox_object_store, "ok", chunk_addr(chunk_id)).await;
state state
.add( .add(
@ -1329,16 +1364,15 @@ pub mod test_helpers {
// remove files // remove files
{ {
let path = parsed_path!("chunk_1"); let path = expected.keys().next().unwrap().clone();
state.remove(path.clone()).unwrap(); state.remove(&path).unwrap();
expected.remove(&path); expected.remove(&path);
} }
assert_checkpoint(&state, &f, &expected); assert_checkpoint(&state, &f, &expected);
// add and remove in the same transaction // add and remove in the same transaction
{ {
let path = parsed_path!(format!("chunk_{}", chunk_id_watermark).as_ref()); let (path, metadata) =
let (_, metadata) =
make_metadata(&iox_object_store, "ok", chunk_addr(chunk_id_watermark)).await; make_metadata(&iox_object_store, "ok", chunk_addr(chunk_id_watermark)).await;
state state
.add( .add(
@ -1350,23 +1384,23 @@ pub mod test_helpers {
}, },
) )
.unwrap(); .unwrap();
state.remove(path.clone()).unwrap(); state.remove(&path).unwrap();
chunk_id_watermark += 1; chunk_id_watermark += 1;
} }
assert_checkpoint(&state, &f, &expected); assert_checkpoint(&state, &f, &expected);
// remove and add in the same transaction // remove and add in the same transaction
{ {
let path = parsed_path!("chunk_2"); let path = expected.keys().next().unwrap().clone();
let (_, metadata) = make_metadata(&iox_object_store, "ok", chunk_addr(2)).await; let metadata = expected.get(&path).unwrap();
state.remove(path.clone()).unwrap(); state.remove(&path).unwrap();
state state
.add( .add(
Arc::clone(&iox_object_store), Arc::clone(&iox_object_store),
CatalogParquetInfo { CatalogParquetInfo {
path: path.clone(), path: path.clone(),
file_size_bytes: 33, file_size_bytes: 33,
metadata: Arc::new(metadata), metadata: Arc::clone(metadata),
}, },
) )
.unwrap(); .unwrap();
@ -1375,8 +1409,7 @@ pub mod test_helpers {
// add, remove, add in the same transaction // add, remove, add in the same transaction
{ {
let path = parsed_path!(format!("chunk_{}", chunk_id_watermark).as_ref()); let (path, metadata) =
let (_, metadata) =
make_metadata(&iox_object_store, "ok", chunk_addr(chunk_id_watermark)).await; make_metadata(&iox_object_store, "ok", chunk_addr(chunk_id_watermark)).await;
state state
.add( .add(
@ -1388,7 +1421,7 @@ pub mod test_helpers {
}, },
) )
.unwrap(); .unwrap();
state.remove(path.clone()).unwrap(); state.remove(&path).unwrap();
state state
.add( .add(
Arc::clone(&iox_object_store), Arc::clone(&iox_object_store),
@ -1406,20 +1439,20 @@ pub mod test_helpers {
// remove, add, remove in same transaction // remove, add, remove in same transaction
{ {
let path = parsed_path!("chunk_2"); let path = expected.keys().next().unwrap().clone();
let (_, metadata) = make_metadata(&iox_object_store, "ok", chunk_addr(2)).await; let metadata = expected.get(&path).unwrap();
state.remove(path.clone()).unwrap(); state.remove(&path).unwrap();
state state
.add( .add(
Arc::clone(&iox_object_store), Arc::clone(&iox_object_store),
CatalogParquetInfo { CatalogParquetInfo {
path: path.clone(), path: path.clone(),
file_size_bytes: 33, file_size_bytes: 33,
metadata: Arc::new(metadata), metadata: Arc::clone(metadata),
}, },
) )
.unwrap(); .unwrap();
state.remove(path.clone()).unwrap(); state.remove(&path).unwrap();
expected.remove(&path); expected.remove(&path);
} }
assert_checkpoint(&state, &f, &expected); assert_checkpoint(&state, &f, &expected);
@ -1427,7 +1460,7 @@ pub mod test_helpers {
// error handling, no real opt // error handling, no real opt
{ {
// already exists (should also not change the metadata) // already exists (should also not change the metadata)
let path = parsed_path!("chunk_0"); let path = expected.keys().next().unwrap();
let (_, metadata) = make_metadata(&iox_object_store, "fail", chunk_addr(0)).await; let (_, metadata) = make_metadata(&iox_object_store, "fail", chunk_addr(0)).await;
let err = state let err = state
.add( .add(
@ -1442,8 +1475,8 @@ pub mod test_helpers {
assert!(matches!(err, Error::ParquetFileAlreadyExists { .. })); assert!(matches!(err, Error::ParquetFileAlreadyExists { .. }));
// does not exist // does not exist
let path = parsed_path!(format!("chunk_{}", chunk_id_watermark).as_ref()); let path = make_parquet_file_path();
let err = state.remove(path).unwrap_err(); let err = state.remove(&path).unwrap_err();
assert!(matches!(err, Error::ParquetFileDoesNotExist { .. })); assert!(matches!(err, Error::ParquetFileDoesNotExist { .. }));
chunk_id_watermark += 1; chunk_id_watermark += 1;
} }
@ -1452,7 +1485,7 @@ pub mod test_helpers {
// error handling, still something works // error handling, still something works
{ {
// already exists (should also not change the metadata) // already exists (should also not change the metadata)
let path = parsed_path!("chunk_0"); let path = expected.keys().next().unwrap();
let (_, metadata) = make_metadata(&iox_object_store, "fail", chunk_addr(0)).await; let (_, metadata) = make_metadata(&iox_object_store, "fail", chunk_addr(0)).await;
let err = state let err = state
.add( .add(
@ -1467,8 +1500,7 @@ pub mod test_helpers {
assert!(matches!(err, Error::ParquetFileAlreadyExists { .. })); assert!(matches!(err, Error::ParquetFileAlreadyExists { .. }));
// this transaction will still work // this transaction will still work
let path = parsed_path!(format!("chunk_{}", chunk_id_watermark).as_ref()); let (path, metadata) =
let (_, metadata) =
make_metadata(&iox_object_store, "ok", chunk_addr(chunk_id_watermark)).await; make_metadata(&iox_object_store, "ok", chunk_addr(chunk_id_watermark)).await;
state state
.add( .add(
@ -1488,7 +1520,7 @@ pub mod test_helpers {
.add( .add(
Arc::clone(&iox_object_store), Arc::clone(&iox_object_store),
CatalogParquetInfo { CatalogParquetInfo {
path: path.clone(), path,
file_size_bytes: 33, file_size_bytes: 33,
metadata: Arc::new(metadata), metadata: Arc::new(metadata),
}, },
@ -1497,18 +1529,18 @@ pub mod test_helpers {
assert!(matches!(err, Error::ParquetFileAlreadyExists { .. })); assert!(matches!(err, Error::ParquetFileAlreadyExists { .. }));
// does not exist // does not exist
let path = parsed_path!(format!("chunk_{}", chunk_id_watermark).as_ref()); let path = make_parquet_file_path();
let err = state.remove(path).unwrap_err(); let err = state.remove(&path).unwrap_err();
assert!(matches!(err, Error::ParquetFileDoesNotExist { .. })); assert!(matches!(err, Error::ParquetFileDoesNotExist { .. }));
chunk_id_watermark += 1; chunk_id_watermark += 1;
// this still works // this still works
let path = parsed_path!("chunk_3"); let path = expected.keys().next().unwrap().clone();
state.remove(path.clone()).unwrap(); state.remove(&path).unwrap();
expected.remove(&path); expected.remove(&path);
// recently removed // recently removed
let err = state.remove(path).unwrap_err(); let err = state.remove(&path).unwrap_err();
assert!(matches!(err, Error::ParquetFileDoesNotExist { .. })); assert!(matches!(err, Error::ParquetFileDoesNotExist { .. }));
} }
assert_checkpoint(&state, &f, &expected); assert_checkpoint(&state, &f, &expected);
@ -1521,7 +1553,7 @@ pub mod test_helpers {
fn assert_checkpoint<S, F>( fn assert_checkpoint<S, F>(
state: &S, state: &S,
f: &F, f: &F,
expected_files: &HashMap<DirsAndFileName, Arc<IoxParquetMetaData>>, expected_files: &HashMap<ParquetFilePath, Arc<IoxParquetMetaData>>,
) where ) where
F: Fn(&S) -> CheckpointData, F: Fn(&S) -> CheckpointData,
{ {
@ -1568,8 +1600,9 @@ mod tests {
}, },
*, *,
}; };
use crate::test_utils::{chunk_addr, make_iox_object_store, make_metadata}; use crate::test_utils::{
use object_store::parsed_path; chunk_addr, make_iox_object_store, make_metadata, make_parquet_file_path,
};
#[tokio::test] #[tokio::test]
async fn test_create_empty() { async fn test_create_empty() {
@ -2226,7 +2259,7 @@ mod tests {
// create another transaction on-top that adds a file (this transaction will be required to load the full state) // create another transaction on-top that adds a file (this transaction will be required to load the full state)
{ {
let mut transaction = catalog.open_transaction().await; let mut transaction = catalog.open_transaction().await;
let path = parsed_path!("last_one"); let path = make_parquet_file_path();
let info = CatalogParquetInfo { let info = CatalogParquetInfo {
path, path,
file_size_bytes: 33, file_size_bytes: 33,
@ -2271,11 +2304,13 @@ mod tests {
} }
/// Get sorted list of catalog files from state /// Get sorted list of catalog files from state
fn get_catalog_parquet_files(state: &TestCatalogState) -> Vec<(String, IoxParquetMetaData)> { fn get_catalog_parquet_files(
let mut files: Vec<(String, IoxParquetMetaData)> = state state: &TestCatalogState,
) -> Vec<(ParquetFilePath, IoxParquetMetaData)> {
let mut files: Vec<(ParquetFilePath, IoxParquetMetaData)> = state
.parquet_files .parquet_files
.values() .values()
.map(|info| (info.path.to_string(), info.metadata.as_ref().clone())) .map(|info| (info.path.clone(), info.metadata.as_ref().clone()))
.collect(); .collect();
files.sort_by_key(|(path, _)| path.clone()); files.sort_by_key(|(path, _)| path.clone());
files files
@ -2284,9 +2319,12 @@ mod tests {
/// Assert that set of parquet files tracked by a catalog are identical to the given sorted list. /// Assert that set of parquet files tracked by a catalog are identical to the given sorted list.
fn assert_catalog_parquet_files( fn assert_catalog_parquet_files(
state: &TestCatalogState, state: &TestCatalogState,
expected: &[(String, IoxParquetMetaData)], expected: &[(ParquetFilePath, IoxParquetMetaData)],
) { ) {
let actual = get_catalog_parquet_files(state); let actual = get_catalog_parquet_files(state);
let mut expected = expected.to_vec();
expected.sort_by_key(|(path, _)| path.clone());
for ((actual_path, actual_md), (expected_path, expected_md)) in for ((actual_path, actual_md), (expected_path, expected_md)) in
actual.iter().zip(expected.iter()) actual.iter().zip(expected.iter())
{ {
@ -2388,39 +2426,39 @@ mod tests {
trace.record(&catalog, &state, false); trace.record(&catalog, &state, false);
// fill catalog with examples // fill catalog with examples
let test1 = make_parquet_file_path();
let sub1_test1 = make_parquet_file_path();
let sub1_test2 = make_parquet_file_path();
let sub2_test1 = make_parquet_file_path();
{ {
let mut t = catalog.open_transaction().await; let mut t = catalog.open_transaction().await;
let path = parsed_path!("test1");
let info = CatalogParquetInfo { let info = CatalogParquetInfo {
path, path: test1.clone(),
file_size_bytes: 33, file_size_bytes: 33,
metadata: Arc::clone(&metadata1), metadata: Arc::clone(&metadata1),
}; };
state.parquet_files.insert(info.path.clone(), info.clone()); state.parquet_files.insert(info.path.clone(), info.clone());
t.add_parquet(&info).unwrap(); t.add_parquet(&info).unwrap();
let path = parsed_path!(["sub1"], "test1");
let info = CatalogParquetInfo { let info = CatalogParquetInfo {
path, path: sub1_test1.clone(),
file_size_bytes: 33, file_size_bytes: 33,
metadata: Arc::clone(&metadata2), metadata: Arc::clone(&metadata2),
}; };
state.parquet_files.insert(info.path.clone(), info.clone()); state.parquet_files.insert(info.path.clone(), info.clone());
t.add_parquet(&info).unwrap(); t.add_parquet(&info).unwrap();
let path = parsed_path!(["sub1"], "test2");
let info = CatalogParquetInfo { let info = CatalogParquetInfo {
path, path: sub1_test2.clone(),
file_size_bytes: 33, file_size_bytes: 33,
metadata: Arc::clone(&metadata2), metadata: Arc::clone(&metadata2),
}; };
state.parquet_files.insert(info.path.clone(), info.clone()); state.parquet_files.insert(info.path.clone(), info.clone());
t.add_parquet(&info).unwrap(); t.add_parquet(&info).unwrap();
let path = parsed_path!(["sub2"], "test1");
let info = CatalogParquetInfo { let info = CatalogParquetInfo {
path, path: sub2_test1.clone(),
file_size_bytes: 33, file_size_bytes: 33,
metadata: Arc::clone(&metadata1), metadata: Arc::clone(&metadata1),
}; };
@ -2433,31 +2471,30 @@ mod tests {
assert_catalog_parquet_files( assert_catalog_parquet_files(
&state, &state,
&[ &[
("sub1/test1".to_string(), metadata2.as_ref().clone()), (sub1_test1.clone(), metadata2.as_ref().clone()),
("sub1/test2".to_string(), metadata2.as_ref().clone()), (sub1_test2.clone(), metadata2.as_ref().clone()),
("sub2/test1".to_string(), metadata1.as_ref().clone()), (sub2_test1.clone(), metadata1.as_ref().clone()),
("test1".to_string(), metadata1.as_ref().clone()), (test1.clone(), metadata1.as_ref().clone()),
], ],
); );
trace.record(&catalog, &state, false); trace.record(&catalog, &state, false);
// modify catalog with examples // modify catalog with examples
let test4 = make_parquet_file_path();
{ {
let mut t = catalog.open_transaction().await; let mut t = catalog.open_transaction().await;
// "real" modifications // "real" modifications
let path = parsed_path!("test4");
let info = CatalogParquetInfo { let info = CatalogParquetInfo {
path, path: test4.clone(),
file_size_bytes: 33, file_size_bytes: 33,
metadata: Arc::clone(&metadata1), metadata: Arc::clone(&metadata1),
}; };
state.parquet_files.insert(info.path.clone(), info.clone()); state.parquet_files.insert(info.path.clone(), info.clone());
t.add_parquet(&info).unwrap(); t.add_parquet(&info).unwrap();
let path = parsed_path!("test1"); state.parquet_files.remove(&test1);
state.parquet_files.remove(&path); t.remove_parquet(&test1);
t.remove_parquet(&path);
t.commit().await.unwrap(); t.commit().await.unwrap();
} }
@ -2465,10 +2502,10 @@ mod tests {
assert_catalog_parquet_files( assert_catalog_parquet_files(
&state, &state,
&[ &[
("sub1/test1".to_string(), metadata2.as_ref().clone()), (sub1_test1.clone(), metadata2.as_ref().clone()),
("sub1/test2".to_string(), metadata2.as_ref().clone()), (sub1_test2.clone(), metadata2.as_ref().clone()),
("sub2/test1".to_string(), metadata1.as_ref().clone()), (sub2_test1.clone(), metadata1.as_ref().clone()),
("test4".to_string(), metadata1.as_ref().clone()), (test4.clone(), metadata1.as_ref().clone()),
], ],
); );
trace.record(&catalog, &state, false); trace.record(&catalog, &state, false);
@ -2478,13 +2515,13 @@ mod tests {
let mut t = catalog.open_transaction().await; let mut t = catalog.open_transaction().await;
let info = CatalogParquetInfo { let info = CatalogParquetInfo {
path: parsed_path!("test5"), path: make_parquet_file_path(),
file_size_bytes: 33, file_size_bytes: 33,
metadata: Arc::clone(&metadata1), metadata: Arc::clone(&metadata1),
}; };
t.add_parquet(&info).unwrap(); t.add_parquet(&info).unwrap();
t.remove_parquet(&parsed_path!(["sub1"], "test2")); t.remove_parquet(&sub1_test2);
// NO commit here! // NO commit here!
} }
@ -2492,10 +2529,10 @@ mod tests {
assert_catalog_parquet_files( assert_catalog_parquet_files(
&state, &state,
&[ &[
("sub1/test1".to_string(), metadata2.as_ref().clone()), (sub1_test1.clone(), metadata2.as_ref().clone()),
("sub1/test2".to_string(), metadata2.as_ref().clone()), (sub1_test2.clone(), metadata2.as_ref().clone()),
("sub2/test1".to_string(), metadata1.as_ref().clone()), (sub2_test1.clone(), metadata1.as_ref().clone()),
("test4".to_string(), metadata1.as_ref().clone()), (test4.clone(), metadata1.as_ref().clone()),
], ],
); );
trace.record(&catalog, &state, true); trace.record(&catalog, &state, true);
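Putting it together, catalog transactions now identify parquet files by `ParquetFilePath` on both the add and remove side. A hedged sketch, written as if inside this crate and following the test code above; the helper name and the `crate::metadata::IoxParquetMetaData` import path are assumptions:

```rust
use std::sync::Arc;

use data_types::chunk_metadata::ChunkAddr;
use iox_object_store::ParquetFilePath;

use crate::catalog::{CatalogParquetInfo, PreservedCatalog};
use crate::metadata::IoxParquetMetaData; // module path is an assumption

/// Illustrative: record a chunk's parquet file in the catalog, then drop it
/// again, identifying it by its typed path in both directions.
async fn track_and_drop(
    catalog: &PreservedCatalog,
    chunk_addr: &ChunkAddr,
    metadata: Arc<IoxParquetMetaData>,
    file_size_bytes: usize,
) {
    let path = ParquetFilePath::new(chunk_addr);

    let mut transaction = catalog.open_transaction().await;
    transaction
        .add_parquet(&CatalogParquetInfo {
            path: path.clone(),
            file_size_bytes,
            metadata,
        })
        .unwrap();
    // Adding and removing within one transaction is allowed, as the tests above show.
    transaction.remove_parquet(&path);
    transaction.commit().await.unwrap();
}
```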


@ -8,7 +8,7 @@ use internal_types::{
schema::{Schema, TIME_COLUMN_NAME}, schema::{Schema, TIME_COLUMN_NAME},
selection::Selection, selection::Selection,
}; };
use iox_object_store::IoxObjectStore; use iox_object_store::{IoxObjectStore, ParquetFilePath};
use object_store::path::Path; use object_store::path::Path;
use query::predicate::Predicate; use query::predicate::Predicate;
use snafu::{ResultExt, Snafu}; use snafu::{ResultExt, Snafu};
@ -42,7 +42,7 @@ pub enum Error {
)] )]
SchemaReadFailed { SchemaReadFailed {
source: crate::metadata::Error, source: crate::metadata::Error,
path: Path, path: ParquetFilePath,
}, },
#[snafu( #[snafu(
@ -51,7 +51,7 @@ pub enum Error {
)] )]
StatisticsReadFailed { StatisticsReadFailed {
source: crate::metadata::Error, source: crate::metadata::Error,
path: Path, path: ParquetFilePath,
}, },
} }
@ -95,10 +95,8 @@ pub struct ParquetChunk {
/// Persists the parquet file within a database's relative path /// Persists the parquet file within a database's relative path
iox_object_store: Arc<IoxObjectStore>, iox_object_store: Arc<IoxObjectStore>,
/// Path in the object store. Format: /// Path in the database's object store.
/// <writer id>/<database>/data/<partition key>/<chunk path: ParquetFilePath,
/// id>/<tablename>.parquet
object_store_path: Path,
/// Size of the data, in object store /// Size of the data, in object store
file_size_bytes: usize, file_size_bytes: usize,
@ -112,7 +110,7 @@ pub struct ParquetChunk {
impl ParquetChunk { impl ParquetChunk {
/// Creates new chunk from given parquet metadata. /// Creates new chunk from given parquet metadata.
pub fn new( pub fn new(
file_location: Path, path: &ParquetFilePath,
iox_object_store: Arc<IoxObjectStore>, iox_object_store: Arc<IoxObjectStore>,
file_size_bytes: usize, file_size_bytes: usize,
parquet_metadata: Arc<IoxParquetMetaData>, parquet_metadata: Arc<IoxParquetMetaData>,
@ -120,14 +118,12 @@ impl ParquetChunk {
partition_key: Arc<str>, partition_key: Arc<str>,
metrics: ChunkMetrics, metrics: ChunkMetrics,
) -> Result<Self> { ) -> Result<Self> {
let schema = parquet_metadata.read_schema().context(SchemaReadFailed { let schema = parquet_metadata
path: &file_location, .read_schema()
})?; .context(SchemaReadFailed { path })?;
let columns = parquet_metadata let columns = parquet_metadata
.read_statistics(&schema) .read_statistics(&schema)
.context(StatisticsReadFailed { .context(StatisticsReadFailed { path })?;
path: &file_location,
})?;
let table_summary = TableSummary { let table_summary = TableSummary {
name: table_name.to_string(), name: table_name.to_string(),
columns, columns,
@ -137,7 +133,7 @@ impl ParquetChunk {
partition_key, partition_key,
Arc::new(table_summary), Arc::new(table_summary),
schema, schema,
file_location, path,
iox_object_store, iox_object_store,
file_size_bytes, file_size_bytes,
parquet_metadata, parquet_metadata,
@ -152,7 +148,7 @@ impl ParquetChunk {
partition_key: Arc<str>, partition_key: Arc<str>,
table_summary: Arc<TableSummary>, table_summary: Arc<TableSummary>,
schema: Arc<Schema>, schema: Arc<Schema>,
file_location: Path, path: &ParquetFilePath,
iox_object_store: Arc<IoxObjectStore>, iox_object_store: Arc<IoxObjectStore>,
file_size_bytes: usize, file_size_bytes: usize,
parquet_metadata: Arc<IoxParquetMetaData>, parquet_metadata: Arc<IoxParquetMetaData>,
@ -166,7 +162,7 @@ impl ParquetChunk {
schema, schema,
timestamp_range, timestamp_range,
iox_object_store, iox_object_store,
object_store_path: file_location, path: path.into(),
file_size_bytes, file_size_bytes,
parquet_metadata, parquet_metadata,
metrics, metrics,
@ -179,8 +175,8 @@ impl ParquetChunk {
} }
/// Return object store path for this chunk /// Return object store path for this chunk
pub fn path(&self) -> Path { pub fn path(&self) -> &ParquetFilePath {
self.object_store_path.clone() &self.path
} }
/// Returns the summary statistics for this chunk /// Returns the summary statistics for this chunk
@ -200,7 +196,7 @@ impl ParquetChunk {
+ self.partition_key.len() + self.partition_key.len()
+ self.table_summary.size() + self.table_summary.size()
+ mem::size_of_val(&self.schema.as_ref()) + mem::size_of_val(&self.schema.as_ref())
+ mem::size_of_val(&self.object_store_path) + mem::size_of_val(&self.path)
+ mem::size_of_val(&self.parquet_metadata) + mem::size_of_val(&self.parquet_metadata)
} }
@ -247,7 +243,7 @@ impl ParquetChunk {
predicate, predicate,
selection, selection,
Arc::clone(&self.schema.as_arrow()), Arc::clone(&self.schema.as_arrow()),
self.object_store_path.clone(), self.path.clone(),
Arc::clone(&self.iox_object_store), Arc::clone(&self.iox_object_store),
) )
.context(ReadParquet) .context(ReadParquet)
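To make the shape of the change concrete, here is a small, self-contained sketch of the pattern the new type follows: a structured path value derived from a chunk address, carrying a UUID component so repeated persists never collide, and deriving Eq/Hash so it can live in the catalog's sets and maps. This is illustrative only, not the crate's actual ParquetFilePath; every name is hypothetical and it assumes a uuid dependency with the v4 feature enabled.

// Illustrative stand-in for the structured path type; NOT the real ParquetFilePath.
use uuid::Uuid;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct ChunkAddrLike {
    table_name: String,
    partition_key: String,
    chunk_id: u32,
}

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct FilePathLike {
    table_name: String,
    partition_key: String,
    chunk_id: u32,
    uuid: Uuid,
}

impl FilePathLike {
    // Mirrors the idea behind `ParquetFilePath::new(&chunk_addr)`: the UUID makes every
    // generated location unique, so re-persisting the same chunk never overwrites data.
    fn new(addr: &ChunkAddrLike) -> Self {
        Self {
            table_name: addr.table_name.clone(),
            partition_key: addr.partition_key.clone(),
            chunk_id: addr.chunk_id,
            uuid: Uuid::new_v4(),
        }
    }

    // Rendering to a relative object store location stays an implementation detail.
    fn relative_location(&self) -> String {
        format!(
            "{}/{}/{}.{}.parquet",
            self.table_name, self.partition_key, self.chunk_id, self.uuid
        )
    }
}

fn main() {
    let addr = ChunkAddrLike {
        table_name: "cpu".into(),
        partition_key: "2021-08-12".into(),
        chunk_id: 13,
    };
    let a = FilePathLike::new(&addr);
    let b = FilePathLike::new(&addr);
    assert_ne!(a, b); // same chunk address, two distinct files
    println!("{}", a.relative_location());
}

Because rendering to a concrete object store location sits behind one method, callers compare and store the typed value and never assemble or parse path strings themselves.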

View File

@ -3,11 +3,8 @@ use std::{collections::HashSet, sync::Arc};
use crate::catalog::{CatalogParquetInfo, CatalogState, PreservedCatalog}; use crate::catalog::{CatalogParquetInfo, CatalogState, PreservedCatalog};
use futures::TryStreamExt; use futures::TryStreamExt;
use iox_object_store::IoxObjectStore; use iox_object_store::{IoxObjectStore, ParquetFilePath};
use object_store::{ use object_store::{ObjectStore, ObjectStoreApi};
path::{parsed::DirsAndFileName, Path},
ObjectStore, ObjectStoreApi,
};
use observability_deps::tracing::info; use observability_deps::tracing::info;
use parking_lot::Mutex; use parking_lot::Mutex;
use snafu::{ResultExt, Snafu}; use snafu::{ResultExt, Snafu};
@ -35,20 +32,23 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
/// The resulting vector is in no particular order. It may be passed to [`delete_files`]. /// The resulting vector is in no particular order. It may be passed to [`delete_files`].
/// ///
/// # Locking / Concurrent Actions /// # Locking / Concurrent Actions
/// While this method is running you MUST NOT create any new parquet files or modify the preserved catalog in any other ///
/// way. Hence this method needs exclusive access to the preserved catalog and the parquet file. Otherwise this method /// While this method is running you MUST NOT create any new parquet files or modify the preserved
/// may report files for deletion that you are about to write to the catalog! /// catalog in any other way. Hence this method needs exclusive access to the preserved catalog and
/// the parquet file. Otherwise this method may report files for deletion that you are about to
/// write to the catalog!
/// ///
/// **This method does NOT acquire the transaction lock!** /// **This method does NOT acquire the transaction lock!**
/// ///
/// To limit the time the exclusive access is required use `max_files` which will limit the number of files to be /// To limit the time the exclusive access is required use `max_files` which will limit the number
/// detected in this cleanup round. /// of files to be detected in this cleanup round.
/// ///
/// The exclusive access can be dropped after this method returned and before calling [`delete_files`]. /// The exclusive access can be dropped after this method returned and before calling
/// [`delete_files`].
pub async fn get_unreferenced_parquet_files( pub async fn get_unreferenced_parquet_files(
catalog: &PreservedCatalog, catalog: &PreservedCatalog,
max_files: usize, max_files: usize,
) -> Result<Vec<Path>> { ) -> Result<Vec<ParquetFilePath>> {
let iox_object_store = catalog.iox_object_store(); let iox_object_store = catalog.iox_object_store();
let all_known = { let all_known = {
// replay catalog transactions to track ALL (even dropped) files that are referenced // replay catalog transactions to track ALL (even dropped) files that are referenced
@ -61,14 +61,10 @@ pub async fn get_unreferenced_parquet_files(
state.files.into_inner() state.files.into_inner()
}; };
let prefix = iox_object_store.data_path(); // gather a list of "files to remove" eagerly so we do not block transactions on the catalog
// for too long
// gather a list of "files to remove" eagerly so we do not block transactions on the catalog for too long
let mut to_remove = vec![]; let mut to_remove = vec![];
let mut stream = iox_object_store let mut stream = iox_object_store.parquet_files().await.context(ReadError)?;
.list(Some(&prefix))
.await
.context(ReadError)?;
'outer: while let Some(paths) = stream.try_next().await.context(ReadError)? { 'outer: while let Some(paths) = stream.try_next().await.context(ReadError)? {
for path in paths { for path in paths {
@ -76,18 +72,9 @@ pub async fn get_unreferenced_parquet_files(
info!(%max_files, "reached limit of number of files to cleanup in one go"); info!(%max_files, "reached limit of number of files to cleanup in one go");
break 'outer; break 'outer;
} }
let path_parsed: DirsAndFileName = path.clone().into();
// only delete if all of the following conditions are met: // only delete if file is not tracked by the catalog
// - filename ends with `.parquet` if !all_known.contains(&path) {
// - file is not tracked by the catalog
if path_parsed
.file_name
.as_ref()
.map(|part| part.encoded().ends_with(".parquet"))
.unwrap_or(false)
&& !all_known.contains(&path_parsed)
{
to_remove.push(path); to_remove.push(path);
} }
} }
@ -100,17 +87,18 @@ pub async fn get_unreferenced_parquet_files(
/// Delete all `files` from the store linked to the preserved catalog. /// Delete all `files` from the store linked to the preserved catalog.
/// ///
/// A file might already be deleted (or entirely absent) when this method is called. This will NOT result in an error. /// A file might already be deleted (or entirely absent) when this method is called. This will NOT
/// result in an error.
/// ///
/// # Locking / Concurrent Actions /// # Locking / Concurrent Actions
/// File creation and catalog modifications can be done while calling this method. Even /// File creation and catalog modifications can be done while calling this method. Even
/// [`get_unreferenced_parquet_files`] can be called while this method is in progress. /// [`get_unreferenced_parquet_files`] can be called while this method is in progress.
pub async fn delete_files(catalog: &PreservedCatalog, files: &[Path]) -> Result<()> { pub async fn delete_files(catalog: &PreservedCatalog, files: &[ParquetFilePath]) -> Result<()> {
let store = catalog.iox_object_store(); let store = catalog.iox_object_store();
for path in files { for path in files {
info!(%path, "Delete file"); info!(?path, "Delete file");
store.delete(path).await.context(WriteError)?; store.delete_parquet_file(path).await.context(WriteError)?;
} }
info!(n_files = files.len(), "Finished deletion, removed files."); info!(n_files = files.len(), "Finished deletion, removed files.");
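Taken together, the two functions above are meant to be used in a two-phase pattern: list candidates while holding exclusive access, then delete without it. A minimal sketch, written as if it sat next to the functions above and assuming the caller owns some lock that grants the exclusive access the docs describe (the catalog_write_lock parameter is hypothetical and not part of this change):

use tokio::sync::RwLock;

async fn cleanup_obsolete_files(
    catalog: &PreservedCatalog,
    catalog_write_lock: &RwLock<()>,
    max_files: usize,
) -> Result<()> {
    // Phase 1: hold exclusive access only while listing candidates, so other
    // writers are blocked for as short a time as possible.
    let to_delete = {
        let _guard = catalog_write_lock.write().await;
        get_unreferenced_parquet_files(catalog, max_files).await?
    };

    // Phase 2: deletion proceeds without the lock; files that have already
    // vanished are not an error.
    delete_files(catalog, &to_delete).await
}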
@ -120,7 +108,7 @@ pub async fn delete_files(catalog: &PreservedCatalog, files: &[Path]) -> Result<
/// Catalog state that traces all used parquet files. /// Catalog state that traces all used parquet files.
struct TracerCatalogState { struct TracerCatalogState {
files: Mutex<HashSet<DirsAndFileName>>, files: Mutex<HashSet<ParquetFilePath>>,
} }
impl CatalogState for TracerCatalogState { impl CatalogState for TracerCatalogState {
@ -141,7 +129,7 @@ impl CatalogState for TracerCatalogState {
Ok(()) Ok(())
} }
fn remove(&mut self, _path: DirsAndFileName) -> crate::catalog::Result<()> { fn remove(&mut self, _path: &ParquetFilePath) -> crate::catalog::Result<()> {
// Do NOT remove the file since we still need it for time travel // Do NOT remove the file since we still need it for time travel
Ok(()) Ok(())
} }
@ -149,17 +137,13 @@ impl CatalogState for TracerCatalogState {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::{collections::HashSet, sync::Arc};
use bytes::Bytes;
use object_store::path::Path;
use tokio::sync::RwLock;
use super::*; use super::*;
use crate::{ use crate::{
catalog::test_helpers::TestCatalogState, catalog::test_helpers::TestCatalogState,
test_utils::{chunk_addr, make_iox_object_store, make_metadata}, test_utils::{chunk_addr, make_iox_object_store, make_metadata},
}; };
use std::{collections::HashSet, sync::Arc};
use tokio::sync::RwLock;
#[tokio::test] #[tokio::test]
async fn test_cleanup_empty() { async fn test_cleanup_empty() {
@ -195,8 +179,6 @@ mod tests {
// an ordinary tracked parquet file => keep // an ordinary tracked parquet file => keep
let (path, metadata) = make_metadata(&iox_object_store, "foo", chunk_addr(1)).await; let (path, metadata) = make_metadata(&iox_object_store, "foo", chunk_addr(1)).await;
let metadata = Arc::new(metadata); let metadata = Arc::new(metadata);
paths_keep.push(path.to_string());
let path = path.into();
let info = CatalogParquetInfo { let info = CatalogParquetInfo {
path, path,
file_size_bytes: 33, file_size_bytes: 33,
@ -204,11 +186,12 @@ mod tests {
}; };
transaction.add_parquet(&info).unwrap(); transaction.add_parquet(&info).unwrap();
paths_keep.push(info.path);
// another ordinary tracked parquet file that was added and removed => keep (for time travel) // another ordinary tracked parquet file that was added and removed => keep (for time
// travel)
let (path, metadata) = make_metadata(&iox_object_store, "foo", chunk_addr(2)).await; let (path, metadata) = make_metadata(&iox_object_store, "foo", chunk_addr(2)).await;
let metadata = Arc::new(metadata); let metadata = Arc::new(metadata);
let path = path.into();
let info = CatalogParquetInfo { let info = CatalogParquetInfo {
path, path,
file_size_bytes: 33, file_size_bytes: 33,
@ -216,21 +199,11 @@ mod tests {
}; };
transaction.add_parquet(&info).unwrap(); transaction.add_parquet(&info).unwrap();
transaction.remove_parquet(&info.path); transaction.remove_parquet(&info.path);
let path_string = iox_object_store paths_keep.push(info.path);
.path_from_dirs_and_filename(info.path.clone())
.to_string();
paths_keep.push(path_string);
// not a parquet file => keep
let mut path = info.path;
path.file_name = Some("foo.txt".into());
let path = iox_object_store.path_from_dirs_and_filename(path);
create_empty_file(&iox_object_store, &path).await;
paths_keep.push(path.to_string());
// an untracked parquet file => delete // an untracked parquet file => delete
let (path, _md) = make_metadata(&iox_object_store, "foo", chunk_addr(3)).await; let (path, _md) = make_metadata(&iox_object_store, "foo", chunk_addr(3)).await;
paths_delete.push(path.to_string()); paths_delete.push(path);
transaction.commit().await.unwrap(); transaction.commit().await.unwrap();
} }
@ -266,8 +239,9 @@ mod tests {
// try multiple times to provoke a conflict // try multiple times to provoke a conflict
for i in 0..100 { for i in 0..100 {
// Every so often try to create a file with the same ChunkAddr beforehand. This should not trick the cleanup // Every so often try to create a file with the same ChunkAddr beforehand. This should
// logic to remove the actual file because file paths contain a UUIDv4 part. // not trick the cleanup logic to remove the actual file because file paths contain a
// UUIDv4 part.
if i % 2 == 0 { if i % 2 == 0 {
make_metadata(&iox_object_store, "foo", chunk_addr(i)).await; make_metadata(&iox_object_store, "foo", chunk_addr(i)).await;
} }
@ -278,7 +252,6 @@ mod tests {
let (path, md) = make_metadata(&iox_object_store, "foo", chunk_addr(i)).await; let (path, md) = make_metadata(&iox_object_store, "foo", chunk_addr(i)).await;
let metadata = Arc::new(md); let metadata = Arc::new(md);
let path = path.into();
let info = CatalogParquetInfo { let info = CatalogParquetInfo {
path, path,
file_size_bytes: 33, file_size_bytes: 33,
@ -291,9 +264,7 @@ mod tests {
drop(guard); drop(guard);
iox_object_store info.path
.path_from_dirs_and_filename(info.path)
.to_string()
}, },
async { async {
let guard = lock.write().await; let guard = lock.write().await;
@ -321,10 +292,10 @@ mod tests {
.unwrap(); .unwrap();
// create some files // create some files
let mut to_remove: HashSet<String> = Default::default(); let mut to_remove = HashSet::default();
for chunk_id in 0..3 { for chunk_id in 0..3 {
let (path, _md) = make_metadata(&iox_object_store, "foo", chunk_addr(chunk_id)).await; let (path, _md) = make_metadata(&iox_object_store, "foo", chunk_addr(chunk_id)).await;
to_remove.insert(path.to_string()); to_remove.insert(path);
} }
// run clean-up // run clean-up
@ -348,30 +319,15 @@ mod tests {
assert_eq!(leftover.len(), 0); assert_eq!(leftover.len(), 0);
} }
async fn create_empty_file(iox_object_store: &IoxObjectStore, path: &Path) { async fn list_all_files(iox_object_store: &IoxObjectStore) -> HashSet<ParquetFilePath> {
let data = Bytes::default();
let len = data.len();
iox_object_store iox_object_store
.put( .parquet_files()
path,
futures::stream::once(async move { Ok(data) }),
Some(len),
)
.await
.unwrap();
}
async fn list_all_files(iox_object_store: &IoxObjectStore) -> HashSet<String> {
iox_object_store
.list(None)
.await .await
.unwrap() .unwrap()
.try_concat() .try_concat()
.await .await
.unwrap() .unwrap()
.iter() .into_iter()
.map(|p| p.to_string())
.collect() .collect()
} }
} }

View File

@ -2,8 +2,7 @@
use std::{fmt::Debug, sync::Arc}; use std::{fmt::Debug, sync::Arc};
use futures::TryStreamExt; use futures::TryStreamExt;
use iox_object_store::IoxObjectStore; use iox_object_store::{IoxObjectStore, ParquetFilePath};
use object_store::path::{parsed::DirsAndFileName, Path};
use observability_deps::tracing::error; use observability_deps::tracing::error;
use snafu::{ResultExt, Snafu}; use snafu::{ResultExt, Snafu};
@ -22,7 +21,7 @@ pub enum Error {
#[snafu(display("Cannot read IOx metadata from parquet file ({:?}): {}", path, source))] #[snafu(display("Cannot read IOx metadata from parquet file ({:?}): {}", path, source))]
MetadataReadFailure { MetadataReadFailure {
source: crate::metadata::Error, source: crate::metadata::Error,
path: Path, path: ParquetFilePath,
}, },
#[snafu(display("Cannot add file to transaction: {}", source))] #[snafu(display("Cannot add file to transaction: {}", source))]
@ -103,15 +102,17 @@ async fn collect_files(
iox_object_store: &IoxObjectStore, iox_object_store: &IoxObjectStore,
ignore_metadata_read_failure: bool, ignore_metadata_read_failure: bool,
) -> Result<Vec<CatalogParquetInfo>> { ) -> Result<Vec<CatalogParquetInfo>> {
let mut stream = iox_object_store.list(None).await.context(ReadFailure)?; let mut stream = iox_object_store
.parquet_files()
.await
.context(ReadFailure)?;
let mut files = vec![]; let mut files = vec![];
while let Some(paths) = stream.try_next().await.context(ReadFailure)? { while let Some(paths) = stream.try_next().await.context(ReadFailure)? {
for path in paths.into_iter().filter(is_parquet) { for path in paths {
match read_parquet(iox_object_store, &path).await { match read_parquet(iox_object_store, &path).await {
Ok((file_size_bytes, metadata)) => { Ok((file_size_bytes, metadata)) => {
let path = path.into();
files.push(CatalogParquetInfo { files.push(CatalogParquetInfo {
path, path,
file_size_bytes, file_size_bytes,
@ -130,23 +131,13 @@ async fn collect_files(
Ok(files) Ok(files)
} }
/// Checks if the given path is (likely) a parquet file.
fn is_parquet(path: &Path) -> bool {
let path: DirsAndFileName = path.clone().into();
if let Some(filename) = path.file_name {
filename.encoded().ends_with(".parquet")
} else {
false
}
}
/// Read Parquet and IOx metadata from given path. /// Read Parquet and IOx metadata from given path.
async fn read_parquet( async fn read_parquet(
iox_object_store: &IoxObjectStore, iox_object_store: &IoxObjectStore,
path: &Path, path: &ParquetFilePath,
) -> Result<(usize, Arc<IoxParquetMetaData>)> { ) -> Result<(usize, Arc<IoxParquetMetaData>)> {
let data = iox_object_store let data = iox_object_store
.get(path) .get_parquet_file(path)
.await .await
.context(ReadFailure)? .context(ReadFailure)?
.map_ok(|bytes| bytes.to_vec()) .map_ok(|bytes| bytes.to_vec())
@ -186,6 +177,7 @@ mod tests {
storage::Storage, storage::Storage,
test_utils::{make_iox_object_store, make_record_batch}, test_utils::{make_iox_object_store, make_record_batch},
}; };
use object_store::path::parsed::DirsAndFileName;
#[tokio::test] #[tokio::test]
async fn test_rebuild_successfull() { async fn test_rebuild_successfull() {
@ -397,7 +389,7 @@ mod tests {
.unwrap(); .unwrap();
CatalogParquetInfo { CatalogParquetInfo {
path: path.into(), path,
file_size_bytes, file_size_bytes,
metadata: Arc::new(metadata), metadata: Arc::new(metadata),
} }
@ -406,7 +398,7 @@ mod tests {
pub async fn create_parquet_file_without_metadata( pub async fn create_parquet_file_without_metadata(
iox_object_store: &Arc<IoxObjectStore>, iox_object_store: &Arc<IoxObjectStore>,
chunk_id: u32, chunk_id: u32,
) -> (DirsAndFileName, IoxParquetMetaData) { ) -> (ParquetFilePath, IoxParquetMetaData) {
let (record_batches, schema, _column_summaries, _num_rows) = make_record_batch("foo"); let (record_batches, schema, _column_summaries, _num_rows) = make_record_batch("foo");
let mut stream: SendableRecordBatchStream = Box::pin(MemoryStream::new(record_batches)); let mut stream: SendableRecordBatchStream = Box::pin(MemoryStream::new(record_batches));
@ -424,15 +416,15 @@ mod tests {
let data = mem_writer.into_inner().unwrap(); let data = mem_writer.into_inner().unwrap();
let md = IoxParquetMetaData::from_file_bytes(data.clone()).unwrap(); let md = IoxParquetMetaData::from_file_bytes(data.clone()).unwrap();
let storage = Storage::new(Arc::clone(iox_object_store)); let storage = Storage::new(Arc::clone(iox_object_store));
let path = storage.location(&ChunkAddr { let chunk_addr = ChunkAddr {
db_name: Arc::from(iox_object_store.database_name()), db_name: Arc::from(iox_object_store.database_name()),
table_name: Arc::from("table1"), table_name: Arc::from("table1"),
partition_key: Arc::from("part1"), partition_key: Arc::from("part1"),
chunk_id, chunk_id,
}); };
let path = ParquetFilePath::new(&chunk_addr);
storage.to_object_store(data, &path).await.unwrap(); storage.to_object_store(data, &path).await.unwrap();
let path: DirsAndFileName = path.into();
(path, md) (path, md)
} }
} }
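With collect_files now driven by the typed listing, callers no longer filter on a `.parquet` suffix; everything the stream yields is already a ParquetFilePath. A hedged sketch of flattening that batched stream from outside the module (the boxed error type is only there to keep the example self-contained):

use futures::TryStreamExt;
use iox_object_store::{IoxObjectStore, ParquetFilePath};

async fn all_parquet_files(
    store: &IoxObjectStore,
) -> Result<Vec<ParquetFilePath>, Box<dyn std::error::Error>> {
    let mut stream = store.parquet_files().await?;
    let mut files = vec![];
    while let Some(batch) = stream.try_next().await? {
        // Each stream item is a Vec<ParquetFilePath>, mirroring the batches the old
        // `list` API returned as raw paths.
        files.extend(batch);
    }
    Ok(files)
}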

View File

@ -13,8 +13,8 @@ use datafusion::{
}; };
use futures::StreamExt; use futures::StreamExt;
use internal_types::selection::Selection; use internal_types::selection::Selection;
use iox_object_store::IoxObjectStore; use iox_object_store::{IoxObjectStore, ParquetFilePath};
use object_store::path::{parsed::DirsAndFileName, ObjectStorePath, Path}; use object_store::path::parsed::DirsAndFileName;
use observability_deps::tracing::debug; use observability_deps::tracing::debug;
use parking_lot::Mutex; use parking_lot::Mutex;
use parquet::{ use parquet::{
@ -29,7 +29,6 @@ use std::{
io::{Cursor, Seek, SeekFrom, Write}, io::{Cursor, Seek, SeekFrom, Write},
sync::Arc, sync::Arc,
}; };
use uuid::Uuid;
use crate::metadata::{IoxMetadata, IoxParquetMetaData, METADATA_KEY}; use crate::metadata::{IoxMetadata, IoxParquetMetaData, METADATA_KEY};
@ -137,29 +136,6 @@ impl Storage {
Self { iox_object_store } Self { iox_object_store }
} }
/// Return full path including filename in the object store to save a chunk
/// table file.
///
/// Paths generated by this method are unique and calling the method twice with the same `addr` will yield different
/// outputs.
///
/// **Important: The resulting path should be treated as a black box. It might vary over time and is an
/// implementation detail. Do NOT attempt to parse it.**
pub fn location(&self, chunk_addr: &ChunkAddr) -> object_store::path::Path {
// generate random UUID so that files are unique and never overwritten
let uuid = Uuid::new_v4();
let mut path = self.iox_object_store.data_path();
path.push_dir(chunk_addr.table_name.as_ref());
path.push_dir(chunk_addr.partition_key.as_ref());
path.set_file_name(format!(
"{}.{}.parquet",
chunk_addr.chunk_id,
uuid.to_string()
));
path
}
/// Write the given stream of data for the specified table and /// Write the given stream of data for the specified table and
/// partitioned chunk to a parquet file in this storage /// partitioned chunk to a parquet file in this storage
/// ///
@ -170,9 +146,9 @@ impl Storage {
chunk_addr: ChunkAddr, chunk_addr: ChunkAddr,
stream: SendableRecordBatchStream, stream: SendableRecordBatchStream,
metadata: IoxMetadata, metadata: IoxMetadata,
) -> Result<(Path, usize, IoxParquetMetaData)> { ) -> Result<(ParquetFilePath, usize, IoxParquetMetaData)> {
// Create full path location of this file in object store // Create full path location of this file in object store
let path = self.location(&chunk_addr); let path = ParquetFilePath::new(&chunk_addr);
let schema = stream.schema(); let schema = stream.schema();
let data = Self::parquet_stream_to_bytes(stream, schema, metadata).await?; let data = Self::parquet_stream_to_bytes(stream, schema, metadata).await?;
@ -220,18 +196,14 @@ impl Storage {
} }
/// Put the given vector of bytes to the specified location /// Put the given vector of bytes to the specified location
pub async fn to_object_store( pub async fn to_object_store(&self, data: Vec<u8>, path: &ParquetFilePath) -> Result<()> {
&self,
data: Vec<u8>,
file_name: &object_store::path::Path,
) -> Result<()> {
let len = data.len(); let len = data.len();
let data = Bytes::from(data); let data = Bytes::from(data);
let stream_data = Result::Ok(data); let stream_data = Result::Ok(data);
self.iox_object_store self.iox_object_store
.put( .put_parquet_file(
file_name, path,
futures::stream::once(async move { stream_data }), futures::stream::once(async move { stream_data }),
Some(len), Some(len),
) )
@ -266,7 +238,7 @@ impl Storage {
async fn download_and_scan_parquet( async fn download_and_scan_parquet(
predicate: Option<Expr>, predicate: Option<Expr>,
projection: Vec<usize>, projection: Vec<usize>,
path: Path, path: ParquetFilePath,
store: Arc<IoxObjectStore>, store: Arc<IoxObjectStore>,
tx: tokio::sync::mpsc::Sender<ArrowResult<RecordBatch>>, tx: tokio::sync::mpsc::Sender<ArrowResult<RecordBatch>>,
) -> Result<()> { ) -> Result<()> {
@ -288,7 +260,10 @@ impl Storage {
.context(OpenTempFile)?; .context(OpenTempFile)?;
debug!(?path, ?temp_file, "Beginning to read parquet to temp file"); debug!(?path, ?temp_file, "Beginning to read parquet to temp file");
let mut read_stream = store.get(&path).await.context(ReadingObjectStore)?; let mut read_stream = store
.get_parquet_file(&path)
.await
.context(ReadingObjectStore)?;
while let Some(bytes) = read_stream.next().await { while let Some(bytes) = read_stream.next().await {
let bytes = bytes.context(ReadingObjectStore)?; let bytes = bytes.context(ReadingObjectStore)?;
@ -345,7 +320,7 @@ impl Storage {
predicate: &Predicate, predicate: &Predicate,
selection: Selection<'_>, selection: Selection<'_>,
schema: SchemaRef, schema: SchemaRef,
path: Path, path: ParquetFilePath,
store: Arc<IoxObjectStore>, store: Arc<IoxObjectStore>,
) -> Result<SendableRecordBatchStream> { ) -> Result<SendableRecordBatchStream> {
// fire up a async task that will fetch the parquet file // fire up a async task that will fetch the parquet file
@ -568,22 +543,6 @@ mod tests {
assert_batches_eq!(&expected, &read_batches); assert_batches_eq!(&expected, &read_batches);
} }
#[test]
fn test_locations_are_unique() {
let iox_object_store = make_iox_object_store();
let storage = Storage::new(Arc::clone(&iox_object_store));
let chunk_addr = ChunkAddr {
db_name: iox_object_store.database_name().into(),
table_name: Arc::from("my_table"),
partition_key: Arc::from("my_partition"),
chunk_id: 13,
};
let l1 = storage.location(&chunk_addr);
let l2 = storage.location(&chunk_addr);
assert_ne!(l1, l2);
}
#[test] #[test]
fn test_props_have_compression() { fn test_props_have_compression() {
// should be writing with compression // should be writing with compression
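For the lower-level put/get pair, both directions now speak ParquetFilePath. A sketch of the round trip, assuming `parquet_file::storage::Storage` is the public path to the type above and that `data` already holds encoded parquet bytes; the boxed error keeps the example standalone:

use std::sync::Arc;

use data_types::chunk_metadata::ChunkAddr;
use futures::TryStreamExt;
use iox_object_store::{IoxObjectStore, ParquetFilePath};
use parquet_file::storage::Storage;

async fn persist_and_read_back(
    iox_object_store: Arc<IoxObjectStore>,
    chunk_addr: &ChunkAddr,
    data: Vec<u8>,
) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
    let storage = Storage::new(Arc::clone(&iox_object_store));

    // A fresh path per call: the embedded UUID keeps repeated persists of the
    // same chunk from overwriting each other.
    let path = ParquetFilePath::new(chunk_addr);
    storage.to_object_store(data, &path).await?;

    // Reads take the same typed path and return a stream of byte chunks.
    let bytes = iox_object_store
        .get_parquet_file(&path)
        .await?
        .map_ok(|b| b.to_vec())
        .try_concat()
        .await?;

    Ok(bytes)
}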

View File

@ -25,8 +25,8 @@ use internal_types::{
schema::{builder::SchemaBuilder, Schema, TIME_COLUMN_NAME}, schema::{builder::SchemaBuilder, Schema, TIME_COLUMN_NAME},
selection::Selection, selection::Selection,
}; };
use iox_object_store::IoxObjectStore; use iox_object_store::{IoxObjectStore, ParquetFilePath};
use object_store::{path::Path, ObjectStore}; use object_store::ObjectStore;
use parquet::{ use parquet::{
arrow::{ArrowReader, ParquetFileArrowReader}, arrow::{ArrowReader, ParquetFileArrowReader},
file::serialized_reader::{SerializedFileReader, SliceableCursor}, file::serialized_reader::{SerializedFileReader, SliceableCursor},
@ -65,15 +65,15 @@ pub async fn load_parquet_from_store_for_chunk(
store: Arc<IoxObjectStore>, store: Arc<IoxObjectStore>,
) -> Result<Vec<u8>> { ) -> Result<Vec<u8>> {
let path = chunk.path(); let path = chunk.path();
Ok(load_parquet_from_store_for_path(&path, store).await?) Ok(load_parquet_from_store_for_path(path, store).await?)
} }
pub async fn load_parquet_from_store_for_path( pub async fn load_parquet_from_store_for_path(
path: &Path, path: &ParquetFilePath,
store: Arc<IoxObjectStore>, store: Arc<IoxObjectStore>,
) -> Result<Vec<u8>> { ) -> Result<Vec<u8>> {
let parquet_data = store let parquet_data = store
.get(path) .get_parquet_file(path)
.await .await
.context(GettingDataFromObjectStore)? .context(GettingDataFromObjectStore)?
.map_ok(|bytes| bytes.to_vec()) .map_ok(|bytes| bytes.to_vec())
@ -173,7 +173,7 @@ pub async fn make_chunk_given_record_batch(
addr.partition_key, addr.partition_key,
Arc::new(table_summary), Arc::new(table_summary),
Arc::new(schema), Arc::new(schema),
path, &path,
Arc::clone(&iox_object_store), Arc::clone(&iox_object_store),
file_size_bytes, file_size_bytes,
Arc::new(parquet_metadata), Arc::new(parquet_metadata),
@ -764,6 +764,12 @@ pub fn read_data_from_parquet_data(schema: SchemaRef, parquet_data: Vec<u8>) ->
record_batches record_batches
} }
/// Create an arbitrary ParquetFilePath
pub fn make_parquet_file_path() -> ParquetFilePath {
let chunk_addr = chunk_addr(3);
ParquetFilePath::new(&chunk_addr)
}
/// Create test metadata by creating a parquet file and reading it back into memory. /// Create test metadata by creating a parquet file and reading it back into memory.
/// ///
/// See [`make_chunk`] for details. /// See [`make_chunk`] for details.
@ -771,13 +777,13 @@ pub async fn make_metadata(
iox_object_store: &Arc<IoxObjectStore>, iox_object_store: &Arc<IoxObjectStore>,
column_prefix: &str, column_prefix: &str,
addr: ChunkAddr, addr: ChunkAddr,
) -> (Path, IoxParquetMetaData) { ) -> (ParquetFilePath, IoxParquetMetaData) {
let chunk = make_chunk(Arc::clone(iox_object_store), column_prefix, addr).await; let chunk = make_chunk(Arc::clone(iox_object_store), column_prefix, addr).await;
let parquet_data = load_parquet_from_store(&chunk, Arc::clone(iox_object_store)) let parquet_data = load_parquet_from_store(&chunk, Arc::clone(iox_object_store))
.await .await
.unwrap(); .unwrap();
( (
chunk.path(), chunk.path().clone(),
IoxParquetMetaData::from_file_bytes(parquet_data).unwrap(), IoxParquetMetaData::from_file_bytes(parquet_data).unwrap(),
) )
} }
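A consumer test might lean on the new make_parquet_file_path helper like this; because each ParquetFilePath embeds a fresh UUID, two calls never produce the same value, which is exactly the property the cleanup tests rely on (illustrative test, not part of this change):

#[test]
fn make_parquet_file_path_is_unique() {
    let a = make_parquet_file_path();
    let b = make_parquet_file_path();
    // Same ChunkAddr inputs, distinct paths thanks to the UUID component.
    assert_ne!(a, b);
}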

View File

@ -31,7 +31,6 @@ use internal_types::schema::Schema;
use iox_object_store::IoxObjectStore; use iox_object_store::IoxObjectStore;
use metrics::KeyValue; use metrics::KeyValue;
use mutable_buffer::chunk::{ChunkMetrics as MutableBufferChunkMetrics, MBChunk}; use mutable_buffer::chunk::{ChunkMetrics as MutableBufferChunkMetrics, MBChunk};
use object_store::path::parsed::DirsAndFileName;
use observability_deps::tracing::{debug, error, info}; use observability_deps::tracing::{debug, error, info};
use parking_lot::{Mutex, RwLock}; use parking_lot::{Mutex, RwLock};
use parquet_file::{ use parquet_file::{
@ -1407,7 +1406,7 @@ pub(crate) fn checkpoint_data_from_catalog(catalog: &Catalog) -> CheckpointData
for chunk in catalog.chunks() { for chunk in catalog.chunks() {
let guard = chunk.read(); let guard = chunk.read();
if let ChunkStage::Persisted { parquet, .. } = guard.stage() { if let ChunkStage::Persisted { parquet, .. } = guard.stage() {
let path: DirsAndFileName = parquet.path().into(); let path = parquet.path().clone();
let m = CatalogParquetInfo { let m = CatalogParquetInfo {
path: path.clone(), path: path.clone(),
@ -1510,7 +1509,7 @@ mod tests {
use bytes::Bytes; use bytes::Bytes;
use chrono::{DateTime, TimeZone}; use chrono::{DateTime, TimeZone};
use data_types::{ use data_types::{
chunk_metadata::ChunkStorage, chunk_metadata::{ChunkAddr, ChunkStorage},
database_rules::{LifecycleRules, PartitionTemplate, TemplatePart}, database_rules::{LifecycleRules, PartitionTemplate, TemplatePart},
partition_metadata::{ColumnSummary, InfluxDbType, StatValues, Statistics, TableSummary}, partition_metadata::{ColumnSummary, InfluxDbType, StatValues, Statistics, TableSummary},
write_summary::TimestampSummary, write_summary::TimestampSummary,
@ -1518,10 +1517,8 @@ mod tests {
use entry::{test_helpers::lp_to_entry, Sequence}; use entry::{test_helpers::lp_to_entry, Sequence};
use futures::{stream, StreamExt, TryStreamExt}; use futures::{stream, StreamExt, TryStreamExt};
use internal_types::{schema::Schema, selection::Selection}; use internal_types::{schema::Schema, selection::Selection};
use object_store::{ use iox_object_store::ParquetFilePath;
path::{parts::PathPart, Path}, use object_store::{path::parsed::DirsAndFileName, ObjectStore, ObjectStoreApi};
ObjectStore, ObjectStoreApi,
};
use parquet_file::{ use parquet_file::{
catalog::test_helpers::TestCatalogState, catalog::test_helpers::TestCatalogState,
metadata::IoxParquetMetaData, metadata::IoxParquetMetaData,
@ -1530,7 +1527,7 @@ mod tests {
use persistence_windows::min_max_sequence::MinMaxSequence; use persistence_windows::min_max_sequence::MinMaxSequence;
use query::{frontend::sql::SqlQueryPlanner, QueryChunk, QueryDatabase}; use query::{frontend::sql::SqlQueryPlanner, QueryChunk, QueryDatabase};
use std::{ use std::{
collections::{BTreeMap, HashSet}, collections::BTreeMap,
convert::TryFrom, convert::TryFrom,
iter::Iterator, iter::Iterator,
num::{NonZeroU32, NonZeroU64, NonZeroUsize}, num::{NonZeroU32, NonZeroU64, NonZeroUsize},
@ -2591,12 +2588,9 @@ mod tests {
); );
} }
async fn flatten_list_stream( async fn parquet_files(iox_storage: &IoxObjectStore) -> Result<Vec<ParquetFilePath>> {
storage: Arc<ObjectStore>, iox_storage
prefix: Option<&Path>, .parquet_files()
) -> Result<Vec<Path>> {
storage
.list(prefix)
.await? .await?
.map_ok(|v| stream::iter(v).map(Ok)) .map_ok(|v| stream::iter(v).map(Ok))
.try_flatten() .try_flatten()
@ -2679,11 +2673,9 @@ mod tests {
let path = pq_chunk.object_store_path().unwrap(); let path = pq_chunk.object_store_path().unwrap();
// Check that the path must exist in the object store // Check that the path must exist in the object store
let path_list = flatten_list_stream(Arc::clone(&object_store), Some(&path)) let path_list = parquet_files(&db.iox_object_store).await.unwrap();
.await
.unwrap();
assert_eq!(path_list.len(), 1); assert_eq!(path_list.len(), 1);
assert_eq!(path_list[0], path); assert_eq!(&path_list[0], path);
// Now read data from that path // Now read data from that path
let parquet_data = let parquet_data =
@ -2821,12 +2813,10 @@ mod tests {
let path = pq_chunk.object_store_path().unwrap(); let path = pq_chunk.object_store_path().unwrap();
// Check that the path must exist in the object store // Check that the path must exist in the object store
let path_list = flatten_list_stream(Arc::clone(&object_store), Some(&path)) let path_list = parquet_files(&db.iox_object_store).await.unwrap();
.await
.unwrap();
println!("path_list: {:#?}", path_list); println!("path_list: {:#?}", path_list);
assert_eq!(path_list.len(), 1); assert_eq!(path_list.len(), 1);
assert_eq!(path_list[0], path); assert_eq!(&path_list[0], path);
// Now read data from that path // Now read data from that path
let parquet_data = let parquet_data =
@ -3913,7 +3903,7 @@ mod tests {
let chunk = db.chunk(table_name, partition_key, *chunk_id).unwrap(); let chunk = db.chunk(table_name, partition_key, *chunk_id).unwrap();
let chunk = chunk.read(); let chunk = chunk.read();
if let ChunkStage::Persisted { parquet, .. } = chunk.stage() { if let ChunkStage::Persisted { parquet, .. } = chunk.stage() {
paths_expected.push(parquet.path().to_string()); paths_expected.push(parquet.path().clone());
} else { } else {
panic!("Wrong chunk state."); panic!("Wrong chunk state.");
} }
@ -3925,15 +3915,7 @@ mod tests {
.unwrap() .unwrap()
.unwrap(); .unwrap();
let paths_actual = { let paths_actual = {
let mut tmp: Vec<String> = catalog let mut tmp: Vec<_> = catalog.parquet_files.keys().cloned().collect();
.parquet_files
.keys()
.map(|p| {
object_store
.path_from_dirs_and_filename(p.clone())
.to_string()
})
.collect();
tmp.sort(); tmp.sort();
tmp tmp
}; };
@ -4010,7 +3992,7 @@ mod tests {
let chunk = db.chunk(&table_name, &partition_key, chunk_id).unwrap(); let chunk = db.chunk(&table_name, &partition_key, chunk_id).unwrap();
let chunk = chunk.read(); let chunk = chunk.read();
if let ChunkStage::Persisted { parquet, .. } = chunk.stage() { if let ChunkStage::Persisted { parquet, .. } = chunk.stage() {
paths_keep.push(parquet.path()); paths_keep.push(parquet.path().clone());
} else { } else {
panic!("Wrong chunk state."); panic!("Wrong chunk state.");
} }
@ -4030,18 +4012,18 @@ mod tests {
} }
// ==================== do: create garbage ==================== // ==================== do: create garbage ====================
let mut path: DirsAndFileName = paths_keep[0].clone().into(); let path_delete = ParquetFilePath::new(&ChunkAddr {
path.file_name = Some(PathPart::from( table_name: "cpu".into(),
format!("prefix_{}", path.file_name.unwrap().encoded()).as_ref(), partition_key: "123".into(),
)); chunk_id: 3,
let path_delete = object_store.path_from_dirs_and_filename(path); db_name: "not used".into(),
create_empty_file(&object_store, &path_delete).await; });
let path_delete = path_delete.to_string(); create_empty_file(&db.iox_object_store, &path_delete).await;
// ==================== check: all files are there ==================== // ==================== check: all files are there ====================
let all_files = get_object_store_files(&object_store).await; let all_files = parquet_files(&db.iox_object_store).await.unwrap();
for path in &paths_keep { for path in &paths_keep {
assert!(all_files.contains(&path.to_string())); assert!(all_files.contains(path));
} }
// ==================== do: start background task loop ==================== // ==================== do: start background task loop ====================
@ -4054,7 +4036,7 @@ mod tests {
// ==================== check: after a while the dropped file should be gone ==================== // ==================== check: after a while the dropped file should be gone ====================
let t_0 = Instant::now(); let t_0 = Instant::now();
loop { loop {
let all_files = get_object_store_files(&object_store).await; let all_files = parquet_files(&db.iox_object_store).await.unwrap();
if !all_files.contains(&path_delete) { if !all_files.contains(&path_delete) {
break; break;
} }
@ -4067,10 +4049,10 @@ mod tests {
join_handle.await.unwrap(); join_handle.await.unwrap();
// ==================== check: some files are there ==================== // ==================== check: some files are there ====================
let all_files = get_object_store_files(&object_store).await; let all_files = parquet_files(&db.iox_object_store).await.unwrap();
assert!(!all_files.contains(&path_delete)); assert!(!all_files.contains(&path_delete));
for path in &paths_keep { for path in &paths_keep {
assert!(all_files.contains(&path.to_string())); assert!(all_files.contains(path));
} }
} }
@ -4359,25 +4341,12 @@ mod tests {
(table_name.to_string(), partition_key.to_string(), chunk_id) (table_name.to_string(), partition_key.to_string(), chunk_id)
} }
async fn get_object_store_files(object_store: &ObjectStore) -> HashSet<String> { async fn create_empty_file(iox_object_store: &IoxObjectStore, path: &ParquetFilePath) {
object_store
.list(None)
.await
.unwrap()
.try_concat()
.await
.unwrap()
.iter()
.map(|p| p.to_string())
.collect()
}
async fn create_empty_file(object_store: &ObjectStore, path: &Path) {
let data = Bytes::default(); let data = Bytes::default();
let len = data.len(); let len = data.len();
object_store iox_object_store
.put( .put_parquet_file(
path, path,
futures::stream::once(async move { Ok(data) }), futures::stream::once(async move { Ok(data) }),
Some(len), Some(len),

View File

@ -10,8 +10,8 @@ use internal_types::{
schema::{sort::SortKey, Schema}, schema::{sort::SortKey, Schema},
selection::Selection, selection::Selection,
}; };
use iox_object_store::ParquetFilePath;
use mutable_buffer::chunk::snapshot::ChunkSnapshot; use mutable_buffer::chunk::snapshot::ChunkSnapshot;
use object_store::path::Path;
use observability_deps::tracing::debug; use observability_deps::tracing::debug;
use parquet_file::chunk::ParquetChunk; use parquet_file::chunk::ParquetChunk;
use partition_metadata::TableSummary; use partition_metadata::TableSummary;
@ -198,7 +198,7 @@ impl DbChunk {
/// Return the Path in ObjectStorage where this chunk is /// Return the Path in ObjectStorage where this chunk is
/// persisted, if any /// persisted, if any
pub fn object_store_path(&self) -> Option<Path> { pub fn object_store_path(&self) -> Option<&ParquetFilePath> {
match &self.state { match &self.state {
State::ParquetFile { chunk } => Some(chunk.path()), State::ParquetFile { chunk } => Some(chunk.path()),
_ => None, _ => None,

View File

@ -3,7 +3,6 @@ use std::sync::Arc;
use data_types::job::Job; use data_types::job::Job;
use futures::Future; use futures::Future;
use lifecycle::{LifecycleWriteGuard, LockableChunk}; use lifecycle::{LifecycleWriteGuard, LockableChunk};
use object_store::path::parsed::DirsAndFileName;
use observability_deps::tracing::debug; use observability_deps::tracing::debug;
use snafu::ResultExt; use snafu::ResultExt;
use tracker::{TaskTracker, TrackedFuture, TrackedFutureExt}; use tracker::{TaskTracker, TrackedFuture, TrackedFutureExt};
@ -58,10 +57,12 @@ pub fn drop_chunk(
let chunk_read = chunk.read(); let chunk_read = chunk.read();
if let ChunkStage::Persisted { parquet, .. } = chunk_read.stage() { if let ChunkStage::Persisted { parquet, .. } = chunk_read.stage() {
let path: DirsAndFileName = parquet.path().into(); Some(parquet.path().clone())
Some(path)
} else if lifecycle_persist { } else if lifecycle_persist {
unreachable!("Unpersisted chunks in a persisted DB should be ruled out before doing any work.") unreachable!(
"Unpersisted chunks in a persisted DB should be ruled out \
before doing any work."
)
} else { } else {
None None
} }
@ -168,10 +169,12 @@ pub fn drop_partition(
let chunk_read = chunk.read(); let chunk_read = chunk.read();
if let ChunkStage::Persisted { parquet, .. } = chunk_read.stage() { if let ChunkStage::Persisted { parquet, .. } = chunk_read.stage() {
let path: DirsAndFileName = parquet.path().into(); paths.push(parquet.path().clone());
paths.push(path);
} else if lifecycle_persist { } else if lifecycle_persist {
unreachable!("Unpersisted chunks in a persisted DB should be ruled out before doing any work.") unreachable!(
"Unpersisted chunks in a persisted DB should be ruled out \
before doing any work."
)
} }
} }

View File

@ -15,7 +15,6 @@ use ::lifecycle::LifecycleWriteGuard;
use chrono::Utc; use chrono::Utc;
use data_types::{chunk_metadata::ChunkLifecycleAction, job::Job}; use data_types::{chunk_metadata::ChunkLifecycleAction, job::Job};
use internal_types::selection::Selection; use internal_types::selection::Selection;
use object_store::path::parsed::DirsAndFileName;
use observability_deps::tracing::{debug, warn}; use observability_deps::tracing::{debug, warn};
use parquet_file::{ use parquet_file::{
catalog::CatalogParquetInfo, catalog::CatalogParquetInfo,
@ -139,7 +138,7 @@ pub(super) fn write_chunk_to_object_store(
let metrics = ParquetChunkMetrics::new(&metrics); let metrics = ParquetChunkMetrics::new(&metrics);
let parquet_chunk = Arc::new( let parquet_chunk = Arc::new(
ParquetChunk::new( ParquetChunk::new(
path.clone(), &path,
Arc::clone(&db.iox_object_store), Arc::clone(&db.iox_object_store),
file_size_bytes, file_size_bytes,
Arc::clone(&parquet_metadata), Arc::clone(&parquet_metadata),
@ -150,12 +149,10 @@ pub(super) fn write_chunk_to_object_store(
.context(ParquetChunkError)?, .context(ParquetChunkError)?,
); );
let path: DirsAndFileName = path.into(); // IMPORTANT: Start transaction AFTER writing the actual parquet file so we do not hold
// the transaction lock (that is part of the PreservedCatalog) for too long.
// IMPORTANT: Start transaction AFTER writing the actual parquet file so we do not hold the // By using the cleanup lock (see above) it is ensured that the file that we
// transaction lock (that is part of the PreservedCatalog) for too long. By using the // have written is not deleted in between.
// cleanup lock (see above) it is ensured that the file that we have written is not deleted
// in between.
let mut transaction = db.preserved_catalog.open_transaction().await; let mut transaction = db.preserved_catalog.open_transaction().await;
let info = CatalogParquetInfo { let info = CatalogParquetInfo {
path, path,

View File

@ -3,9 +3,8 @@
use super::catalog::{chunk::ChunkStage, table::TableSchemaUpsertHandle, Catalog}; use super::catalog::{chunk::ChunkStage, table::TableSchemaUpsertHandle, Catalog};
use data_types::server_id::ServerId; use data_types::server_id::ServerId;
use iox_object_store::IoxObjectStore; use iox_object_store::{IoxObjectStore, ParquetFilePath};
use metrics::{KeyValue, MetricRegistry}; use metrics::{KeyValue, MetricRegistry};
use object_store::path::parsed::DirsAndFileName;
use observability_deps::tracing::{error, info}; use observability_deps::tracing::{error, info};
use parquet_file::{ use parquet_file::{
catalog::{CatalogParquetInfo, CatalogState, ChunkCreationFailed, PreservedCatalog}, catalog::{CatalogParquetInfo, CatalogState, ChunkCreationFailed, PreservedCatalog},
@ -232,7 +231,7 @@ impl CatalogState for Loader {
let metrics = ParquetChunkMetrics::new(&metrics); let metrics = ParquetChunkMetrics::new(&metrics);
let parquet_chunk = ParquetChunk::new( let parquet_chunk = ParquetChunk::new(
iox_object_store.path_from_dirs_and_filename(info.path.clone()), &info.path,
iox_object_store, iox_object_store,
info.file_size_bytes, info.file_size_bytes,
info.metadata, info.metadata,
@ -240,9 +239,7 @@ impl CatalogState for Loader {
Arc::clone(&iox_md.partition_key), Arc::clone(&iox_md.partition_key),
metrics, metrics,
) )
.context(ChunkCreationFailed { .context(ChunkCreationFailed { path: &info.path })?;
path: info.path.clone(),
})?;
let parquet_chunk = Arc::new(parquet_chunk); let parquet_chunk = Arc::new(parquet_chunk);
// Get partition from the catalog // Get partition from the catalog
@ -268,7 +265,7 @@ impl CatalogState for Loader {
Ok(()) Ok(())
} }
fn remove(&mut self, path: DirsAndFileName) -> parquet_file::catalog::Result<()> { fn remove(&mut self, path: &ParquetFilePath) -> parquet_file::catalog::Result<()> {
let mut removed_any = false; let mut removed_any = false;
for partition in self.catalog.partitions() { for partition in self.catalog.partitions() {
@ -278,8 +275,7 @@ impl CatalogState for Loader {
for chunk in partition.chunks() { for chunk in partition.chunks() {
let chunk = chunk.read(); let chunk = chunk.read();
if let ChunkStage::Persisted { parquet, .. } = chunk.stage() { if let ChunkStage::Persisted { parquet, .. } = chunk.stage() {
let chunk_path: DirsAndFileName = parquet.path().into(); if path == parquet.path() {
if path == chunk_path {
to_remove.push(chunk.id()); to_remove.push(chunk.id());
} }
} }
@ -296,7 +292,7 @@ impl CatalogState for Loader {
if removed_any { if removed_any {
Ok(()) Ok(())
} else { } else {
Err(parquet_file::catalog::Error::ParquetFileDoesNotExist { path }) Err(parquet_file::catalog::Error::ParquetFileDoesNotExist { path: path.clone() })
} }
} }
} }