diff --git a/parquet_file/src/catalog.rs b/parquet_file/src/catalog.rs
index 1405bfaded..d463f874f5 100644
--- a/parquet_file/src/catalog.rs
+++ b/parquet_file/src/catalog.rs
@@ -429,6 +429,21 @@ where
         .map(|tkey| tkey.revision_counter)
         .expect("catalog should have at least an empty transaction")
     }
+
+    /// Object store used by this catalog.
+    pub fn object_store(&self) -> Arc<ObjectStore> {
+        Arc::clone(&self.object_store)
+    }
+
+    /// Server ID used by this catalog.
+    pub fn server_id(&self) -> ServerId {
+        self.server_id
+    }
+
+    /// Database name used by this catalog.
+    pub fn db_name(&self) -> &str {
+        &self.db_name
+    }
 }
 
 impl Debug for PreservedCatalog
@@ -896,6 +911,11 @@ where
         Ok(())
     }
 
+    /// Abort transaction w/o commit.
+    pub fn abort(mut self) {
+        self.transaction = None;
+    }
+
     /// Add a new parquet file to the catalog.
     ///
     /// If a file with the same path already exists an error will be returned.
@@ -1055,15 +1075,12 @@ pub mod test_helpers {
 }
 
 #[cfg(test)]
-pub mod tests {
+mod tests {
     use std::{num::NonZeroU32, ops::Deref};
 
     use crate::{
-        metadata::{
-            read_parquet_metadata_from_file, read_schema_from_parquet_metadata,
-            read_statistics_from_parquet_metadata,
-        },
-        utils::{load_parquet_from_store, make_chunk, make_object_store},
+        metadata::{read_schema_from_parquet_metadata, read_statistics_from_parquet_metadata},
+        test_utils::{make_metadata, make_object_store},
     };
     use object_store::parsed_path;
 
@@ -1681,8 +1698,8 @@ pub mod tests {
             .unwrap();
 
         // get some test metadata
-        let metadata1 = make_metadata(object_store, "foo").await;
-        let metadata2 = make_metadata(object_store, "bar").await;
+        let (_, metadata1) = make_metadata(object_store, "foo", 1).await;
+        let (_, metadata2) = make_metadata(object_store, "bar", 1).await;
 
         // track all the intermediate results
         let mut trace = TestTrace::new();
@@ -1992,16 +2009,4 @@ pub mod tests {
             &get_catalog_parquet_files(trace.states.last().unwrap()),
         );
     }
-
-    /// Create test metadata. See [`make_chunk`] for details.
-    async fn make_metadata(
-        object_store: &Arc<ObjectStore>,
-        column_prefix: &str,
-    ) -> ParquetMetaData {
-        let chunk = make_chunk(Arc::clone(object_store), column_prefix).await;
-        let (_, parquet_data) = load_parquet_from_store(&chunk, Arc::clone(object_store))
-            .await
-            .unwrap();
-        read_parquet_metadata_from_file(parquet_data).unwrap()
-    }
 }
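The new accessors and `abort()` change how callers interact with `PreservedCatalog`. The sketch below (not part of this diff) shows the intended call pattern; it assumes the `test_helpers::TestCatalogState` helper and the constructor arguments used in the tests above, and elides error handling with `unwrap`:

```rust
use std::sync::Arc;

use data_types::server_id::ServerId;
use object_store::ObjectStore;
use parquet_file::catalog::{test_helpers::TestCatalogState, PreservedCatalog};

async fn inspect_and_abort(object_store: Arc<ObjectStore>, server_id: ServerId) {
    let catalog = PreservedCatalog::<TestCatalogState>::new_empty(
        Arc::clone(&object_store),
        server_id,
        "db1",
        (),
    )
    .await
    .unwrap();

    // The new accessors expose the catalog's identity without having to
    // thread the store, server ID and database name through separately.
    let _store = catalog.object_store();
    let _server_id = catalog.server_id();
    let _db_name = catalog.db_name();

    // A transaction can now be dropped cleanly without committing.
    let transaction = catalog.open_transaction().await;
    transaction.abort();
}
```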
diff --git a/parquet_file/src/cleanup.rs b/parquet_file/src/cleanup.rs
new file mode 100644
index 0000000000..3655189148
--- /dev/null
+++ b/parquet_file/src/cleanup.rs
@@ -0,0 +1,298 @@
+//! Methods to clean up the object store.
+
+use std::{
+    collections::HashSet,
+    sync::{Arc, Mutex},
+};
+
+use data_types::server_id::ServerId;
+use futures::TryStreamExt;
+use object_store::{
+    path::{parsed::DirsAndFileName, ObjectStorePath},
+    ObjectStore, ObjectStoreApi,
+};
+use observability_deps::tracing::info;
+use snafu::{ResultExt, Snafu};
+
+use crate::{
+    catalog::{CatalogParquetInfo, CatalogState, PreservedCatalog},
+    storage::data_location,
+};
+
+#[derive(Debug, Snafu)]
+pub enum Error {
+    #[snafu(display("Error from read operation while cleaning object store: {}", source))]
+    ReadError {
+        source: <ObjectStore as ObjectStoreApi>::Error,
+    },
+
+    #[snafu(display("Error from write operation while cleaning object store: {}", source))]
+    WriteError {
+        source: <ObjectStore as ObjectStoreApi>::Error,
+    },
+
+    #[snafu(display("Error from catalog loading while cleaning object store: {}", source))]
+    CatalogLoadError { source: crate::catalog::Error },
+}
+
+pub type Result<T, E = Error> = std::result::Result<T, E>;
+
+/// Delete all unreferenced parquet files.
+///
+/// This will hold the transaction lock while the list of files is being gathered.
+pub async fn cleanup_unreferenced_parquet_files<S>(catalog: &PreservedCatalog<S>) -> Result<()>
+where
+    S: CatalogState,
+{
+    // Create a transaction to prevent parallel modifications of the catalog. This avoids deleting
+    // files that are about to be added to the catalog.
+    let transaction = catalog.open_transaction().await;
+
+    let store = catalog.object_store();
+    let server_id = catalog.server_id();
+    let db_name = catalog.db_name();
+    let all_known = {
+        // replay catalog transactions to track ALL (even dropped) files that are referenced
+        let catalog = PreservedCatalog::<TracerCatalogState>::load(
+            Arc::clone(&store),
+            server_id,
+            db_name.to_string(),
+            (),
+        )
+        .await
+        .context(CatalogLoadError)?
+        .expect("catalog gone while reading it?");
+        catalog
+            .state()
+            .files
+            .lock()
+            .expect("lock poisoned?")
+            .clone()
+    };
+
+    let prefix = data_location(&store, server_id, db_name);
+
+    // gather a list of "files to remove" eagerly so we do not block transactions on the catalog for too long
+    let to_remove = store
+        .list(Some(&prefix))
+        .await
+        .context(ReadError)?
+        .map_ok(|paths| {
+            paths
+                .into_iter()
+                .filter(|path| {
+                    let path_parsed: DirsAndFileName = path.clone().into();
+
+                    // only delete if all of the following conditions are met:
+                    // - filename ends with `.parquet`
+                    // - file is not tracked by the catalog
+                    path_parsed
+                        .file_name
+                        .as_ref()
+                        .map(|part| part.encoded().ends_with(".parquet"))
+                        .unwrap_or(false)
+                        && !all_known.contains(&path_parsed)
+                })
+                .collect::<Vec<_>>()
+        })
+        .try_concat()
+        .await
+        .context(ReadError)?;
+
+    // abort transaction cleanly to avoid warnings about uncommitted transactions
+    transaction.abort();
+
+    // now that the transaction lock is dropped, perform the actual (and potentially slow) delete operation
+    let n_files = to_remove.len();
+    info!("Found {} files to delete, start deletion.", n_files);
+
+    for path in to_remove {
+        info!("Delete file: {}", path.display());
+        store.delete(&path).await.context(WriteError)?;
+    }
+
+    info!("Finished deletion, removed {} files.", n_files);
+
+    Ok(())
+}
+
+/// Catalog state that traces all used parquet files.
+struct TracerCatalogState {
+    files: Mutex<HashSet<DirsAndFileName>>,
+}
+
+impl CatalogState for TracerCatalogState {
+    type EmptyInput = ();
+
+    fn new_empty(_data: Self::EmptyInput) -> Self {
+        Self {
+            files: Default::default(),
+        }
+    }
+
+    fn clone_or_keep(origin: &Arc<Self>) -> Arc<Self> {
+        // no copy
+        Arc::clone(origin)
+    }
+
+    fn add(
+        &self,
+        _object_store: Arc<ObjectStore>,
+        _server_id: ServerId,
+        _db_name: &str,
+        info: CatalogParquetInfo,
+    ) -> crate::catalog::Result<()> {
+        self.files
+            .lock()
+            .expect("lock poisoned?")
+            .insert(info.path);
+        Ok(())
+    }
+
+    fn remove(&self, _path: DirsAndFileName) -> crate::catalog::Result<()> {
+        // Do NOT remove the file since we still need it for time travel
+        Ok(())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::{collections::HashSet, num::NonZeroU32, sync::Arc};
+
+    use bytes::Bytes;
+    use data_types::server_id::ServerId;
+    use object_store::path::{parsed::DirsAndFileName, ObjectStorePath, Path};
+
+    use super::*;
+    use crate::{
+        catalog::test_helpers::TestCatalogState,
+        test_utils::{make_metadata, make_object_store},
+    };
+
+    #[tokio::test]
+    async fn test_cleanup_rules() {
+        let object_store = make_object_store();
+        let server_id = make_server_id();
+        let db_name = "db1";
+
+        let catalog = PreservedCatalog::<TestCatalogState>::new_empty(
+            Arc::clone(&object_store),
+            server_id,
+            db_name,
+            (),
+        )
+        .await
+        .unwrap();
+
+        // create some data
+        let mut paths_keep = vec![];
+        let mut paths_delete = vec![];
+        {
+            let mut transaction = catalog.open_transaction().await;
+
+            // an ordinary tracked parquet file => keep
+            let (path, md) = make_metadata(&object_store, "foo", 1).await;
+            transaction.add_parquet(&path.clone().into(), &md).unwrap();
+            paths_keep.push(path.display());
+
+            // another ordinary tracked parquet file that was added and removed => keep (for time travel)
+            let (path, md) = make_metadata(&object_store, "foo", 2).await;
+            transaction.add_parquet(&path.clone().into(), &md).unwrap();
+            transaction.remove_parquet(&path.clone().into()).unwrap();
+            paths_keep.push(path.display());
+
+            // not a parquet file => keep
+            let mut path: DirsAndFileName = path.into();
+            path.file_name = Some("foo.txt".into());
+            let path = object_store.path_from_dirs_and_filename(path);
+            create_empty_file(&object_store, &path).await;
+            paths_keep.push(path.display());
+
+            // an untracked parquet file => delete
+            let (path, _md) = make_metadata(&object_store, "foo", 3).await;
+            paths_delete.push(path.display());
+
+            transaction.commit().await.unwrap();
+        }
+
+        // run clean-up
+        cleanup_unreferenced_parquet_files(&catalog).await.unwrap();
+
+        // list all files
+        let all_files = list_all_files(&object_store).await;
+        for p in paths_keep {
+            assert!(dbg!(&all_files).contains(dbg!(&p)));
+        }
+        for p in paths_delete {
+            assert!(!dbg!(&all_files).contains(dbg!(&p)));
+        }
+    }
+
+    #[tokio::test]
+    async fn test_cleanup_with_parallel_transaction() {
+        let object_store = make_object_store();
+        let server_id = make_server_id();
+        let db_name = "db1";
+
+        let catalog = PreservedCatalog::<TestCatalogState>::new_empty(
+            Arc::clone(&object_store),
+            server_id,
+            db_name,
+            (),
+        )
+        .await
+        .unwrap();
+
+        // try multiple times to provoke a conflict
+        for i in 0..100 {
+            let (path, _) = tokio::join!(
+                async {
+                    let mut transaction = catalog.open_transaction().await;
+
+                    let (path, md) = make_metadata(&object_store, "foo", i).await;
+                    transaction.add_parquet(&path.clone().into(), &md).unwrap();
+
+                    transaction.commit().await.unwrap();
+
+                    path.display()
+                },
+                async {
+                    cleanup_unreferenced_parquet_files(&catalog).await.unwrap();
+                },
+            );
+
+            let all_files = list_all_files(&object_store).await;
+            assert!(all_files.contains(&path));
+        }
+    }
+
+    fn make_server_id() -> ServerId {
+        ServerId::new(NonZeroU32::new(1).unwrap())
+    }
+
+    async fn create_empty_file(object_store: &ObjectStore, path: &Path) {
+        let data = Bytes::default();
+        let len = data.len();
+
+        object_store
+            .put(
+                &path,
+                futures::stream::once(async move { Ok(data) }),
+                Some(len),
+            )
+            .await
+            .unwrap();
+    }
+
+    async fn list_all_files(object_store: &ObjectStore) -> HashSet<String> {
+        object_store
+            .list(None)
+            .await
+            .unwrap()
+            .try_concat()
+            .await
+            .unwrap()
+            .iter()
+            .map(|p| p.display())
+            .collect()
+    }
+}
"foo").await; + let chunk = make_chunk(Arc::clone(&store), "foo", 1).await; let (_, parquet_data) = load_parquet_from_store(&chunk, store).await.unwrap(); let parquet_metadata = read_parquet_metadata_from_file(parquet_data).unwrap(); @@ -630,7 +630,7 @@ mod tests { #[tokio::test] async fn test_make_chunk_no_row_group() { let store = make_object_store(); - let chunk = make_chunk_no_row_group(Arc::clone(&store), "foo").await; + let chunk = make_chunk_no_row_group(Arc::clone(&store), "foo", 1).await; let (_, parquet_data) = load_parquet_from_store(&chunk, store).await.unwrap(); let parquet_metadata = read_parquet_metadata_from_file(parquet_data).unwrap(); diff --git a/parquet_file/src/storage.rs b/parquet_file/src/storage.rs index 3da8ac17b0..484190c203 100644 --- a/parquet_file/src/storage.rs +++ b/parquet_file/src/storage.rs @@ -162,13 +162,10 @@ impl Storage { table_name: String, ) -> object_store::path::Path { // Full path of the file in object store - // //data/////data///
.parquet - let mut path = self.object_store.new_path(); - path.push_dir(self.server_id.to_string()); - path.push_dir(self.db_name.clone()); - path.push_dir("data"); + let mut path = data_location(&self.object_store, self.server_id, &self.db_name); path.push_dir(partition_key); path.push_dir(chunk_id.to_string()); let file_name = format!("{}.parquet", table_name); @@ -491,12 +488,31 @@ impl TryClone for MemWriter { } } +/// Location where parquet data goes to. +/// +/// Schema currently is: +/// +/// ```text +/// //data +/// ``` +pub(crate) fn data_location( + object_store: &ObjectStore, + server_id: ServerId, + db_name: &str, +) -> Path { + let mut path = object_store.new_path(); + path.push_dir(server_id.to_string()); + path.push_dir(db_name.to_string()); + path.push_dir("data"); + path +} + #[cfg(test)] mod tests { use std::num::NonZeroU32; use super::*; - use crate::utils::{make_object_store, make_record_batch}; + use crate::test_utils::{make_object_store, make_record_batch}; use datafusion_util::MemoryStream; use object_store::parsed_path; use uuid::Uuid; diff --git a/parquet_file/src/storage_testing.rs b/parquet_file/src/storage_testing.rs index b61c40aaf4..d5a19ea1b2 100644 --- a/parquet_file/src/storage_testing.rs +++ b/parquet_file/src/storage_testing.rs @@ -14,7 +14,10 @@ mod tests { read_parquet_metadata_from_file, read_schema_from_parquet_metadata, read_statistics_from_parquet_metadata, }, - utils::*, + test_utils::{ + load_parquet_from_store, make_chunk_given_record_batch, make_object_store, + make_record_batch, read_data_from_parquet_data, + }, }; #[tokio::test] @@ -22,6 +25,7 @@ mod tests { //////////////////// // Create test data which is also the expected data let table = "table1"; + let chunk_id = 1; let (record_batches, schema, column_summaries, num_rows) = make_record_batch("foo"); let mut table_summary = TableSummary::new(table); table_summary.columns = column_summaries.clone(); @@ -41,6 +45,7 @@ mod tests { schema.clone(), table, column_summaries.clone(), + chunk_id, ) .await; diff --git a/parquet_file/src/utils.rs b/parquet_file/src/test_utils.rs similarity index 93% rename from parquet_file/src/utils.rs rename to parquet_file/src/test_utils.rs index 3fb5f040e4..fa94cece0e 100644 --- a/parquet_file/src/utils.rs +++ b/parquet_file/src/test_utils.rs @@ -23,11 +23,17 @@ use internal_types::{ use object_store::{memory::InMemory, path::Path, ObjectStore, ObjectStoreApi}; use parquet::{ arrow::{ArrowReader, ParquetFileArrowReader}, - file::serialized_reader::{SerializedFileReader, SliceableCursor}, + file::{ + metadata::ParquetMetaData, + serialized_reader::{SerializedFileReader, SliceableCursor}, + }, }; use uuid::Uuid; -use crate::{chunk::ChunkMetrics, metadata::IoxMetadata}; +use crate::{ + chunk::ChunkMetrics, + metadata::{read_parquet_metadata_from_file, IoxMetadata}, +}; use crate::{ chunk::{self, Chunk}, storage::Storage, @@ -84,46 +90,45 @@ pub async fn load_parquet_from_store_for_path( Ok(parquet_data) } +/// Same as [`make_chunk`] but parquet file does not contain any row group. +pub async fn make_chunk(store: Arc, column_prefix: &str, chunk_id: u32) -> Chunk { + let (record_batches, schema, column_summaries, _num_rows) = make_record_batch(column_prefix); + make_chunk_given_record_batch( + store, + record_batches, + schema, + "table1", + column_summaries, + chunk_id, + ) + .await +} + +/// Same as [`make_chunk`] but parquet file does not contain any row group. 
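To make the layout concrete: `data_location` now owns the `<server_id>/<db_name>/data` prefix and `Storage::location` appends the partition, chunk and file name parts. The sketch below (illustrative only, not part of this diff) recomputes the same prefix and extends it the way `Storage::location` does, using only the `ObjectStoreApi`/`ObjectStorePath` calls shown above:

```rust
use data_types::server_id::ServerId;
use object_store::{path::ObjectStorePath, ObjectStore, ObjectStoreApi};

/// Builds the per-chunk directory
/// `<server_id>/<db_name>/data/<partition_key>/<chunk_id>`; `Storage::location`
/// then sets the `<table_name>.parquet` file name on top of this.
fn example_chunk_dir(
    object_store: &ObjectStore,
    server_id: ServerId,
    db_name: &str,
    partition_key: &str,
    chunk_id: u32,
) -> object_store::path::Path {
    // same prefix as `data_location`
    let mut path = object_store.new_path();
    path.push_dir(server_id.to_string());
    path.push_dir(db_name.to_string());
    path.push_dir("data");

    // per-partition and per-chunk directories
    path.push_dir(partition_key);
    path.push_dir(chunk_id.to_string());
    path
}
```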
diff --git a/parquet_file/src/storage_testing.rs b/parquet_file/src/storage_testing.rs
index b61c40aaf4..d5a19ea1b2 100644
--- a/parquet_file/src/storage_testing.rs
+++ b/parquet_file/src/storage_testing.rs
@@ -14,7 +14,10 @@ mod tests {
         read_parquet_metadata_from_file, read_schema_from_parquet_metadata,
         read_statistics_from_parquet_metadata,
     },
-    utils::*,
+    test_utils::{
+        load_parquet_from_store, make_chunk_given_record_batch, make_object_store,
+        make_record_batch, read_data_from_parquet_data,
+    },
 };
 
 #[tokio::test]
@@ -22,6 +25,7 @@ mod tests {
     ////////////////////
     // Create test data which is also the expected data
     let table = "table1";
+    let chunk_id = 1;
     let (record_batches, schema, column_summaries, num_rows) = make_record_batch("foo");
     let mut table_summary = TableSummary::new(table);
     table_summary.columns = column_summaries.clone();
@@ -41,6 +45,7 @@ mod tests {
         schema.clone(),
         table,
         column_summaries.clone(),
+        chunk_id,
     )
     .await;
diff --git a/parquet_file/src/utils.rs b/parquet_file/src/test_utils.rs
similarity index 93%
rename from parquet_file/src/utils.rs
rename to parquet_file/src/test_utils.rs
index 3fb5f040e4..fa94cece0e 100644
--- a/parquet_file/src/utils.rs
+++ b/parquet_file/src/test_utils.rs
@@ -23,11 +23,17 @@ use internal_types::{
 use object_store::{memory::InMemory, path::Path, ObjectStore, ObjectStoreApi};
 use parquet::{
     arrow::{ArrowReader, ParquetFileArrowReader},
-    file::serialized_reader::{SerializedFileReader, SliceableCursor},
+    file::{
+        metadata::ParquetMetaData,
+        serialized_reader::{SerializedFileReader, SliceableCursor},
+    },
 };
 use uuid::Uuid;
 
-use crate::{chunk::ChunkMetrics, metadata::IoxMetadata};
+use crate::{
+    chunk::ChunkMetrics,
+    metadata::{read_parquet_metadata_from_file, IoxMetadata},
+};
 use crate::{
     chunk::{self, Chunk},
     storage::Storage,
@@ -84,46 +90,45 @@ pub async fn load_parquet_from_store_for_path(
     Ok(parquet_data)
 }
 
+/// Create a test chunk by writing data to object store. See [`make_record_batch`] for the data content.
+pub async fn make_chunk(store: Arc<ObjectStore>, column_prefix: &str, chunk_id: u32) -> Chunk {
+    let (record_batches, schema, column_summaries, _num_rows) = make_record_batch(column_prefix);
+    make_chunk_given_record_batch(
+        store,
+        record_batches,
+        schema,
+        "table1",
+        column_summaries,
+        chunk_id,
+    )
+    .await
+}
+
+/// Same as [`make_chunk`] but parquet file does not contain any row group.
+pub async fn make_chunk_no_row_group(
+    store: Arc<ObjectStore>,
+    column_prefix: &str,
+    chunk_id: u32,
+) -> Chunk {
+    let (_, schema, column_summaries, _num_rows) = make_record_batch(column_prefix);
+    make_chunk_given_record_batch(store, vec![], schema, "table1", column_summaries, chunk_id).await
+}
+
 /// Create a test chunk by writing data to object store.
 ///
-/// See [`make_record_batch`] for the data content.
+/// TODO: This code creates a chunk that isn't hooked up with metrics
 pub async fn make_chunk_given_record_batch(
     store: Arc<ObjectStore>,
     record_batches: Vec<RecordBatch>,
     schema: Schema,
     table: &str,
     column_summaries: Vec<ColumnSummary>,
-) -> Chunk {
-    make_chunk_common(store, record_batches, schema, table, column_summaries).await
-}
-
-/// Same as [`make_chunk`] but parquet file does not contain any row group.
-pub async fn make_chunk(store: Arc<ObjectStore>, column_prefix: &str) -> Chunk {
-    let (record_batches, schema, column_summaries, _num_rows) = make_record_batch(column_prefix);
-    make_chunk_common(store, record_batches, schema, "table1", column_summaries).await
-}
-
-/// Same as [`make_chunk`] but parquet file does not contain any row group.
-pub async fn make_chunk_no_row_group(store: Arc<ObjectStore>, column_prefix: &str) -> Chunk {
-    let (_, schema, column_summaries, _num_rows) = make_record_batch(column_prefix);
-    make_chunk_common(store, vec![], schema, "table1", column_summaries).await
-}
-
-/// Common code for all [`make_chunk`] and [`make_chunk_no_row_group`].
-///
-/// TODO: This code creates a chunk that isn't hooked up with metrics
-async fn make_chunk_common(
-    store: Arc<ObjectStore>,
-    record_batches: Vec<RecordBatch>,
-    schema: Schema,
-    table: &str,
-    column_summaries: Vec<ColumnSummary>,
+    chunk_id: u32,
 ) -> Chunk {
     let server_id = ServerId::new(NonZeroU32::new(1).unwrap());
     let db_name = "db1";
     let part_key = "part1";
     let table_name = table;
-    let chunk_id = 1;
 
     let storage = Storage::new(Arc::clone(&store), server_id, db_name.to_string());
@@ -556,3 +561,21 @@ pub fn read_data_from_parquet_data(schema: SchemaRef, parquet_data: Vec<u8>) ->
 
     record_batches
 }
+
+/// Create test metadata by creating a parquet file and reading it back into memory.
+///
+/// See [`make_chunk`] for details.
+pub async fn make_metadata(
+    object_store: &Arc<ObjectStore>,
+    column_prefix: &str,
+    chunk_id: u32,
+) -> (Path, ParquetMetaData) {
+    let chunk = make_chunk(Arc::clone(object_store), column_prefix, chunk_id).await;
+    let (_, parquet_data) = load_parquet_from_store(&chunk, Arc::clone(object_store))
+        .await
+        .unwrap();
+    (
+        chunk.table_path(),
+        read_parquet_metadata_from_file(parquet_data).unwrap(),
+    )
+}
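The reworked `make_metadata` helper now returns the object store path alongside the metadata, which is what lets the catalog and cleanup tests assert on object store contents. A minimal usage sketch (not part of this diff), assuming a tokio test context like the ones above; `num_row_groups` is the parquet crate's accessor on `ParquetMetaData`:

```rust
use parquet_file::test_utils::{make_metadata, make_object_store};

#[tokio::test]
async fn example_make_metadata_usage() {
    let object_store = make_object_store();

    // The chunk ID is now part of the generated path, so two calls with
    // different IDs produce distinct parquet files for the same column prefix.
    let (path_a, metadata_a) = make_metadata(&object_store, "foo", 1).await;
    let (path_b, _metadata_b) = make_metadata(&object_store, "foo", 2).await;

    assert_ne!(path_a.display(), path_b.display());
    assert!(metadata_a.num_row_groups() > 0);
}
```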
diff --git a/server/src/config.rs b/server/src/config.rs
index 9d232246fc..d0abef68ff 100644
--- a/server/src/config.rs
+++ b/server/src/config.rs
@@ -395,7 +395,14 @@ mod test {
             config
                 .db(&name)
                 .expect("expected database")
-                .worker_iterations()
+                .worker_iterations_lifecycle()
+                > 0
+        );
+        assert!(
+            config
+                .db(&name)
+                .expect("expected database")
+                .worker_iterations_cleanup()
                 > 0
         );
diff --git a/server/src/db.rs b/server/src/db.rs
index 59d311c7e0..de727fa97a 100644
--- a/server/src/db.rs
+++ b/server/src/db.rs
@@ -33,6 +33,7 @@ use parking_lot::{Mutex, RwLock};
 use parquet_file::{
     catalog::{CatalogParquetInfo, CatalogState, PreservedCatalog},
     chunk::{Chunk as ParquetChunk, ChunkMetrics as ParquetChunkMetrics},
+    cleanup::cleanup_unreferenced_parquet_files,
     metadata::{
         read_schema_from_parquet_metadata, read_statistics_from_parquet_metadata, IoxMetadata,
     },
@@ -49,6 +50,7 @@ use std::{
         atomic::{AtomicUsize, Ordering},
         Arc,
     },
+    time::Duration,
 };
 use system_tables::{SystemSchemaProvider, SYSTEM_SCHEMA};
 use tracker::{TaskTracker, TrackedFutureExt};
@@ -305,8 +307,11 @@ pub struct Db {
     /// Value is nanoseconds since the Unix Epoch.
     process_clock: process_clock::ProcessClock,
 
-    /// Number of iterations of the worker loop for this Db
-    worker_iterations: AtomicUsize,
+    /// Number of iterations of the worker lifecycle loop for this Db
+    worker_iterations_lifecycle: AtomicUsize,
+
+    /// Number of iterations of the worker cleanup loop for this Db
+    worker_iterations_cleanup: AtomicUsize,
 
     /// Metric labels
     metric_labels: Vec<KeyValue>,
@@ -429,7 +434,8 @@ impl Db {
             metrics_registry,
             system_tables,
             process_clock,
-            worker_iterations: AtomicUsize::new(0),
+            worker_iterations_lifecycle: AtomicUsize::new(0),
+            worker_iterations_cleanup: AtomicUsize::new(0),
             metric_labels,
         }
     }
@@ -900,9 +906,14 @@ impl Db {
         None
     }
 
-    /// Returns the number of iterations of the background worker loop
-    pub fn worker_iterations(&self) -> usize {
-        self.worker_iterations.load(Ordering::Relaxed)
+    /// Returns the number of iterations of the background worker lifecycle loop
+    pub fn worker_iterations_lifecycle(&self) -> usize {
+        self.worker_iterations_lifecycle.load(Ordering::Relaxed)
+    }
+
+    /// Returns the number of iterations of the background worker cleanup loop
+    pub fn worker_iterations_cleanup(&self) -> usize {
+        self.worker_iterations_cleanup.load(Ordering::Relaxed)
     }
 
     /// Background worker function
@@ -912,15 +923,37 @@ impl Db {
     ) {
         info!("started background worker");
 
-        let mut lifecycle_manager = LifecycleManager::new(Arc::clone(&self));
+        tokio::join!(
+            // lifecycle manager loop
+            async {
+                let mut lifecycle_manager = LifecycleManager::new(Arc::clone(&self));
 
-        while !shutdown.is_cancelled() {
-            self.worker_iterations.fetch_add(1, Ordering::Relaxed);
-            tokio::select! {
-                _ = lifecycle_manager.check_for_work() => {},
-                _ = shutdown.cancelled() => break
-            }
-        }
+                while !shutdown.is_cancelled() {
+                    self.worker_iterations_lifecycle
+                        .fetch_add(1, Ordering::Relaxed);
+                    tokio::select! {
+                        _ = lifecycle_manager.check_for_work() => {},
+                        _ = shutdown.cancelled() => break,
+                    }
+                }
+            },
+            // object store cleanup loop
+            async {
+                while !shutdown.is_cancelled() {
+                    self.worker_iterations_cleanup
+                        .fetch_add(1, Ordering::Relaxed);
+                    tokio::select! {
+                        _ = async {
+                            if let Err(e) = cleanup_unreferenced_parquet_files(&self.catalog).await {
+                                error!("error in background cleanup task: {:?}", e);
+                            }
+                            tokio::time::sleep(Duration::from_secs(500)).await;
+                        } => {},
+                        _ = shutdown.cancelled() => break,
+                    }
+                }
+            },
+        );
 
         info!("finished background worker");
     }
@@ -1261,6 +1294,7 @@ mod tests {
     use ::test_helpers::assert_contains;
     use arrow::record_batch::RecordBatch;
     use arrow_util::{assert_batches_eq, assert_batches_sorted_eq};
+    use bytes::Bytes;
     use chrono::Utc;
     use data_types::{
         chunk_metadata::ChunkStorage,
@@ -1271,15 +1305,23 @@ mod tests {
     use futures::{stream, StreamExt, TryStreamExt};
     use object_store::{
         memory::InMemory,
-        path::{ObjectStorePath, Path},
+        path::{parts::PathPart, ObjectStorePath, Path},
         ObjectStore, ObjectStoreApi,
     };
     use parquet_file::{
         metadata::{read_parquet_metadata_from_file, read_schema_from_parquet_metadata},
-        utils::{load_parquet_from_store_for_path, read_data_from_parquet_data},
+        test_utils::{load_parquet_from_store_for_path, read_data_from_parquet_data},
     };
     use query::{frontend::sql::SqlQueryPlanner, PartitionChunk};
-    use std::{convert::TryFrom, iter::Iterator, num::NonZeroUsize, str};
+    use std::{
+        collections::HashSet,
+        convert::TryFrom,
+        iter::Iterator,
+        num::NonZeroUsize,
+        str,
+        time::{Duration, Instant},
+    };
+    use tokio_util::sync::CancellationToken;
 
     type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
     type Result<T, E = Error> = std::result::Result<T, E>;
@@ -2844,43 +2886,20 @@ mod tests {
         // ==================== do: write data to parquet ====================
         // create two chunks within the same table (to better test "new chunk ID" and "new table" during transaction
        // replay)
-        let mut chunk_ids = vec![];
-        let partition_key = "1970-01-01T00";
+        let mut chunks = vec![];
         for _ in 0..2 {
-            // Write some line protocols in Mutable buffer of the DB
-            write_lp(db.as_ref(), "cpu bar=1 10");
-
-            //Now mark the MB chunk close
-            let chunk_id = {
-                let mb_chunk = db
-                    .rollover_partition("1970-01-01T00", "cpu")
-                    .await
-                    .unwrap()
-                    .unwrap();
-                mb_chunk.id()
-            };
-            // Move that MB chunk to RB chunk and drop it from MB
-            db.load_chunk_to_read_buffer(partition_key, "cpu", chunk_id)
-                .await
-                .unwrap();
-
-            // Write the RB chunk to Object Store but keep it in RB
-            db.write_chunk_to_object_store(partition_key, "cpu", chunk_id)
-                .await
-                .unwrap();
-
-            chunk_ids.push(chunk_id);
+            chunks.push(create_parquet_chunk(db.as_ref()).await);
         }
 
         // ==================== check: catalog state ====================
         // the preserved catalog should now register a single file
         let mut paths_expected = vec![];
-        for chunk_id in &chunk_ids {
+        for (partition_key, table_name, chunk_id) in &chunks {
             let chunk = {
                 let partition = db.catalog.state().valid_partition(&partition_key).unwrap();
                 let partition = partition.read();
-                partition.chunk("cpu", *chunk_id).unwrap()
+                partition.chunk(table_name, *chunk_id).unwrap()
             };
             let chunk = chunk.read();
             if let ChunkState::WrittenToObjectStore(_, chunk) = chunk.state() {
@@ -2924,12 +2943,12 @@ mod tests {
 
         // ==================== check: DB state ====================
         // Re-created DB should have an "object store only"-chunk
-        for chunk_id in &chunk_ids {
+        for (partition_key, table_name, chunk_id) in &chunks {
             let chunk = {
                 let partition = db.catalog.state().valid_partition(&partition_key).unwrap();
                 let partition = partition.read();
-                partition.chunk("cpu", *chunk_id).unwrap()
+                partition.chunk(table_name, *chunk_id).unwrap()
             };
             let chunk = chunk.read();
             assert!(matches!(chunk.state(), ChunkState::ObjectStoreOnly(_)));
@@ -2938,4 +2957,153 @@ mod tests {
         // ==================== check: DB still writable ====================
         write_lp(db.as_ref(), "cpu bar=1 10");
     }
+
+    #[tokio::test]
+    async fn object_store_cleanup() {
+        // Test that stale parquet files are removed from object store
+
+        // ==================== setup ====================
+        let object_store = Arc::new(ObjectStore::new_in_memory(InMemory::new()));
+
+        // ==================== do: create DB ====================
+        // Create a DB given a server id, an object store and a db name
+        let test_db = TestDb::builder()
+            .object_store(Arc::clone(&object_store))
+            .build()
+            .await;
+        let db = Arc::new(test_db.db);
+
+        // ==================== do: write data to parquet ====================
+        // create the following chunks:
+        //   0: ReadBuffer + Parquet
+        //   1: only Parquet
+        //   2: dropped (not in current catalog but parquet file still present for time travel)
+        let mut paths_keep = vec![];
+        for i in 0..3i8 {
+            let (partition_key, table_name, chunk_id) = create_parquet_chunk(db.as_ref()).await;
+            let chunk = {
+                let partition = db.catalog.state().valid_partition(&partition_key).unwrap();
+                let partition = partition.read();
+
+                partition.chunk(table_name.clone(), chunk_id).unwrap()
+            };
+            let chunk = chunk.read();
+            if let ChunkState::WrittenToObjectStore(_, chunk) = chunk.state() {
+                paths_keep.push(chunk.table_path());
+            } else {
+                panic!("Wrong chunk state.");
+            }
+
+            // drop lock
+            drop(chunk);
+
+            if i == 1 {
+                db.unload_read_buffer(&partition_key, &table_name, chunk_id)
+                    .await
+                    .unwrap();
+            }
+            if i == 2 {
+                db.drop_chunk(&partition_key, &table_name, chunk_id)
+                    .unwrap();
+            }
+        }
+
+        // ==================== do: create garbage ====================
+        let mut path: DirsAndFileName = paths_keep[0].clone().into();
+        path.file_name = Some(PathPart::from(
+            format!("prefix_{}", path.file_name.unwrap().encoded()).as_ref(),
+        ));
+        let path_delete = object_store.path_from_dirs_and_filename(path);
+        create_empty_file(&object_store, &path_delete).await;
+        let path_delete = path_delete.display();
+
+        // ==================== check: all files are there ====================
+        let all_files = get_object_store_files(&object_store).await;
+        for path in &paths_keep {
+            assert!(all_files.contains(&path.display()));
+        }
+
+        // ==================== do: start background task loop ====================
+        let shutdown: CancellationToken = Default::default();
+        let shutdown_captured = shutdown.clone();
+        let db_captured = Arc::clone(&db);
+        let join_handle =
+            tokio::spawn(async move { db_captured.background_worker(shutdown_captured).await });
+
+        // ==================== check: after a while the dropped file should be gone ====================
+        let t_0 = Instant::now();
+        loop {
+            let all_files = get_object_store_files(&object_store).await;
+            if !all_files.contains(&path_delete) {
+                break;
+            }
+            assert!(t_0.elapsed() < Duration::from_secs(10));
+            tokio::time::sleep(Duration::from_millis(100)).await;
+        }
+
+        // ==================== do: stop background task loop ====================
+        shutdown.cancel();
+        join_handle.await.unwrap();
+
+        // ==================== check: some files are there ====================
+        let all_files = get_object_store_files(&object_store).await;
+        assert!(!all_files.contains(&path_delete));
+        for path in &paths_keep {
+            assert!(all_files.contains(&path.display()));
+        }
+    }
+
+    async fn create_parquet_chunk(db: &Db) -> (String, String, u32) {
+        write_lp(db, "cpu bar=1 10");
+        let partition_key = "1970-01-01T00";
+        let table_name = "cpu";
+
+        // Now mark the MB chunk closed
+        let chunk_id = {
+            let mb_chunk = db
+                .rollover_partition(partition_key, table_name)
+                .await
+                .unwrap()
+                .unwrap();
+            mb_chunk.id()
+        };
+        // Move that MB chunk to RB chunk and drop it from MB
+        db.load_chunk_to_read_buffer(partition_key, table_name, chunk_id)
+            .await
+            .unwrap();
+
+        // Write the RB chunk to Object Store but keep it in RB
+        db.write_chunk_to_object_store(partition_key, table_name, chunk_id)
+            .await
+            .unwrap();
+
+        (partition_key.to_string(), table_name.to_string(), chunk_id)
+    }
+
+    async fn get_object_store_files(object_store: &ObjectStore) -> HashSet<String> {
+        object_store
+            .list(None)
+            .await
+            .unwrap()
+            .try_concat()
+            .await
+            .unwrap()
+            .iter()
+            .map(|p| p.display())
+            .collect()
+    }
+
+    async fn create_empty_file(object_store: &ObjectStore, path: &Path) {
+        let data = Bytes::default();
+        let len = data.len();
+
+        object_store
+            .put(
+                &path,
+                futures::stream::once(async move { Ok(data) }),
+                Some(len),
+            )
+            .await
+            .unwrap();
+    }
 }
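Outside the test, the same shutdown pattern applies to anyone embedding `Db::background_worker`: spawn it with a `CancellationToken`, let the lifecycle and cleanup loops run concurrently via `tokio::join!`, and cancel the token to stop both. A condensed sketch of that call pattern (not part of this diff), assuming an already constructed `Arc<Db>` from the surrounding `server::db` module:

```rust
use std::sync::Arc;

use tokio_util::sync::CancellationToken;

async fn run_background_worker_until_cancelled(db: Arc<Db>) {
    let shutdown = CancellationToken::new();

    // `background_worker` now drives both the lifecycle loop and the object
    // store cleanup loop until the token is cancelled.
    let shutdown_captured = shutdown.clone();
    let db_captured = Arc::clone(&db);
    let join_handle =
        tokio::spawn(async move { db_captured.background_worker(shutdown_captured).await });

    // ... perform work, then stop both loops:
    shutdown.cancel();
    join_handle.await.expect("background worker panicked");
}
```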