From 38183928c87d638b135d1f9baaf17ecaf374e8c6 Mon Sep 17 00:00:00 2001
From: Marco Neumann
Date: Tue, 25 May 2021 13:45:12 +0200
Subject: [PATCH 01/15] refactor: extract path generator for data location

---
 parquet_file/src/storage.rs | 26 +++++++++++++++++++++-----
 1 file changed, 21 insertions(+), 5 deletions(-)

diff --git a/parquet_file/src/storage.rs b/parquet_file/src/storage.rs
index 3da8ac17b0..e18acdaf36 100644
--- a/parquet_file/src/storage.rs
+++ b/parquet_file/src/storage.rs
@@ -162,13 +162,10 @@ impl Storage {
         table_name: String,
     ) -> object_store::path::Path {
         // Full path of the file in object store
-        //    <server id>/<db name>/data/<partition key>/<chunk id>/<table name>
.parquet - let mut path = self.object_store.new_path(); - path.push_dir(self.server_id.to_string()); - path.push_dir(self.db_name.clone()); - path.push_dir("data"); + let mut path = data_location(&self.object_store, self.server_id, &self.db_name); path.push_dir(partition_key); path.push_dir(chunk_id.to_string()); let file_name = format!("{}.parquet", table_name); @@ -491,6 +488,25 @@ impl TryClone for MemWriter { } } +/// Location where parquet data goes to. +/// +/// Schema currently is: +/// +/// ```text +/// //data +/// ``` +pub(crate) fn data_location( + object_store: &ObjectStore, + server_id: ServerId, + db_name: &str, +) -> Path { + let mut path = object_store.new_path(); + path.push_dir(server_id.to_string()); + path.push_dir(db_name.to_string()); + path.push_dir("data"); + path +} + #[cfg(test)] mod tests { use std::num::NonZeroU32; From 9daa4d00d6f4e73d3241884b5140db9eac42181f Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Tue, 25 May 2021 13:51:27 +0200 Subject: [PATCH 02/15] test: re-organize `parquet_file` test utils a bit --- parquet_file/src/catalog.rs | 25 ++------- parquet_file/src/metadata.rs | 12 ++--- parquet_file/src/storage_testing.rs | 2 + parquet_file/src/utils.rs | 81 ++++++++++++++++++----------- 4 files changed, 65 insertions(+), 55 deletions(-) diff --git a/parquet_file/src/catalog.rs b/parquet_file/src/catalog.rs index 1405bfaded..05ff6d999e 100644 --- a/parquet_file/src/catalog.rs +++ b/parquet_file/src/catalog.rs @@ -1055,15 +1055,12 @@ pub mod test_helpers { } #[cfg(test)] -pub mod tests { +mod tests { use std::{num::NonZeroU32, ops::Deref}; use crate::{ - metadata::{ - read_parquet_metadata_from_file, read_schema_from_parquet_metadata, - read_statistics_from_parquet_metadata, - }, - utils::{load_parquet_from_store, make_chunk, make_object_store}, + metadata::{read_schema_from_parquet_metadata, read_statistics_from_parquet_metadata}, + utils::{make_metadata, make_object_store}, }; use object_store::parsed_path; @@ -1681,8 +1678,8 @@ pub mod tests { .unwrap(); // get some test metadata - let metadata1 = make_metadata(object_store, "foo").await; - let metadata2 = make_metadata(object_store, "bar").await; + let (_, metadata1) = make_metadata(object_store, "foo", 1).await; + let (_, metadata2) = make_metadata(object_store, "bar", 1).await; // track all the intermediate results let mut trace = TestTrace::new(); @@ -1992,16 +1989,4 @@ pub mod tests { &get_catalog_parquet_files(trace.states.last().unwrap()), ); } - - /// Create test metadata. See [`make_chunk`] for details. 
- async fn make_metadata( - object_store: &Arc, - column_prefix: &str, - ) -> ParquetMetaData { - let chunk = make_chunk(Arc::clone(object_store), column_prefix).await; - let (_, parquet_data) = load_parquet_from_store(&chunk, Arc::clone(object_store)) - .await - .unwrap(); - read_parquet_metadata_from_file(parquet_data).unwrap() - } } diff --git a/parquet_file/src/metadata.rs b/parquet_file/src/metadata.rs index 7e6a4928a9..6cd32f999b 100644 --- a/parquet_file/src/metadata.rs +++ b/parquet_file/src/metadata.rs @@ -505,7 +505,7 @@ mod tests { async fn test_restore_from_file() { // setup: preserve chunk to object store let store = make_object_store(); - let chunk = make_chunk(Arc::clone(&store), "foo").await; + let chunk = make_chunk(Arc::clone(&store), "foo", 1).await; let (table, parquet_data) = load_parquet_from_store(&chunk, store).await.unwrap(); let parquet_metadata = read_parquet_metadata_from_file(parquet_data).unwrap(); @@ -526,7 +526,7 @@ mod tests { async fn test_restore_from_thrift() { // setup: write chunk to object store and only keep thrift-encoded metadata let store = make_object_store(); - let chunk = make_chunk(Arc::clone(&store), "foo").await; + let chunk = make_chunk(Arc::clone(&store), "foo", 1).await; let (table, parquet_data) = load_parquet_from_store(&chunk, store).await.unwrap(); let parquet_metadata = read_parquet_metadata_from_file(parquet_data).unwrap(); let data = parquet_metadata_to_thrift(&parquet_metadata).unwrap(); @@ -549,7 +549,7 @@ mod tests { async fn test_restore_from_file_no_row_group() { // setup: preserve chunk to object store let store = make_object_store(); - let chunk = make_chunk_no_row_group(Arc::clone(&store), "foo").await; + let chunk = make_chunk_no_row_group(Arc::clone(&store), "foo", 1).await; let (table, parquet_data) = load_parquet_from_store(&chunk, store).await.unwrap(); let parquet_metadata = read_parquet_metadata_from_file(parquet_data).unwrap(); @@ -570,7 +570,7 @@ mod tests { async fn test_restore_from_thrift_no_row_group() { // setup: write chunk to object store and only keep thrift-encoded metadata let store = make_object_store(); - let chunk = make_chunk_no_row_group(Arc::clone(&store), "foo").await; + let chunk = make_chunk_no_row_group(Arc::clone(&store), "foo", 1).await; let (table, parquet_data) = load_parquet_from_store(&chunk, store).await.unwrap(); let parquet_metadata = read_parquet_metadata_from_file(parquet_data).unwrap(); let data = parquet_metadata_to_thrift(&parquet_metadata).unwrap(); @@ -592,7 +592,7 @@ mod tests { #[tokio::test] async fn test_make_chunk() { let store = make_object_store(); - let chunk = make_chunk(Arc::clone(&store), "foo").await; + let chunk = make_chunk(Arc::clone(&store), "foo", 1).await; let (_, parquet_data) = load_parquet_from_store(&chunk, store).await.unwrap(); let parquet_metadata = read_parquet_metadata_from_file(parquet_data).unwrap(); @@ -630,7 +630,7 @@ mod tests { #[tokio::test] async fn test_make_chunk_no_row_group() { let store = make_object_store(); - let chunk = make_chunk_no_row_group(Arc::clone(&store), "foo").await; + let chunk = make_chunk_no_row_group(Arc::clone(&store), "foo", 1).await; let (_, parquet_data) = load_parquet_from_store(&chunk, store).await.unwrap(); let parquet_metadata = read_parquet_metadata_from_file(parquet_data).unwrap(); diff --git a/parquet_file/src/storage_testing.rs b/parquet_file/src/storage_testing.rs index b61c40aaf4..002cdc8968 100644 --- a/parquet_file/src/storage_testing.rs +++ b/parquet_file/src/storage_testing.rs @@ -22,6 +22,7 @@ mod tests { 
//////////////////// // Create test data which is also the expected data let table = "table1"; + let chunk_id = 1; let (record_batches, schema, column_summaries, num_rows) = make_record_batch("foo"); let mut table_summary = TableSummary::new(table); table_summary.columns = column_summaries.clone(); @@ -41,6 +42,7 @@ mod tests { schema.clone(), table, column_summaries.clone(), + chunk_id, ) .await; diff --git a/parquet_file/src/utils.rs b/parquet_file/src/utils.rs index 3fb5f040e4..fa94cece0e 100644 --- a/parquet_file/src/utils.rs +++ b/parquet_file/src/utils.rs @@ -23,11 +23,17 @@ use internal_types::{ use object_store::{memory::InMemory, path::Path, ObjectStore, ObjectStoreApi}; use parquet::{ arrow::{ArrowReader, ParquetFileArrowReader}, - file::serialized_reader::{SerializedFileReader, SliceableCursor}, + file::{ + metadata::ParquetMetaData, + serialized_reader::{SerializedFileReader, SliceableCursor}, + }, }; use uuid::Uuid; -use crate::{chunk::ChunkMetrics, metadata::IoxMetadata}; +use crate::{ + chunk::ChunkMetrics, + metadata::{read_parquet_metadata_from_file, IoxMetadata}, +}; use crate::{ chunk::{self, Chunk}, storage::Storage, @@ -84,46 +90,45 @@ pub async fn load_parquet_from_store_for_path( Ok(parquet_data) } +/// Same as [`make_chunk`] but parquet file does not contain any row group. +pub async fn make_chunk(store: Arc, column_prefix: &str, chunk_id: u32) -> Chunk { + let (record_batches, schema, column_summaries, _num_rows) = make_record_batch(column_prefix); + make_chunk_given_record_batch( + store, + record_batches, + schema, + "table1", + column_summaries, + chunk_id, + ) + .await +} + +/// Same as [`make_chunk`] but parquet file does not contain any row group. +pub async fn make_chunk_no_row_group( + store: Arc, + column_prefix: &str, + chunk_id: u32, +) -> Chunk { + let (_, schema, column_summaries, _num_rows) = make_record_batch(column_prefix); + make_chunk_given_record_batch(store, vec![], schema, "table1", column_summaries, chunk_id).await +} + /// Create a test chunk by writing data to object store. /// -/// See [`make_record_batch`] for the data content. +/// TODO: This code creates a chunk that isn't hooked up with metrics pub async fn make_chunk_given_record_batch( store: Arc, record_batches: Vec, schema: Schema, table: &str, column_summaries: Vec, -) -> Chunk { - make_chunk_common(store, record_batches, schema, table, column_summaries).await -} - -/// Same as [`make_chunk`] but parquet file does not contain any row group. -pub async fn make_chunk(store: Arc, column_prefix: &str) -> Chunk { - let (record_batches, schema, column_summaries, _num_rows) = make_record_batch(column_prefix); - make_chunk_common(store, record_batches, schema, "table1", column_summaries).await -} - -/// Same as [`make_chunk`] but parquet file does not contain any row group. -pub async fn make_chunk_no_row_group(store: Arc, column_prefix: &str) -> Chunk { - let (_, schema, column_summaries, _num_rows) = make_record_batch(column_prefix); - make_chunk_common(store, vec![], schema, "table1", column_summaries).await -} - -/// Common code for all [`make_chunk`] and [`make_chunk_no_row_group`]. 
-/// -/// TODO: This code creates a chunk that isn't hooked up with metrics -async fn make_chunk_common( - store: Arc, - record_batches: Vec, - schema: Schema, - table: &str, - column_summaries: Vec, + chunk_id: u32, ) -> Chunk { let server_id = ServerId::new(NonZeroU32::new(1).unwrap()); let db_name = "db1"; let part_key = "part1"; let table_name = table; - let chunk_id = 1; let storage = Storage::new(Arc::clone(&store), server_id, db_name.to_string()); @@ -556,3 +561,21 @@ pub fn read_data_from_parquet_data(schema: SchemaRef, parquet_data: Vec) -> record_batches } + +/// Create test metadata by creating a parquet file and reading it back into memory. +/// +/// See [`make_chunk`] for details. +pub async fn make_metadata( + object_store: &Arc, + column_prefix: &str, + chunk_id: u32, +) -> (Path, ParquetMetaData) { + let chunk = make_chunk(Arc::clone(object_store), column_prefix, chunk_id).await; + let (_, parquet_data) = load_parquet_from_store(&chunk, Arc::clone(object_store)) + .await + .unwrap(); + ( + chunk.table_path(), + read_parquet_metadata_from_file(parquet_data).unwrap(), + ) +} From 92fcd7e940385cdbbae7eae090151c8bf795abd1 Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Tue, 25 May 2021 14:15:31 +0200 Subject: [PATCH 03/15] feat: add a way to get OS, server ID and DB name from catalog --- parquet_file/src/catalog.rs | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/parquet_file/src/catalog.rs b/parquet_file/src/catalog.rs index 05ff6d999e..277ef970a5 100644 --- a/parquet_file/src/catalog.rs +++ b/parquet_file/src/catalog.rs @@ -429,6 +429,21 @@ where .map(|tkey| tkey.revision_counter) .expect("catalog should have at least an empty transaction") } + + /// Object store used by this catalog. + pub fn object_store(&self) -> Arc { + Arc::clone(&self.object_store) + } + + /// Server ID used by this catalog. + pub fn server_id(&self) -> ServerId { + self.server_id + } + + /// Database name used by this catalog. + pub fn db_name(&self) -> &str { + &self.db_name + } } impl Debug for PreservedCatalog From 953114af2e7b224317e207abe8d2057ae93dcfc0 Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Tue, 25 May 2021 14:16:05 +0200 Subject: [PATCH 04/15] feat: add method to abort catalog transaction --- parquet_file/src/catalog.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/parquet_file/src/catalog.rs b/parquet_file/src/catalog.rs index 277ef970a5..7cc5439a20 100644 --- a/parquet_file/src/catalog.rs +++ b/parquet_file/src/catalog.rs @@ -911,6 +911,11 @@ where Ok(()) } + /// Abort transaction w/o commit. + pub fn abort(mut self) { + self.transaction = None; + } + /// Add a new parquet file to the catalog. /// /// If a file with the same path already exists an error will be returned. From cc78b5317d39960da696d2df037e0057196f1c7d Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Tue, 25 May 2021 14:17:16 +0200 Subject: [PATCH 05/15] feat: add method to get all parquet files from catalog state --- parquet_file/src/catalog.rs | 9 ++++++++- server/src/db.rs | 4 ++++ 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/parquet_file/src/catalog.rs b/parquet_file/src/catalog.rs index 7cc5439a20..4be25e7ecf 100644 --- a/parquet_file/src/catalog.rs +++ b/parquet_file/src/catalog.rs @@ -2,7 +2,7 @@ use std::{ collections::{ hash_map::Entry::{Occupied, Vacant}, - HashMap, + HashMap, HashSet, }, fmt::{Debug, Display}, str::FromStr, @@ -207,6 +207,9 @@ pub trait CatalogState { /// Remove parquet file from state. 
fn remove(&self, path: DirsAndFileName) -> Result<()>; + + /// Get a set of all parquet file paths currently tracked by this state. + fn tracked_parquet_files(&self) -> HashSet; } /// Inner mutable part of the preserved catalog. @@ -1046,6 +1049,10 @@ pub mod test_helpers { Ok(()) } + + fn tracked_parquet_files(&self) -> HashSet { + self.inner.borrow().parquet_files.keys().cloned().collect() + } } /// Break preserved catalog by moving one of the transaction files into a weird unknown version. diff --git a/server/src/db.rs b/server/src/db.rs index 59d311c7e0..2ababd94dc 100644 --- a/server/src/db.rs +++ b/server/src/db.rs @@ -1213,6 +1213,10 @@ impl CatalogState for Catalog { fn remove(&self, _path: DirsAndFileName) -> parquet_file::catalog::Result<()> { unimplemented!("parquet files cannot be removed from the catalog for now") } + + fn tracked_parquet_files(&self) -> std::collections::HashSet { + todo!() + } } pub mod test_helpers { From 14fdf3b7c7096d73765c26810ca93d0fb2095008 Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Tue, 25 May 2021 14:18:57 +0200 Subject: [PATCH 06/15] feat: implement object store cleanup core routine --- parquet_file/src/cleanup.rs | 130 ++++++++++++++++++++++++++++++++++++ parquet_file/src/lib.rs | 1 + 2 files changed, 131 insertions(+) create mode 100644 parquet_file/src/cleanup.rs diff --git a/parquet_file/src/cleanup.rs b/parquet_file/src/cleanup.rs new file mode 100644 index 0000000000..ece8e29b8a --- /dev/null +++ b/parquet_file/src/cleanup.rs @@ -0,0 +1,130 @@ +//! Methods to cleanup the object store. + +use futures::TryStreamExt; +use object_store::{ObjectStore, ObjectStoreApi}; +use observability_deps::tracing::info; +use snafu::{ResultExt, Snafu}; + +use crate::{ + catalog::{CatalogState, PreservedCatalog}, + storage::data_location, +}; +#[derive(Debug, Snafu)] +pub enum Error { + #[snafu(display("Error during store read operation: {}", source))] + ReadError { + source: ::Error, + }, + + #[snafu(display("Error during store write operation: {}", source))] + WriteError { + source: ::Error, + }, +} + +pub type Result = std::result::Result; + +/// Delete all unreferenced parquet files. +pub async fn cleanup_unreferenced_parquet_files(catalog: &PreservedCatalog) -> Result<()> +where + S: CatalogState, +{ + // Create a transaction to prevent parallel modifications of the catalog. This avoids that we delete files there + // that are about to get added to the catalog. + let transaction = catalog.open_transaction().await; + + let all_known = catalog.state().tracked_parquet_files(); + let store = catalog.object_store(); + let prefix = data_location(&store, catalog.server_id(), catalog.db_name()); + + let mut stream = store.list(Some(&prefix)).await.context(ReadError)?; + let mut files_removed = 0; + while let Some(paths) = stream.try_next().await.context(ReadError)? 
{ + for path in paths { + if !all_known.contains(&path.clone().into()) { + store.delete(&path).await.context(WriteError)?; + files_removed += 1; + } + } + } + + transaction.abort(); + info!("Removed {} files during clean-up.", files_removed); + + Ok(()) +} + +#[cfg(test)] +mod tests { + use std::{collections::HashSet, num::NonZeroU32, sync::Arc}; + + use data_types::server_id::ServerId; + use object_store::path::ObjectStorePath; + + use super::*; + use crate::{ + catalog::test_helpers::TestCatalogState, + utils::{make_metadata, make_object_store}, + }; + + #[tokio::test] + async fn test_cleanup() { + let object_store = make_object_store(); + let server_id = make_server_id(); + let db_name = "db1"; + + let catalog = PreservedCatalog::::new_empty( + Arc::clone(&object_store), + server_id, + db_name, + (), + ) + .await + .unwrap(); + + // create some data + let mut paths_tracked = vec![]; + let mut paths_untracked = vec![]; + { + let mut transaction = catalog.open_transaction().await; + + let (path, md) = make_metadata(&object_store, "foo", 1).await; + transaction.add_parquet(&path.clone().into(), &md).unwrap(); + paths_tracked.push(path.display()); + + let (path, md) = make_metadata(&object_store, "foo", 2).await; + transaction.add_parquet(&path.clone().into(), &md).unwrap(); + paths_tracked.push(path.display()); + + let (path, _md) = make_metadata(&object_store, "foo", 3).await; + paths_untracked.push(path.display()); + + transaction.commit().await.unwrap(); + } + + // run clean-up + cleanup_unreferenced_parquet_files(&catalog).await.unwrap(); + + // list all files + let all_files: HashSet<_> = object_store + .list(None) + .await + .unwrap() + .try_concat() + .await + .unwrap() + .iter() + .map(|p| p.display()) + .collect(); + for p in paths_tracked { + assert!(dbg!(&all_files).contains(dbg!(&p))); + } + for p in paths_untracked { + assert!(!dbg!(&all_files).contains(dbg!(&p))); + } + } + + fn make_server_id() -> ServerId { + ServerId::new(NonZeroU32::new(1).unwrap()) + } +} diff --git a/parquet_file/src/lib.rs b/parquet_file/src/lib.rs index 032aedbfad..e2969394f3 100644 --- a/parquet_file/src/lib.rs +++ b/parquet_file/src/lib.rs @@ -9,6 +9,7 @@ pub mod catalog; pub mod chunk; +pub mod cleanup; pub mod metadata; pub mod storage; pub mod table; From dd6bbeec42c9e4ba8a88937ac31f54c3189b564e Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Tue, 25 May 2021 17:12:14 +0200 Subject: [PATCH 07/15] feat: add background task to clean up OS Closes #1313. --- server/src/db.rs | 205 +++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 172 insertions(+), 33 deletions(-) diff --git a/server/src/db.rs b/server/src/db.rs index 2ababd94dc..ba932e3076 100644 --- a/server/src/db.rs +++ b/server/src/db.rs @@ -1,6 +1,8 @@ //! This module contains the main IOx Database object which has the //! 
instances of the mutable buffer, read buffer, and object store +use crate::db::catalog::chunk::ChunkState; + use super::{ buffer::{self, Buffer}, JobRegistry, @@ -33,6 +35,7 @@ use parking_lot::{Mutex, RwLock}; use parquet_file::{ catalog::{CatalogParquetInfo, CatalogState, PreservedCatalog}, chunk::{Chunk as ParquetChunk, ChunkMetrics as ParquetChunkMetrics}, + cleanup::cleanup_unreferenced_parquet_files, metadata::{ read_schema_from_parquet_metadata, read_statistics_from_parquet_metadata, IoxMetadata, }, @@ -44,11 +47,13 @@ use read_buffer::{Chunk as ReadBufferChunk, ChunkMetrics as ReadBufferChunkMetri use snafu::{ResultExt, Snafu}; use std::{ any::Any, + collections::HashSet, num::NonZeroUsize, sync::{ atomic::{AtomicUsize, Ordering}, Arc, }, + time::{Duration, Instant}, }; use system_tables::{SystemSchemaProvider, SYSTEM_SCHEMA}; use tracker::{TaskTracker, TrackedFutureExt}; @@ -913,11 +918,20 @@ impl Db { info!("started background worker"); let mut lifecycle_manager = LifecycleManager::new(Arc::clone(&self)); + let mut last_cleanup: Option = None; while !shutdown.is_cancelled() { self.worker_iterations.fetch_add(1, Ordering::Relaxed); tokio::select! { - _ = lifecycle_manager.check_for_work() => {}, + _ = async { + lifecycle_manager.check_for_work().await; + if last_cleanup.map(|d| d.elapsed() >= Duration::from_secs(500)).unwrap_or(true) { + if let Err(e) = cleanup_unreferenced_parquet_files(&self.catalog).await { + error!("error in background cleanup task: {:?}", e); + } + last_cleanup = Some(Instant::now()); + } + } => {}, _ = shutdown.cancelled() => break } } @@ -1214,8 +1228,23 @@ impl CatalogState for Catalog { unimplemented!("parquet files cannot be removed from the catalog for now") } - fn tracked_parquet_files(&self) -> std::collections::HashSet { - todo!() + fn tracked_parquet_files(&self) -> HashSet { + let mut files = HashSet::new(); + + for part in self.partitions() { + let part_guard = part.read(); + for chunk in part_guard.chunks() { + let chunk_guard = chunk.read(); + if let ChunkState::WrittenToObjectStore(_, pq_chunk) + | ChunkState::ObjectStoreOnly(pq_chunk) = chunk_guard.state() + { + let path: DirsAndFileName = pq_chunk.table_path().into(); + files.insert(path); + } + } + } + + files } } @@ -1283,7 +1312,15 @@ mod tests { utils::{load_parquet_from_store_for_path, read_data_from_parquet_data}, }; use query::{frontend::sql::SqlQueryPlanner, PartitionChunk}; - use std::{convert::TryFrom, iter::Iterator, num::NonZeroUsize, str}; + use std::{ + collections::HashSet, + convert::TryFrom, + iter::Iterator, + num::NonZeroUsize, + str, + time::{Duration, Instant}, + }; + use tokio_util::sync::CancellationToken; type Error = Box; type Result = std::result::Result; @@ -2848,43 +2885,20 @@ mod tests { // ==================== do: write data to parquet ==================== // create two chunks within the same table (to better test "new chunk ID" and "new table" during transaction // replay) - let mut chunk_ids = vec![]; - let partition_key = "1970-01-01T00"; + let mut chunks = vec![]; for _ in 0..2 { - // Write some line protocols in Mutable buffer of the DB - write_lp(db.as_ref(), "cpu bar=1 10"); - - //Now mark the MB chunk close - let chunk_id = { - let mb_chunk = db - .rollover_partition("1970-01-01T00", "cpu") - .await - .unwrap() - .unwrap(); - mb_chunk.id() - }; - // Move that MB chunk to RB chunk and drop it from MB - db.load_chunk_to_read_buffer(partition_key, "cpu", chunk_id) - .await - .unwrap(); - - // Write the RB chunk to Object Store but keep it in RB - 
db.write_chunk_to_object_store(partition_key, "cpu", chunk_id) - .await - .unwrap(); - - chunk_ids.push(chunk_id); + chunks.push(create_parquet_chunk(db.as_ref()).await); } // ==================== check: catalog state ==================== // the preserved catalog should now register a single file let mut paths_expected = vec![]; - for chunk_id in &chunk_ids { + for (partition_key, table_name, chunk_id) in &chunks { let chunk = { let partition = db.catalog.state().valid_partition(&partition_key).unwrap(); let partition = partition.read(); - partition.chunk("cpu", *chunk_id).unwrap() + partition.chunk(table_name, *chunk_id).unwrap() }; let chunk = chunk.read(); if let ChunkState::WrittenToObjectStore(_, chunk) = chunk.state() { @@ -2928,12 +2942,12 @@ mod tests { // ==================== check: DB state ==================== // Re-created DB should have an "object store only"-chunk - for chunk_id in &chunk_ids { + for (partition_key, table_name, chunk_id) in &chunks { let chunk = { let partition = db.catalog.state().valid_partition(&partition_key).unwrap(); let partition = partition.read(); - partition.chunk("cpu", *chunk_id).unwrap() + partition.chunk(table_name, *chunk_id).unwrap() }; let chunk = chunk.read(); assert!(matches!(chunk.state(), ChunkState::ObjectStoreOnly(_))); @@ -2942,4 +2956,129 @@ mod tests { // ==================== check: DB still writable ==================== write_lp(db.as_ref(), "cpu bar=1 10"); } + + #[tokio::test] + async fn object_store_cleanup() { + // Test that stale parquet files are removed from object store + + // ==================== setup ==================== + let object_store = Arc::new(ObjectStore::new_in_memory(InMemory::new())); + + // ==================== do: create DB ==================== + // Create a DB given a server id, an object store and a db name + let test_db = TestDb::builder() + .object_store(Arc::clone(&object_store)) + .build() + .await; + let db = Arc::new(test_db.db); + + // ==================== do: write data to parquet ==================== + // create the following chunks: + // 0: ReadBuffer + Parquet + // 1: only Parquet + // 2: dropped (not in catalog but parquet file still present) + let mut paths = vec![]; + for i in 0..3i8 { + let (partition_key, table_name, chunk_id) = create_parquet_chunk(db.as_ref()).await; + let chunk = { + let partition = db.catalog.state().valid_partition(&partition_key).unwrap(); + let partition = partition.read(); + + partition.chunk(table_name.clone(), chunk_id).unwrap() + }; + let chunk = chunk.read(); + if let ChunkState::WrittenToObjectStore(_, chunk) = chunk.state() { + paths.push(chunk.table_path().display()); + } else { + panic!("Wrong chunk state."); + } + + // drop lock + drop(chunk); + + if i == 1 { + db.unload_read_buffer(&partition_key, &table_name, chunk_id) + .await + .unwrap(); + } + if i == 2 { + db.drop_chunk(&partition_key, &table_name, chunk_id) + .unwrap(); + } + } + + // ==================== check: all files are there ==================== + let all_files = get_object_store_files(&object_store).await; + assert!(all_files.contains(&paths[0])); + assert!(all_files.contains(&paths[1])); + assert!(all_files.contains(&paths[2])); + + // ==================== do: start background task loop ==================== + let shutdown: CancellationToken = Default::default(); + let shutdown_captured = shutdown.clone(); + let db_captured = Arc::clone(&db); + let join_handle = + tokio::spawn(async move { db_captured.background_worker(shutdown_captured).await }); + + // ==================== check: after a 
while the dropped file should be gone ==================== + let t_0 = Instant::now(); + loop { + let all_files = get_object_store_files(&object_store).await; + if !all_files.contains(&paths[2]) { + break; + } + assert!(t_0.elapsed() < Duration::from_secs(10)); + tokio::time::sleep(Duration::from_millis(100)).await; + } + + // ==================== do: stop background task loop ==================== + shutdown.cancel(); + join_handle.await.unwrap(); + + // ==================== check: some files are there ==================== + let all_files = get_object_store_files(&object_store).await; + assert!(all_files.contains(&paths[0])); + assert!(all_files.contains(&paths[1])); + assert!(!all_files.contains(&paths[2])); + } + + async fn create_parquet_chunk(db: &Db) -> (String, String, u32) { + write_lp(db, "cpu bar=1 10"); + let partition_key = "1970-01-01T00"; + let table_name = "cpu"; + + //Now mark the MB chunk close + let chunk_id = { + let mb_chunk = db + .rollover_partition(partition_key, table_name) + .await + .unwrap() + .unwrap(); + mb_chunk.id() + }; + // Move that MB chunk to RB chunk and drop it from MB + db.load_chunk_to_read_buffer(partition_key, table_name, chunk_id) + .await + .unwrap(); + + // Write the RB chunk to Object Store but keep it in RB + db.write_chunk_to_object_store(partition_key, table_name, chunk_id) + .await + .unwrap(); + + (partition_key.to_string(), table_name.to_string(), chunk_id) + } + + async fn get_object_store_files(object_store: &ObjectStore) -> HashSet { + object_store + .list(None) + .await + .unwrap() + .try_concat() + .await + .unwrap() + .iter() + .map(|p| p.display()) + .collect() + } } From 5ed16ff294a2f41bedad5c0342e2fc2b9cbe5aee Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Wed, 26 May 2021 09:42:13 +0200 Subject: [PATCH 08/15] refactor: improve error message in `parquet_file::cleanup` --- parquet_file/src/cleanup.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/parquet_file/src/cleanup.rs b/parquet_file/src/cleanup.rs index ece8e29b8a..c7746ff675 100644 --- a/parquet_file/src/cleanup.rs +++ b/parquet_file/src/cleanup.rs @@ -11,12 +11,12 @@ use crate::{ }; #[derive(Debug, Snafu)] pub enum Error { - #[snafu(display("Error during store read operation: {}", source))] + #[snafu(display("Error from read operation while cleaning object store: {}", source))] ReadError { source: ::Error, }, - #[snafu(display("Error during store write operation: {}", source))] + #[snafu(display("Error from write operation while cleaning object store: {}", source))] WriteError { source: ::Error, }, From b55eae98da7ec64dca287d10e9d088574980bfd3 Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Wed, 26 May 2021 09:50:03 +0200 Subject: [PATCH 09/15] fix: do not delete non-parquet files during catalog-driven cleanup --- parquet_file/src/cleanup.rs | 56 ++++++++++++++++++++++++++++++------- 1 file changed, 46 insertions(+), 10 deletions(-) diff --git a/parquet_file/src/cleanup.rs b/parquet_file/src/cleanup.rs index c7746ff675..8cc2017647 100644 --- a/parquet_file/src/cleanup.rs +++ b/parquet_file/src/cleanup.rs @@ -1,7 +1,7 @@ //! Methods to cleanup the object store. use futures::TryStreamExt; -use object_store::{ObjectStore, ObjectStoreApi}; +use object_store::{path::parsed::DirsAndFileName, ObjectStore, ObjectStoreApi}; use observability_deps::tracing::info; use snafu::{ResultExt, Snafu}; @@ -41,7 +41,18 @@ where let mut files_removed = 0; while let Some(paths) = stream.try_next().await.context(ReadError)? 
{ for path in paths { - if !all_known.contains(&path.clone().into()) { + let path_parsed: DirsAndFileName = path.clone().into(); + + // only delete if all of the following conditions are met: + // - filename ends with `.parquet` + // - file is not tracked by the catalog + if path_parsed + .file_name + .as_ref() + .map(|part| part.encoded().ends_with(".parquet")) + .unwrap_or(false) + && !all_known.contains(&path_parsed) + { store.delete(&path).await.context(WriteError)?; files_removed += 1; } @@ -58,8 +69,9 @@ where mod tests { use std::{collections::HashSet, num::NonZeroU32, sync::Arc}; + use bytes::Bytes; use data_types::server_id::ServerId; - use object_store::path::ObjectStorePath; + use object_store::path::{parsed::DirsAndFileName, ObjectStorePath, Path}; use super::*; use crate::{ @@ -83,21 +95,31 @@ mod tests { .unwrap(); // create some data - let mut paths_tracked = vec![]; - let mut paths_untracked = vec![]; + let mut paths_keep = vec![]; + let mut paths_delete = vec![]; { let mut transaction = catalog.open_transaction().await; + // an ordinary tracked parquet file => keep let (path, md) = make_metadata(&object_store, "foo", 1).await; transaction.add_parquet(&path.clone().into(), &md).unwrap(); - paths_tracked.push(path.display()); + paths_keep.push(path.display()); + // another ordinary tracked parquet file => keep let (path, md) = make_metadata(&object_store, "foo", 2).await; transaction.add_parquet(&path.clone().into(), &md).unwrap(); - paths_tracked.push(path.display()); + paths_keep.push(path.display()); + // not a parquet file => keep + let mut path: DirsAndFileName = path.into(); + path.file_name = Some("foo.txt".into()); + let path = object_store.path_from_dirs_and_filename(path); + create_empty_file(&object_store, &path).await; + paths_keep.push(path.display()); + + // an untracked parquet file => delete let (path, _md) = make_metadata(&object_store, "foo", 3).await; - paths_untracked.push(path.display()); + paths_delete.push(path.display()); transaction.commit().await.unwrap(); } @@ -116,10 +138,10 @@ mod tests { .iter() .map(|p| p.display()) .collect(); - for p in paths_tracked { + for p in paths_keep { assert!(dbg!(&all_files).contains(dbg!(&p))); } - for p in paths_untracked { + for p in paths_delete { assert!(!dbg!(&all_files).contains(dbg!(&p))); } } @@ -127,4 +149,18 @@ mod tests { fn make_server_id() -> ServerId { ServerId::new(NonZeroU32::new(1).unwrap()) } + + async fn create_empty_file(object_store: &ObjectStore, path: &Path) { + let data = Bytes::default(); + let len = data.len(); + + object_store + .put( + &path, + futures::stream::once(async move { Ok(data) }), + Some(len), + ) + .await + .unwrap(); + } } From 18f5dd9ae15f046473db86f890832d7223c3038e Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Wed, 26 May 2021 09:59:46 +0200 Subject: [PATCH 10/15] test: ensure transaction lock exists during cleanup planning --- parquet_file/src/cleanup.rs | 64 ++++++++++++++++++++++++++++++------- 1 file changed, 53 insertions(+), 11 deletions(-) diff --git a/parquet_file/src/cleanup.rs b/parquet_file/src/cleanup.rs index 8cc2017647..a0702b401a 100644 --- a/parquet_file/src/cleanup.rs +++ b/parquet_file/src/cleanup.rs @@ -80,7 +80,7 @@ mod tests { }; #[tokio::test] - async fn test_cleanup() { + async fn test_cleanup_rules() { let object_store = make_object_store(); let server_id = make_server_id(); let db_name = "db1"; @@ -128,16 +128,7 @@ mod tests { cleanup_unreferenced_parquet_files(&catalog).await.unwrap(); // list all files - let all_files: HashSet<_> = 
object_store - .list(None) - .await - .unwrap() - .try_concat() - .await - .unwrap() - .iter() - .map(|p| p.display()) - .collect(); + let all_files = list_all_files(&object_store).await; for p in paths_keep { assert!(dbg!(&all_files).contains(dbg!(&p))); } @@ -146,6 +137,44 @@ mod tests { } } + #[tokio::test] + async fn test_cleanup_with_parallel_transaction() { + let object_store = make_object_store(); + let server_id = make_server_id(); + let db_name = "db1"; + + let catalog = PreservedCatalog::::new_empty( + Arc::clone(&object_store), + server_id, + db_name, + (), + ) + .await + .unwrap(); + + // try multiple times to provoke a conflict + for i in 0..100 { + let (path, _) = tokio::join!( + async { + let mut transaction = catalog.open_transaction().await; + + let (path, md) = make_metadata(&object_store, "foo", i).await; + transaction.add_parquet(&path.clone().into(), &md).unwrap(); + + transaction.commit().await.unwrap(); + + path.display() + }, + async { + cleanup_unreferenced_parquet_files(&catalog).await.unwrap(); + }, + ); + + let all_files = list_all_files(&object_store).await; + assert!(all_files.contains(&path)); + } + } + fn make_server_id() -> ServerId { ServerId::new(NonZeroU32::new(1).unwrap()) } @@ -163,4 +192,17 @@ mod tests { .await .unwrap(); } + + async fn list_all_files(object_store: &ObjectStore) -> HashSet { + object_store + .list(None) + .await + .unwrap() + .try_concat() + .await + .unwrap() + .iter() + .map(|p| p.display()) + .collect() + } } From d7e3bc569e9385c60fe9490bd92201f951b3af0d Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Wed, 26 May 2021 10:27:47 +0200 Subject: [PATCH 11/15] refactor: shorten time we hold the transaction lock during clean-up --- parquet_file/src/cleanup.rs | 59 ++++++++++++++++++++++++------------- 1 file changed, 38 insertions(+), 21 deletions(-) diff --git a/parquet_file/src/cleanup.rs b/parquet_file/src/cleanup.rs index a0702b401a..3a829e38cb 100644 --- a/parquet_file/src/cleanup.rs +++ b/parquet_file/src/cleanup.rs @@ -25,6 +25,8 @@ pub enum Error { pub type Result = std::result::Result; /// Delete all unreferenced parquet files. +/// +/// This will hold the transaction lock while the list of files is being gathered. pub async fn cleanup_unreferenced_parquet_files(catalog: &PreservedCatalog) -> Result<()> where S: CatalogState, @@ -37,30 +39,45 @@ where let store = catalog.object_store(); let prefix = data_location(&store, catalog.server_id(), catalog.db_name()); - let mut stream = store.list(Some(&prefix)).await.context(ReadError)?; - let mut files_removed = 0; - while let Some(paths) = stream.try_next().await.context(ReadError)? { - for path in paths { - let path_parsed: DirsAndFileName = path.clone().into(); + // gather a list of "files to remove" eagerly so we do not block transactions on the catalog for too long + let to_remove = store + .list(Some(&prefix)) + .await + .context(ReadError)? 
+ .map_ok(|paths| { + paths + .into_iter() + .filter(|path| { + let path_parsed: DirsAndFileName = path.clone().into(); - // only delete if all of the following conditions are met: - // - filename ends with `.parquet` - // - file is not tracked by the catalog - if path_parsed - .file_name - .as_ref() - .map(|part| part.encoded().ends_with(".parquet")) - .unwrap_or(false) - && !all_known.contains(&path_parsed) - { - store.delete(&path).await.context(WriteError)?; - files_removed += 1; - } - } + // only delete if all of the following conditions are met: + // - filename ends with `.parquet` + // - file is not tracked by the catalog + path_parsed + .file_name + .as_ref() + .map(|part| part.encoded().ends_with(".parquet")) + .unwrap_or(false) + && !all_known.contains(&path_parsed) + }) + .collect::>() + }) + .try_concat() + .await + .context(ReadError)?; + + // abort transaction cleanly to avoid warnings about uncommited transactions + transaction.abort(); + + // now that the transaction lock is dropped, perform the actual (and potentially slow) delete operation + let n_files = to_remove.len(); + info!("Found {} files to delete, start deletion.", n_files); + + for path in to_remove { + store.delete(&path).await.context(WriteError)?; } - transaction.abort(); - info!("Removed {} files during clean-up.", files_removed); + info!("Finished deletion, removed {} files.", n_files); Ok(()) } From 5983336366eb80113df6d1f90e42ac1c88c43fe1 Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Wed, 26 May 2021 10:34:09 +0200 Subject: [PATCH 12/15] refactor: rename `parquet_file::{utils => test_utils}` --- parquet_file/src/catalog.rs | 2 +- parquet_file/src/cleanup.rs | 2 +- parquet_file/src/lib.rs | 2 +- parquet_file/src/metadata.rs | 2 +- parquet_file/src/storage.rs | 2 +- parquet_file/src/storage_testing.rs | 5 ++++- parquet_file/src/{utils.rs => test_utils.rs} | 0 server/src/db.rs | 2 +- 8 files changed, 10 insertions(+), 7 deletions(-) rename parquet_file/src/{utils.rs => test_utils.rs} (100%) diff --git a/parquet_file/src/catalog.rs b/parquet_file/src/catalog.rs index 4be25e7ecf..e1eed14003 100644 --- a/parquet_file/src/catalog.rs +++ b/parquet_file/src/catalog.rs @@ -1087,7 +1087,7 @@ mod tests { use crate::{ metadata::{read_schema_from_parquet_metadata, read_statistics_from_parquet_metadata}, - utils::{make_metadata, make_object_store}, + test_utils::{make_metadata, make_object_store}, }; use object_store::parsed_path; diff --git a/parquet_file/src/cleanup.rs b/parquet_file/src/cleanup.rs index 3a829e38cb..10e01bb406 100644 --- a/parquet_file/src/cleanup.rs +++ b/parquet_file/src/cleanup.rs @@ -93,7 +93,7 @@ mod tests { use super::*; use crate::{ catalog::test_helpers::TestCatalogState, - utils::{make_metadata, make_object_store}, + test_utils::{make_metadata, make_object_store}, }; #[tokio::test] diff --git a/parquet_file/src/lib.rs b/parquet_file/src/lib.rs index e2969394f3..0e03ce2cbc 100644 --- a/parquet_file/src/lib.rs +++ b/parquet_file/src/lib.rs @@ -13,6 +13,6 @@ pub mod cleanup; pub mod metadata; pub mod storage; pub mod table; -pub mod utils; +pub mod test_utils; mod storage_testing; diff --git a/parquet_file/src/metadata.rs b/parquet_file/src/metadata.rs index 6cd32f999b..cfdd418e2a 100644 --- a/parquet_file/src/metadata.rs +++ b/parquet_file/src/metadata.rs @@ -497,7 +497,7 @@ mod tests { use internal_types::{schema::TIME_COLUMN_NAME, selection::Selection}; - use crate::utils::{ + use crate::test_utils::{ load_parquet_from_store, make_chunk, make_chunk_no_row_group, make_object_store, }; 
diff --git a/parquet_file/src/storage.rs b/parquet_file/src/storage.rs index e18acdaf36..484190c203 100644 --- a/parquet_file/src/storage.rs +++ b/parquet_file/src/storage.rs @@ -512,7 +512,7 @@ mod tests { use std::num::NonZeroU32; use super::*; - use crate::utils::{make_object_store, make_record_batch}; + use crate::test_utils::{make_object_store, make_record_batch}; use datafusion_util::MemoryStream; use object_store::parsed_path; use uuid::Uuid; diff --git a/parquet_file/src/storage_testing.rs b/parquet_file/src/storage_testing.rs index 002cdc8968..d5a19ea1b2 100644 --- a/parquet_file/src/storage_testing.rs +++ b/parquet_file/src/storage_testing.rs @@ -14,7 +14,10 @@ mod tests { read_parquet_metadata_from_file, read_schema_from_parquet_metadata, read_statistics_from_parquet_metadata, }, - utils::*, + test_utils::{ + load_parquet_from_store, make_chunk_given_record_batch, make_object_store, + make_record_batch, read_data_from_parquet_data, + }, }; #[tokio::test] diff --git a/parquet_file/src/utils.rs b/parquet_file/src/test_utils.rs similarity index 100% rename from parquet_file/src/utils.rs rename to parquet_file/src/test_utils.rs diff --git a/server/src/db.rs b/server/src/db.rs index ba932e3076..cafb651001 100644 --- a/server/src/db.rs +++ b/server/src/db.rs @@ -1309,7 +1309,7 @@ mod tests { }; use parquet_file::{ metadata::{read_parquet_metadata_from_file, read_schema_from_parquet_metadata}, - utils::{load_parquet_from_store_for_path, read_data_from_parquet_data}, + test_utils::{load_parquet_from_store_for_path, read_data_from_parquet_data}, }; use query::{frontend::sql::SqlQueryPlanner, PartitionChunk}; use std::{ From 1fb6af236436312a02f3c1133ddd6076f0d02dfd Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Wed, 26 May 2021 10:53:34 +0200 Subject: [PATCH 13/15] refactor: split DB background loop into lifecycle and cleanup This should prevent one from blocking / stalling the other. --- server/src/config.rs | 9 +++++- server/src/db.rs | 68 +++++++++++++++++++++++++++++--------------- 2 files changed, 53 insertions(+), 24 deletions(-) diff --git a/server/src/config.rs b/server/src/config.rs index 9d232246fc..d0abef68ff 100644 --- a/server/src/config.rs +++ b/server/src/config.rs @@ -395,7 +395,14 @@ mod test { config .db(&name) .expect("expected database") - .worker_iterations() + .worker_iterations_lifecycle() + > 0 + ); + assert!( + config + .db(&name) + .expect("expected database") + .worker_iterations_cleanup() > 0 ); diff --git a/server/src/db.rs b/server/src/db.rs index cafb651001..4788c819a9 100644 --- a/server/src/db.rs +++ b/server/src/db.rs @@ -53,7 +53,7 @@ use std::{ atomic::{AtomicUsize, Ordering}, Arc, }, - time::{Duration, Instant}, + time::Duration, }; use system_tables::{SystemSchemaProvider, SYSTEM_SCHEMA}; use tracker::{TaskTracker, TrackedFutureExt}; @@ -310,8 +310,11 @@ pub struct Db { /// Value is nanoseconds since the Unix Epoch. 
process_clock: process_clock::ProcessClock, - /// Number of iterations of the worker loop for this Db - worker_iterations: AtomicUsize, + /// Number of iterations of the worker lifecycle loop for this Db + worker_iterations_lifecycle: AtomicUsize, + + /// Number of iterations of the worker cleanup loop for this Db + worker_iterations_cleanup: AtomicUsize, /// Metric labels metric_labels: Vec, @@ -434,7 +437,8 @@ impl Db { metrics_registry, system_tables, process_clock, - worker_iterations: AtomicUsize::new(0), + worker_iterations_lifecycle: AtomicUsize::new(0), + worker_iterations_cleanup: AtomicUsize::new(0), metric_labels, } } @@ -905,9 +909,14 @@ impl Db { None } - /// Returns the number of iterations of the background worker loop - pub fn worker_iterations(&self) -> usize { - self.worker_iterations.load(Ordering::Relaxed) + /// Returns the number of iterations of the background worker lifecycle loop + pub fn worker_iterations_lifecycle(&self) -> usize { + self.worker_iterations_lifecycle.load(Ordering::Relaxed) + } + + /// Returns the number of iterations of the background worker lifecycle loop + pub fn worker_iterations_cleanup(&self) -> usize { + self.worker_iterations_cleanup.load(Ordering::Relaxed) } /// Background worker function @@ -917,24 +926,37 @@ impl Db { ) { info!("started background worker"); - let mut lifecycle_manager = LifecycleManager::new(Arc::clone(&self)); - let mut last_cleanup: Option = None; + tokio::join!( + // lifecycle manager loop + async { + let mut lifecycle_manager = LifecycleManager::new(Arc::clone(&self)); - while !shutdown.is_cancelled() { - self.worker_iterations.fetch_add(1, Ordering::Relaxed); - tokio::select! { - _ = async { - lifecycle_manager.check_for_work().await; - if last_cleanup.map(|d| d.elapsed() >= Duration::from_secs(500)).unwrap_or(true) { - if let Err(e) = cleanup_unreferenced_parquet_files(&self.catalog).await { - error!("error in background cleanup task: {:?}", e); - } - last_cleanup = Some(Instant::now()); + while !shutdown.is_cancelled() { + self.worker_iterations_lifecycle + .fetch_add(1, Ordering::Relaxed); + tokio::select! { + _ = lifecycle_manager.check_for_work() => {}, + _ = shutdown.cancelled() => break, } - } => {}, - _ = shutdown.cancelled() => break - } - } + } + }, + // object store cleanup loop + async { + while !shutdown.is_cancelled() { + self.worker_iterations_cleanup + .fetch_add(1, Ordering::Relaxed); + tokio::select! 
{ + _ = async { + if let Err(e) = cleanup_unreferenced_parquet_files(&self.catalog).await { + error!("error in background cleanup task: {:?}", e); + } + tokio::time::sleep(Duration::from_secs(500)).await; + } => {}, + _ = shutdown.cancelled() => break, + } + } + }, + ); info!("finished background worker"); } From 24ec1a472e4b03b7e2ac16d3432aef322bcc8f18 Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Wed, 26 May 2021 12:38:54 +0200 Subject: [PATCH 14/15] fix: do NOT delete parquet files that are reachable by time travel --- parquet_file/src/catalog.rs | 9 +---- parquet_file/src/cleanup.rs | 77 +++++++++++++++++++++++++++++++++++-- server/src/db.rs | 69 +++++++++++++++++---------------- 3 files changed, 110 insertions(+), 45 deletions(-) diff --git a/parquet_file/src/catalog.rs b/parquet_file/src/catalog.rs index e1eed14003..d463f874f5 100644 --- a/parquet_file/src/catalog.rs +++ b/parquet_file/src/catalog.rs @@ -2,7 +2,7 @@ use std::{ collections::{ hash_map::Entry::{Occupied, Vacant}, - HashMap, HashSet, + HashMap, }, fmt::{Debug, Display}, str::FromStr, @@ -207,9 +207,6 @@ pub trait CatalogState { /// Remove parquet file from state. fn remove(&self, path: DirsAndFileName) -> Result<()>; - - /// Get a set of all parquet file paths currently tracked by this state. - fn tracked_parquet_files(&self) -> HashSet; } /// Inner mutable part of the preserved catalog. @@ -1049,10 +1046,6 @@ pub mod test_helpers { Ok(()) } - - fn tracked_parquet_files(&self) -> HashSet { - self.inner.borrow().parquet_files.keys().cloned().collect() - } } /// Break preserved catalog by moving one of the transaction files into a weird unknown version. diff --git a/parquet_file/src/cleanup.rs b/parquet_file/src/cleanup.rs index 10e01bb406..f79844f44f 100644 --- a/parquet_file/src/cleanup.rs +++ b/parquet_file/src/cleanup.rs @@ -1,12 +1,18 @@ //! Methods to cleanup the object store. +use std::{ + collections::HashSet, + sync::{Arc, Mutex}, +}; + +use data_types::server_id::ServerId; use futures::TryStreamExt; use object_store::{path::parsed::DirsAndFileName, ObjectStore, ObjectStoreApi}; use observability_deps::tracing::info; use snafu::{ResultExt, Snafu}; use crate::{ - catalog::{CatalogState, PreservedCatalog}, + catalog::{CatalogParquetInfo, CatalogState, PreservedCatalog}, storage::data_location, }; #[derive(Debug, Snafu)] @@ -20,6 +26,9 @@ pub enum Error { WriteError { source: ::Error, }, + + #[snafu(display("Error from catalog loading while cleaning object store: {}", source))] + CatalogLoadError { source: crate::catalog::Error }, } pub type Result = std::result::Result; @@ -35,9 +44,29 @@ where // that are about to get added to the catalog. let transaction = catalog.open_transaction().await; - let all_known = catalog.state().tracked_parquet_files(); let store = catalog.object_store(); - let prefix = data_location(&store, catalog.server_id(), catalog.db_name()); + let server_id = catalog.server_id(); + let db_name = catalog.db_name(); + let all_known = { + // replay catalog transactions to track ALL (even dropped) files that are referenced + let catalog = PreservedCatalog::::load( + Arc::clone(&store), + server_id, + db_name.to_string(), + (), + ) + .await + .context(CatalogLoadError)? 
+ .expect("catalog gone while reading it?"); + catalog + .state() + .files + .lock() + .expect("lock poissened?") + .clone() + }; + + let prefix = data_location(&store, server_id, db_name); // gather a list of "files to remove" eagerly so we do not block transactions on the catalog for too long let to_remove = store @@ -82,6 +111,45 @@ where Ok(()) } +/// Catalog state that traces all used parquet files. +struct TracerCatalogState { + files: Mutex>, +} + +impl CatalogState for TracerCatalogState { + type EmptyInput = (); + + fn new_empty(_data: Self::EmptyInput) -> Self { + Self { + files: Default::default(), + } + } + + fn clone_or_keep(origin: &Arc) -> Arc { + // no copy + Arc::clone(origin) + } + + fn add( + &self, + _object_store: Arc, + _server_id: ServerId, + _db_name: &str, + info: CatalogParquetInfo, + ) -> crate::catalog::Result<()> { + self.files + .lock() + .expect("lock poissened?") + .insert(info.path); + Ok(()) + } + + fn remove(&self, _path: DirsAndFileName) -> crate::catalog::Result<()> { + // Do NOT remove the file since we still need it for time travel + Ok(()) + } +} + #[cfg(test)] mod tests { use std::{collections::HashSet, num::NonZeroU32, sync::Arc}; @@ -122,9 +190,10 @@ mod tests { transaction.add_parquet(&path.clone().into(), &md).unwrap(); paths_keep.push(path.display()); - // another ordinary tracked parquet file => keep + // another ordinary tracked parquet file that was added and removed => keep (for time travel) let (path, md) = make_metadata(&object_store, "foo", 2).await; transaction.add_parquet(&path.clone().into(), &md).unwrap(); + transaction.remove_parquet(&path.clone().into()).unwrap(); paths_keep.push(path.display()); // not a parquet file => keep diff --git a/server/src/db.rs b/server/src/db.rs index 4788c819a9..de727fa97a 100644 --- a/server/src/db.rs +++ b/server/src/db.rs @@ -1,8 +1,6 @@ //! This module contains the main IOx Database object which has the //! 
instances of the mutable buffer, read buffer, and object store -use crate::db::catalog::chunk::ChunkState; - use super::{ buffer::{self, Buffer}, JobRegistry, @@ -47,7 +45,6 @@ use read_buffer::{Chunk as ReadBufferChunk, ChunkMetrics as ReadBufferChunkMetri use snafu::{ResultExt, Snafu}; use std::{ any::Any, - collections::HashSet, num::NonZeroUsize, sync::{ atomic::{AtomicUsize, Ordering}, @@ -1249,25 +1246,6 @@ impl CatalogState for Catalog { fn remove(&self, _path: DirsAndFileName) -> parquet_file::catalog::Result<()> { unimplemented!("parquet files cannot be removed from the catalog for now") } - - fn tracked_parquet_files(&self) -> HashSet { - let mut files = HashSet::new(); - - for part in self.partitions() { - let part_guard = part.read(); - for chunk in part_guard.chunks() { - let chunk_guard = chunk.read(); - if let ChunkState::WrittenToObjectStore(_, pq_chunk) - | ChunkState::ObjectStoreOnly(pq_chunk) = chunk_guard.state() - { - let path: DirsAndFileName = pq_chunk.table_path().into(); - files.insert(path); - } - } - } - - files - } } pub mod test_helpers { @@ -1316,6 +1294,7 @@ mod tests { use ::test_helpers::assert_contains; use arrow::record_batch::RecordBatch; use arrow_util::{assert_batches_eq, assert_batches_sorted_eq}; + use bytes::Bytes; use chrono::Utc; use data_types::{ chunk_metadata::ChunkStorage, @@ -1326,7 +1305,7 @@ mod tests { use futures::{stream, StreamExt, TryStreamExt}; use object_store::{ memory::InMemory, - path::{ObjectStorePath, Path}, + path::{parts::PathPart, ObjectStorePath, Path}, ObjectStore, ObjectStoreApi, }; use parquet_file::{ @@ -2998,8 +2977,8 @@ mod tests { // create the following chunks: // 0: ReadBuffer + Parquet // 1: only Parquet - // 2: dropped (not in catalog but parquet file still present) - let mut paths = vec![]; + // 2: dropped (not in current catalog but parquet file still present for time travel) + let mut paths_keep = vec![]; for i in 0..3i8 { let (partition_key, table_name, chunk_id) = create_parquet_chunk(db.as_ref()).await; let chunk = { @@ -3010,7 +2989,7 @@ mod tests { }; let chunk = chunk.read(); if let ChunkState::WrittenToObjectStore(_, chunk) = chunk.state() { - paths.push(chunk.table_path().display()); + paths_keep.push(chunk.table_path()); } else { panic!("Wrong chunk state."); } @@ -3029,11 +3008,20 @@ mod tests { } } + // ==================== do: create garbage ==================== + let mut path: DirsAndFileName = paths_keep[0].clone().into(); + path.file_name = Some(PathPart::from( + format!("prefix_{}", path.file_name.unwrap().encoded()).as_ref(), + )); + let path_delete = object_store.path_from_dirs_and_filename(path); + create_empty_file(&object_store, &path_delete).await; + let path_delete = path_delete.display(); + // ==================== check: all files are there ==================== let all_files = get_object_store_files(&object_store).await; - assert!(all_files.contains(&paths[0])); - assert!(all_files.contains(&paths[1])); - assert!(all_files.contains(&paths[2])); + for path in &paths_keep { + assert!(all_files.contains(&path.display())); + } // ==================== do: start background task loop ==================== let shutdown: CancellationToken = Default::default(); @@ -3046,7 +3034,7 @@ mod tests { let t_0 = Instant::now(); loop { let all_files = get_object_store_files(&object_store).await; - if !all_files.contains(&paths[2]) { + if !all_files.contains(&path_delete) { break; } assert!(t_0.elapsed() < Duration::from_secs(10)); @@ -3059,9 +3047,10 @@ mod tests { // ==================== check: some files 
are there ==================== let all_files = get_object_store_files(&object_store).await; - assert!(all_files.contains(&paths[0])); - assert!(all_files.contains(&paths[1])); - assert!(!all_files.contains(&paths[2])); + assert!(!all_files.contains(&path_delete)); + for path in &paths_keep { + assert!(all_files.contains(&path.display())); + } } async fn create_parquet_chunk(db: &Db) -> (String, String, u32) { @@ -3103,4 +3092,18 @@ mod tests { .map(|p| p.display()) .collect() } + + async fn create_empty_file(object_store: &ObjectStore, path: &Path) { + let data = Bytes::default(); + let len = data.len(); + + object_store + .put( + &path, + futures::stream::once(async move { Ok(data) }), + Some(len), + ) + .await + .unwrap(); + } } From 9f451423d5e8c8faedf36bd6b8624b7fdca10dbe Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Wed, 26 May 2021 12:49:44 +0200 Subject: [PATCH 15/15] feat: log files that are deleted --- parquet_file/src/cleanup.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/parquet_file/src/cleanup.rs b/parquet_file/src/cleanup.rs index f79844f44f..3655189148 100644 --- a/parquet_file/src/cleanup.rs +++ b/parquet_file/src/cleanup.rs @@ -7,7 +7,10 @@ use std::{ use data_types::server_id::ServerId; use futures::TryStreamExt; -use object_store::{path::parsed::DirsAndFileName, ObjectStore, ObjectStoreApi}; +use object_store::{ + path::{parsed::DirsAndFileName, ObjectStorePath}, + ObjectStore, ObjectStoreApi, +}; use observability_deps::tracing::info; use snafu::{ResultExt, Snafu}; @@ -103,6 +106,7 @@ where info!("Found {} files to delete, start deletion.", n_files); for path in to_remove { + info!("Delete file: {}", path.display()); store.delete(&path).await.context(WriteError)?; }
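
Usage note (editorial sketch, not part of the series): the loop below shows how the cleanup routine introduced in patch 06 is intended to be driven, following the background-worker wiring of patches 07 and 13. The 500-second period, the error log message, and the shutdown handling are taken from the patches themselves; the free-standing `cleanup_loop` helper, its name, and its trait bounds are assumptions made purely for illustration.

```rust
use std::{sync::Arc, time::Duration};

use observability_deps::tracing::error;
use parquet_file::{catalog::PreservedCatalog, cleanup::cleanup_unreferenced_parquet_files};
use tokio_util::sync::CancellationToken;

/// Illustrative only: periodically delete unreferenced parquet files until
/// shutdown is requested, mirroring `Db::background_worker` (patch 13).
/// The bound on `S` is an assumption; the real catalog type may need more.
async fn cleanup_loop<S>(catalog: Arc<PreservedCatalog<S>>, shutdown: CancellationToken)
where
    S: parquet_file::catalog::CatalogState,
{
    while !shutdown.is_cancelled() {
        tokio::select! {
            _ = async {
                // the routine only holds the catalog transaction lock while it
                // computes the delete list (patches 11/14), not while deleting
                if let Err(e) = cleanup_unreferenced_parquet_files(&catalog).await {
                    error!("error in background cleanup task: {:?}", e);
                }
                // sleep period taken from patch 13
                tokio::time::sleep(Duration::from_secs(500)).await;
            } => {},
            _ = shutdown.cancelled() => break,
        }
    }
}
```

Inside `Db` the equivalent logic runs as the second branch of the `tokio::join!` in `background_worker` (patch 13), so lifecycle management and object-store cleanup cannot stall each other.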