Merge pull request #1554 from influxdata/crepererum/issue1313

feat: add background task to clean up OS
pull/24376/head
kodiakhq[bot] 2021-05-26 14:51:29 +00:00 committed by GitHub
commit 0c6277c2e9
9 changed files with 634 additions and 111 deletions

View File

@ -429,6 +429,21 @@ where
.map(|tkey| tkey.revision_counter)
.expect("catalog should have at least an empty transaction")
}
/// Object store used by this catalog.
pub fn object_store(&self) -> Arc<ObjectStore> {
Arc::clone(&self.object_store)
}
/// Server ID used by this catalog.
pub fn server_id(&self) -> ServerId {
self.server_id
}
/// Database name used by this catalog.
pub fn db_name(&self) -> &str {
&self.db_name
}
}
impl<S> Debug for PreservedCatalog<S>
@ -896,6 +911,11 @@ where
Ok(())
}
/// Abort transaction w/o commit.
pub fn abort(mut self) {
self.transaction = None;
}
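For orientation, a minimal sketch of how the new abort path is meant to be used; it mirrors the cleanup code added later in this PR, which opens a transaction purely to serialize against concurrent catalog changes and then drops it without committing:

// Hold the transaction lock while inspecting state, then release it cleanly.
let transaction = catalog.open_transaction().await;
// ... read catalog / object store state under the lock ...
transaction.abort(); // consumes the handle; nothing is written to the catalog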
/// Add a new parquet file to the catalog.
///
/// If a file with the same path already exists an error will be returned.
@ -1055,15 +1075,12 @@ pub mod test_helpers {
}
#[cfg(test)]
pub mod tests {
mod tests {
use std::{num::NonZeroU32, ops::Deref};
use crate::{
metadata::{
read_parquet_metadata_from_file, read_schema_from_parquet_metadata,
read_statistics_from_parquet_metadata,
},
utils::{load_parquet_from_store, make_chunk, make_object_store},
metadata::{read_schema_from_parquet_metadata, read_statistics_from_parquet_metadata},
test_utils::{make_metadata, make_object_store},
};
use object_store::parsed_path;
@ -1681,8 +1698,8 @@ pub mod tests {
.unwrap();
// get some test metadata
let metadata1 = make_metadata(object_store, "foo").await;
let metadata2 = make_metadata(object_store, "bar").await;
let (_, metadata1) = make_metadata(object_store, "foo", 1).await;
let (_, metadata2) = make_metadata(object_store, "bar", 1).await;
// track all the intermediate results
let mut trace = TestTrace::new();
@ -1992,16 +2009,4 @@ pub mod tests {
&get_catalog_parquet_files(trace.states.last().unwrap()),
);
}
/// Create test metadata. See [`make_chunk`] for details.
async fn make_metadata(
object_store: &Arc<ObjectStore>,
column_prefix: &str,
) -> ParquetMetaData {
let chunk = make_chunk(Arc::clone(object_store), column_prefix).await;
let (_, parquet_data) = load_parquet_from_store(&chunk, Arc::clone(object_store))
.await
.unwrap();
read_parquet_metadata_from_file(parquet_data).unwrap()
}
}

parquet_file/src/cleanup.rs (new file, +298 lines)
View File

@ -0,0 +1,298 @@
//! Methods to cleanup the object store.
use std::{
collections::HashSet,
sync::{Arc, Mutex},
};
use data_types::server_id::ServerId;
use futures::TryStreamExt;
use object_store::{
path::{parsed::DirsAndFileName, ObjectStorePath},
ObjectStore, ObjectStoreApi,
};
use observability_deps::tracing::info;
use snafu::{ResultExt, Snafu};
use crate::{
catalog::{CatalogParquetInfo, CatalogState, PreservedCatalog},
storage::data_location,
};
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Error from read operation while cleaning object store: {}", source))]
ReadError {
source: <ObjectStore as ObjectStoreApi>::Error,
},
#[snafu(display("Error from write operation while cleaning object store: {}", source))]
WriteError {
source: <ObjectStore as ObjectStoreApi>::Error,
},
#[snafu(display("Error from catalog loading while cleaning object store: {}", source))]
CatalogLoadError { source: crate::catalog::Error },
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Delete all unreferenced parquet files.
///
/// This will hold the transaction lock while the list of files is being gathered.
pub async fn cleanup_unreferenced_parquet_files<S>(catalog: &PreservedCatalog<S>) -> Result<()>
where
S: CatalogState,
{
// Create a transaction to prevent parallel modifications of the catalog. This prevents us from
// deleting files that are about to be added to the catalog.
let transaction = catalog.open_transaction().await;
let store = catalog.object_store();
let server_id = catalog.server_id();
let db_name = catalog.db_name();
let all_known = {
// replay catalog transactions to track ALL (even dropped) files that are referenced
let catalog = PreservedCatalog::<TracerCatalogState>::load(
Arc::clone(&store),
server_id,
db_name.to_string(),
(),
)
.await
.context(CatalogLoadError)?
.expect("catalog gone while reading it?");
catalog
.state()
.files
.lock()
.expect("lock poissened?")
.clone()
};
let prefix = data_location(&store, server_id, db_name);
// gather a list of "files to remove" eagerly so we do not block transactions on the catalog for too long
let to_remove = store
.list(Some(&prefix))
.await
.context(ReadError)?
.map_ok(|paths| {
paths
.into_iter()
.filter(|path| {
let path_parsed: DirsAndFileName = path.clone().into();
// only delete if all of the following conditions are met:
// - filename ends with `.parquet`
// - file is not tracked by the catalog
path_parsed
.file_name
.as_ref()
.map(|part| part.encoded().ends_with(".parquet"))
.unwrap_or(false)
&& !all_known.contains(&path_parsed)
})
.collect::<Vec<_>>()
})
.try_concat()
.await
.context(ReadError)?;
// abort transaction cleanly to avoid warnings about uncommitted transactions
transaction.abort();
// now that the transaction lock is dropped, perform the actual (and potentially slow) delete operation
let n_files = to_remove.len();
info!("Found {} files to delete, start deletion.", n_files);
for path in to_remove {
info!("Delete file: {}", path.display());
store.delete(&path).await.context(WriteError)?;
}
info!("Finished deletion, removed {} files.", n_files);
Ok(())
}
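As a usage sketch of the new entry point (assuming the crate's public `TestCatalogState` test helper and `make_object_store`; the constructor arguments mirror the tests below):

use std::{num::NonZeroU32, sync::Arc};

use data_types::server_id::ServerId;
use parquet_file::{
    catalog::{test_helpers::TestCatalogState, PreservedCatalog},
    cleanup::cleanup_unreferenced_parquet_files,
    test_utils::make_object_store,
};

async fn cleanup_once() {
    let object_store = make_object_store();
    let server_id = ServerId::new(NonZeroU32::new(1).unwrap());

    // Create (or load) a preserved catalog for the database ...
    let catalog = PreservedCatalog::<TestCatalogState>::new_empty(
        Arc::clone(&object_store),
        server_id,
        "db1",
        (),
    )
    .await
    .unwrap();

    // ... and delete every `.parquet` file under <server_id>/<db_name>/data that no
    // catalog transaction (including dropped chunks) ever referenced.
    cleanup_unreferenced_parquet_files(&catalog).await.unwrap();
}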
/// Catalog state that traces all used parquet files.
struct TracerCatalogState {
files: Mutex<HashSet<DirsAndFileName>>,
}
impl CatalogState for TracerCatalogState {
type EmptyInput = ();
fn new_empty(_data: Self::EmptyInput) -> Self {
Self {
files: Default::default(),
}
}
fn clone_or_keep(origin: &Arc<Self>) -> Arc<Self> {
// no copy
Arc::clone(origin)
}
fn add(
&self,
_object_store: Arc<ObjectStore>,
_server_id: ServerId,
_db_name: &str,
info: CatalogParquetInfo,
) -> crate::catalog::Result<()> {
self.files
.lock()
.expect("lock poissened?")
.insert(info.path);
Ok(())
}
fn remove(&self, _path: DirsAndFileName) -> crate::catalog::Result<()> {
// Do NOT remove the file since we still need it for time travel
Ok(())
}
}
#[cfg(test)]
mod tests {
use std::{collections::HashSet, num::NonZeroU32, sync::Arc};
use bytes::Bytes;
use data_types::server_id::ServerId;
use object_store::path::{parsed::DirsAndFileName, ObjectStorePath, Path};
use super::*;
use crate::{
catalog::test_helpers::TestCatalogState,
test_utils::{make_metadata, make_object_store},
};
#[tokio::test]
async fn test_cleanup_rules() {
let object_store = make_object_store();
let server_id = make_server_id();
let db_name = "db1";
let catalog = PreservedCatalog::<TestCatalogState>::new_empty(
Arc::clone(&object_store),
server_id,
db_name,
(),
)
.await
.unwrap();
// create some data
let mut paths_keep = vec![];
let mut paths_delete = vec![];
{
let mut transaction = catalog.open_transaction().await;
// an ordinary tracked parquet file => keep
let (path, md) = make_metadata(&object_store, "foo", 1).await;
transaction.add_parquet(&path.clone().into(), &md).unwrap();
paths_keep.push(path.display());
// another ordinary tracked parquet file that was added and removed => keep (for time travel)
let (path, md) = make_metadata(&object_store, "foo", 2).await;
transaction.add_parquet(&path.clone().into(), &md).unwrap();
transaction.remove_parquet(&path.clone().into()).unwrap();
paths_keep.push(path.display());
// not a parquet file => keep
let mut path: DirsAndFileName = path.into();
path.file_name = Some("foo.txt".into());
let path = object_store.path_from_dirs_and_filename(path);
create_empty_file(&object_store, &path).await;
paths_keep.push(path.display());
// an untracked parquet file => delete
let (path, _md) = make_metadata(&object_store, "foo", 3).await;
paths_delete.push(path.display());
transaction.commit().await.unwrap();
}
// run clean-up
cleanup_unreferenced_parquet_files(&catalog).await.unwrap();
// list all files
let all_files = list_all_files(&object_store).await;
for p in paths_keep {
assert!(dbg!(&all_files).contains(dbg!(&p)));
}
for p in paths_delete {
assert!(!dbg!(&all_files).contains(dbg!(&p)));
}
}
#[tokio::test]
async fn test_cleanup_with_parallel_transaction() {
let object_store = make_object_store();
let server_id = make_server_id();
let db_name = "db1";
let catalog = PreservedCatalog::<TestCatalogState>::new_empty(
Arc::clone(&object_store),
server_id,
db_name,
(),
)
.await
.unwrap();
// try multiple times to provoke a conflict
for i in 0..100 {
let (path, _) = tokio::join!(
async {
let mut transaction = catalog.open_transaction().await;
let (path, md) = make_metadata(&object_store, "foo", i).await;
transaction.add_parquet(&path.clone().into(), &md).unwrap();
transaction.commit().await.unwrap();
path.display()
},
async {
cleanup_unreferenced_parquet_files(&catalog).await.unwrap();
},
);
let all_files = list_all_files(&object_store).await;
assert!(all_files.contains(&path));
}
}
fn make_server_id() -> ServerId {
ServerId::new(NonZeroU32::new(1).unwrap())
}
async fn create_empty_file(object_store: &ObjectStore, path: &Path) {
let data = Bytes::default();
let len = data.len();
object_store
.put(
&path,
futures::stream::once(async move { Ok(data) }),
Some(len),
)
.await
.unwrap();
}
async fn list_all_files(object_store: &ObjectStore) -> HashSet<String> {
object_store
.list(None)
.await
.unwrap()
.try_concat()
.await
.unwrap()
.iter()
.map(|p| p.display())
.collect()
}
}

View File

@ -9,9 +9,10 @@
pub mod catalog;
pub mod chunk;
pub mod cleanup;
pub mod metadata;
pub mod storage;
pub mod table;
pub mod utils;
pub mod test_utils;
mod storage_testing;

View File

@ -497,7 +497,7 @@ mod tests {
use internal_types::{schema::TIME_COLUMN_NAME, selection::Selection};
use crate::utils::{
use crate::test_utils::{
load_parquet_from_store, make_chunk, make_chunk_no_row_group, make_object_store,
};
@ -505,7 +505,7 @@ mod tests {
async fn test_restore_from_file() {
// setup: preserve chunk to object store
let store = make_object_store();
let chunk = make_chunk(Arc::clone(&store), "foo").await;
let chunk = make_chunk(Arc::clone(&store), "foo", 1).await;
let (table, parquet_data) = load_parquet_from_store(&chunk, store).await.unwrap();
let parquet_metadata = read_parquet_metadata_from_file(parquet_data).unwrap();
@ -526,7 +526,7 @@ mod tests {
async fn test_restore_from_thrift() {
// setup: write chunk to object store and only keep thrift-encoded metadata
let store = make_object_store();
let chunk = make_chunk(Arc::clone(&store), "foo").await;
let chunk = make_chunk(Arc::clone(&store), "foo", 1).await;
let (table, parquet_data) = load_parquet_from_store(&chunk, store).await.unwrap();
let parquet_metadata = read_parquet_metadata_from_file(parquet_data).unwrap();
let data = parquet_metadata_to_thrift(&parquet_metadata).unwrap();
@ -549,7 +549,7 @@ mod tests {
async fn test_restore_from_file_no_row_group() {
// setup: preserve chunk to object store
let store = make_object_store();
let chunk = make_chunk_no_row_group(Arc::clone(&store), "foo").await;
let chunk = make_chunk_no_row_group(Arc::clone(&store), "foo", 1).await;
let (table, parquet_data) = load_parquet_from_store(&chunk, store).await.unwrap();
let parquet_metadata = read_parquet_metadata_from_file(parquet_data).unwrap();
@ -570,7 +570,7 @@ mod tests {
async fn test_restore_from_thrift_no_row_group() {
// setup: write chunk to object store and only keep thrift-encoded metadata
let store = make_object_store();
let chunk = make_chunk_no_row_group(Arc::clone(&store), "foo").await;
let chunk = make_chunk_no_row_group(Arc::clone(&store), "foo", 1).await;
let (table, parquet_data) = load_parquet_from_store(&chunk, store).await.unwrap();
let parquet_metadata = read_parquet_metadata_from_file(parquet_data).unwrap();
let data = parquet_metadata_to_thrift(&parquet_metadata).unwrap();
@ -592,7 +592,7 @@ mod tests {
#[tokio::test]
async fn test_make_chunk() {
let store = make_object_store();
let chunk = make_chunk(Arc::clone(&store), "foo").await;
let chunk = make_chunk(Arc::clone(&store), "foo", 1).await;
let (_, parquet_data) = load_parquet_from_store(&chunk, store).await.unwrap();
let parquet_metadata = read_parquet_metadata_from_file(parquet_data).unwrap();
@ -630,7 +630,7 @@ mod tests {
#[tokio::test]
async fn test_make_chunk_no_row_group() {
let store = make_object_store();
let chunk = make_chunk_no_row_group(Arc::clone(&store), "foo").await;
let chunk = make_chunk_no_row_group(Arc::clone(&store), "foo", 1).await;
let (_, parquet_data) = load_parquet_from_store(&chunk, store).await.unwrap();
let parquet_metadata = read_parquet_metadata_from_file(parquet_data).unwrap();

View File

@ -162,13 +162,10 @@ impl Storage {
table_name: String,
) -> object_store::path::Path {
// Full path of the file in object store
// <writer id>/<database>/data/<partition key>/<chunk id>/<table
// <server id>/<database>/data/<partition key>/<chunk id>/<table
// name>.parquet
let mut path = self.object_store.new_path();
path.push_dir(self.server_id.to_string());
path.push_dir(self.db_name.clone());
path.push_dir("data");
let mut path = data_location(&self.object_store, self.server_id, &self.db_name);
path.push_dir(partition_key);
path.push_dir(chunk_id.to_string());
let file_name = format!("{}.parquet", table_name);
@ -491,12 +488,31 @@ impl TryClone for MemWriter {
}
}
/// Location where parquet data is stored.
///
/// Schema currently is:
///
/// ```text
/// <server_id>/<db_name>/data
/// ```
pub(crate) fn data_location(
object_store: &ObjectStore,
server_id: ServerId,
db_name: &str,
) -> Path {
let mut path = object_store.new_path();
path.push_dir(server_id.to_string());
path.push_dir(db_name.to_string());
path.push_dir("data");
path
}
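To make the shared prefix concrete, here is a small crate-internal sketch (assuming `ServerId` is in scope in `storage.rs`, as the `data_location` signature suggests; the exact `display()` rendering is an assumption about the in-memory store):

#[test]
fn data_location_prefix_sketch() {
    use std::num::NonZeroU32;

    use crate::test_utils::make_object_store;

    let object_store = make_object_store();
    let server_id = ServerId::new(NonZeroU32::new(42).unwrap());

    // Build the common data prefix used by both Storage::location and the cleanup task.
    let prefix = data_location(&object_store, server_id, "mydb");

    // Expected shape: <server_id>/<db_name>/data (rendering may vary by store backend).
    println!("{}", prefix.display());
}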
#[cfg(test)]
mod tests {
use std::num::NonZeroU32;
use super::*;
use crate::utils::{make_object_store, make_record_batch};
use crate::test_utils::{make_object_store, make_record_batch};
use datafusion_util::MemoryStream;
use object_store::parsed_path;
use uuid::Uuid;

View File

@ -14,7 +14,10 @@ mod tests {
read_parquet_metadata_from_file, read_schema_from_parquet_metadata,
read_statistics_from_parquet_metadata,
},
utils::*,
test_utils::{
load_parquet_from_store, make_chunk_given_record_batch, make_object_store,
make_record_batch, read_data_from_parquet_data,
},
};
#[tokio::test]
@ -22,6 +25,7 @@ mod tests {
////////////////////
// Create test data which is also the expected data
let table = "table1";
let chunk_id = 1;
let (record_batches, schema, column_summaries, num_rows) = make_record_batch("foo");
let mut table_summary = TableSummary::new(table);
table_summary.columns = column_summaries.clone();
@ -41,6 +45,7 @@ mod tests {
schema.clone(),
table,
column_summaries.clone(),
chunk_id,
)
.await;

View File

@ -23,11 +23,17 @@ use internal_types::{
use object_store::{memory::InMemory, path::Path, ObjectStore, ObjectStoreApi};
use parquet::{
arrow::{ArrowReader, ParquetFileArrowReader},
file::serialized_reader::{SerializedFileReader, SliceableCursor},
file::{
metadata::ParquetMetaData,
serialized_reader::{SerializedFileReader, SliceableCursor},
},
};
use uuid::Uuid;
use crate::{chunk::ChunkMetrics, metadata::IoxMetadata};
use crate::{
chunk::ChunkMetrics,
metadata::{read_parquet_metadata_from_file, IoxMetadata},
};
use crate::{
chunk::{self, Chunk},
storage::Storage,
@ -84,46 +90,45 @@ pub async fn load_parquet_from_store_for_path(
Ok(parquet_data)
}
/// Create a test chunk by writing data to object store; see [`make_record_batch`] for the data content.
pub async fn make_chunk(store: Arc<ObjectStore>, column_prefix: &str, chunk_id: u32) -> Chunk {
let (record_batches, schema, column_summaries, _num_rows) = make_record_batch(column_prefix);
make_chunk_given_record_batch(
store,
record_batches,
schema,
"table1",
column_summaries,
chunk_id,
)
.await
}
/// Same as [`make_chunk`] but parquet file does not contain any row group.
pub async fn make_chunk_no_row_group(
store: Arc<ObjectStore>,
column_prefix: &str,
chunk_id: u32,
) -> Chunk {
let (_, schema, column_summaries, _num_rows) = make_record_batch(column_prefix);
make_chunk_given_record_batch(store, vec![], schema, "table1", column_summaries, chunk_id).await
}
/// Create a test chunk by writing data to object store.
///
/// See [`make_record_batch`] for the data content.
/// TODO: This code creates a chunk that isn't hooked up with metrics
pub async fn make_chunk_given_record_batch(
store: Arc<ObjectStore>,
record_batches: Vec<RecordBatch>,
schema: Schema,
table: &str,
column_summaries: Vec<ColumnSummary>,
) -> Chunk {
make_chunk_common(store, record_batches, schema, table, column_summaries).await
}
/// Same as [`make_chunk`] but parquet file does not contain any row group.
pub async fn make_chunk(store: Arc<ObjectStore>, column_prefix: &str) -> Chunk {
let (record_batches, schema, column_summaries, _num_rows) = make_record_batch(column_prefix);
make_chunk_common(store, record_batches, schema, "table1", column_summaries).await
}
/// Same as [`make_chunk`] but parquet file does not contain any row group.
pub async fn make_chunk_no_row_group(store: Arc<ObjectStore>, column_prefix: &str) -> Chunk {
let (_, schema, column_summaries, _num_rows) = make_record_batch(column_prefix);
make_chunk_common(store, vec![], schema, "table1", column_summaries).await
}
/// Common code for all [`make_chunk`] and [`make_chunk_no_row_group`].
///
/// TODO: This code creates a chunk that isn't hooked up with metrics
async fn make_chunk_common(
store: Arc<ObjectStore>,
record_batches: Vec<RecordBatch>,
schema: Schema,
table: &str,
column_summaries: Vec<ColumnSummary>,
chunk_id: u32,
) -> Chunk {
let server_id = ServerId::new(NonZeroU32::new(1).unwrap());
let db_name = "db1";
let part_key = "part1";
let table_name = table;
let chunk_id = 1;
let storage = Storage::new(Arc::clone(&store), server_id, db_name.to_string());
@ -556,3 +561,21 @@ pub fn read_data_from_parquet_data(schema: SchemaRef, parquet_data: Vec<u8>) ->
record_batches
}
/// Create test metadata by creating a parquet file and reading it back into memory.
///
/// See [`make_chunk`] for details.
pub async fn make_metadata(
object_store: &Arc<ObjectStore>,
column_prefix: &str,
chunk_id: u32,
) -> (Path, ParquetMetaData) {
let chunk = make_chunk(Arc::clone(object_store), column_prefix, chunk_id).await;
let (_, parquet_data) = load_parquet_from_store(&chunk, Arc::clone(object_store))
.await
.unwrap();
(
chunk.table_path(),
read_parquet_metadata_from_file(parquet_data).unwrap(),
)
}
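A short usage sketch of the new helper, following the pattern the catalog and cleanup tests in this PR use (the test name is illustrative):

#[tokio::test]
async fn make_metadata_sketch() {
    // Write a small test chunk (column prefix "foo", chunk id 1) and read its parquet
    // metadata back; the returned path is the file's location in the object store.
    let object_store = make_object_store();
    let (path, md) = make_metadata(&object_store, "foo", 1).await;

    // `path` and `md` are exactly what catalog transactions expect, e.g.:
    //     transaction.add_parquet(&path.clone().into(), &md).unwrap();
    assert!(path.display().ends_with(".parquet"));
}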

View File

@ -395,7 +395,14 @@ mod test {
config
.db(&name)
.expect("expected database")
.worker_iterations()
.worker_iterations_lifecycle()
> 0
);
assert!(
config
.db(&name)
.expect("expected database")
.worker_iterations_cleanup()
> 0
);

View File

@ -33,6 +33,7 @@ use parking_lot::{Mutex, RwLock};
use parquet_file::{
catalog::{CatalogParquetInfo, CatalogState, PreservedCatalog},
chunk::{Chunk as ParquetChunk, ChunkMetrics as ParquetChunkMetrics},
cleanup::cleanup_unreferenced_parquet_files,
metadata::{
read_schema_from_parquet_metadata, read_statistics_from_parquet_metadata, IoxMetadata,
},
@ -49,6 +50,7 @@ use std::{
atomic::{AtomicUsize, Ordering},
Arc,
},
time::Duration,
};
use system_tables::{SystemSchemaProvider, SYSTEM_SCHEMA};
use tracker::{TaskTracker, TrackedFutureExt};
@ -305,8 +307,11 @@ pub struct Db {
/// Value is nanoseconds since the Unix Epoch.
process_clock: process_clock::ProcessClock,
/// Number of iterations of the worker loop for this Db
worker_iterations: AtomicUsize,
/// Number of iterations of the worker lifecycle loop for this Db
worker_iterations_lifecycle: AtomicUsize,
/// Number of iterations of the worker cleanup loop for this Db
worker_iterations_cleanup: AtomicUsize,
/// Metric labels
metric_labels: Vec<KeyValue>,
@ -429,7 +434,8 @@ impl Db {
metrics_registry,
system_tables,
process_clock,
worker_iterations: AtomicUsize::new(0),
worker_iterations_lifecycle: AtomicUsize::new(0),
worker_iterations_cleanup: AtomicUsize::new(0),
metric_labels,
}
}
@ -900,9 +906,14 @@ impl Db {
None
}
/// Returns the number of iterations of the background worker loop
pub fn worker_iterations(&self) -> usize {
self.worker_iterations.load(Ordering::Relaxed)
/// Returns the number of iterations of the background worker lifecycle loop
pub fn worker_iterations_lifecycle(&self) -> usize {
self.worker_iterations_lifecycle.load(Ordering::Relaxed)
}
/// Returns the number of iterations of the background worker lifecycle loop
pub fn worker_iterations_cleanup(&self) -> usize {
self.worker_iterations_cleanup.load(Ordering::Relaxed)
}
/// Background worker function
@ -912,15 +923,37 @@ impl Db {
) {
info!("started background worker");
tokio::join!(
// lifecycle manager loop
async {
let mut lifecycle_manager = LifecycleManager::new(Arc::clone(&self));
while !shutdown.is_cancelled() {
self.worker_iterations.fetch_add(1, Ordering::Relaxed);
self.worker_iterations_lifecycle
.fetch_add(1, Ordering::Relaxed);
tokio::select! {
_ = lifecycle_manager.check_for_work() => {},
_ = shutdown.cancelled() => break
_ = shutdown.cancelled() => break,
}
}
},
// object store cleanup loop
async {
while !shutdown.is_cancelled() {
self.worker_iterations_cleanup
.fetch_add(1, Ordering::Relaxed);
tokio::select! {
_ = async {
if let Err(e) = cleanup_unreferenced_parquet_files(&self.catalog).await {
error!("error in background cleanup task: {:?}", e);
}
tokio::time::sleep(Duration::from_secs(500)).await;
} => {},
_ = shutdown.cancelled() => break,
}
}
},
);
info!("finished background worker");
}
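For context, the worker is driven exactly the way the new `object_store_cleanup` test below drives it; a hedged sketch, with `db: Arc<Db>` obtained elsewhere:

// Spawn the worker; it runs the lifecycle loop and the new object store cleanup loop
// concurrently until the token is cancelled.
let shutdown = tokio_util::sync::CancellationToken::new();
let shutdown_captured = shutdown.clone();
let db_captured = Arc::clone(&db);
let join_handle =
    tokio::spawn(async move { db_captured.background_worker(shutdown_captured).await });

// ... later, on shutdown:
shutdown.cancel();
join_handle.await.unwrap();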
@ -1261,6 +1294,7 @@ mod tests {
use ::test_helpers::assert_contains;
use arrow::record_batch::RecordBatch;
use arrow_util::{assert_batches_eq, assert_batches_sorted_eq};
use bytes::Bytes;
use chrono::Utc;
use data_types::{
chunk_metadata::ChunkStorage,
@ -1271,15 +1305,23 @@ mod tests {
use futures::{stream, StreamExt, TryStreamExt};
use object_store::{
memory::InMemory,
path::{ObjectStorePath, Path},
path::{parts::PathPart, ObjectStorePath, Path},
ObjectStore, ObjectStoreApi,
};
use parquet_file::{
metadata::{read_parquet_metadata_from_file, read_schema_from_parquet_metadata},
utils::{load_parquet_from_store_for_path, read_data_from_parquet_data},
test_utils::{load_parquet_from_store_for_path, read_data_from_parquet_data},
};
use query::{frontend::sql::SqlQueryPlanner, PartitionChunk};
use std::{convert::TryFrom, iter::Iterator, num::NonZeroUsize, str};
use std::{
collections::HashSet,
convert::TryFrom,
iter::Iterator,
num::NonZeroUsize,
str,
time::{Duration, Instant},
};
use tokio_util::sync::CancellationToken;
type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
type Result<T, E = Error> = std::result::Result<T, E>;
@ -2844,43 +2886,20 @@ mod tests {
// ==================== do: write data to parquet ====================
// create two chunks within the same table (to better test "new chunk ID" and "new table" during transaction
// replay)
let mut chunk_ids = vec![];
let partition_key = "1970-01-01T00";
let mut chunks = vec![];
for _ in 0..2 {
// Write some line protocols in Mutable buffer of the DB
write_lp(db.as_ref(), "cpu bar=1 10");
//Now mark the MB chunk close
let chunk_id = {
let mb_chunk = db
.rollover_partition("1970-01-01T00", "cpu")
.await
.unwrap()
.unwrap();
mb_chunk.id()
};
// Move that MB chunk to RB chunk and drop it from MB
db.load_chunk_to_read_buffer(partition_key, "cpu", chunk_id)
.await
.unwrap();
// Write the RB chunk to Object Store but keep it in RB
db.write_chunk_to_object_store(partition_key, "cpu", chunk_id)
.await
.unwrap();
chunk_ids.push(chunk_id);
chunks.push(create_parquet_chunk(db.as_ref()).await);
}
// ==================== check: catalog state ====================
// the preserved catalog should now register a single file
let mut paths_expected = vec![];
for chunk_id in &chunk_ids {
for (partition_key, table_name, chunk_id) in &chunks {
let chunk = {
let partition = db.catalog.state().valid_partition(&partition_key).unwrap();
let partition = partition.read();
partition.chunk("cpu", *chunk_id).unwrap()
partition.chunk(table_name, *chunk_id).unwrap()
};
let chunk = chunk.read();
if let ChunkState::WrittenToObjectStore(_, chunk) = chunk.state() {
@ -2924,12 +2943,12 @@ mod tests {
// ==================== check: DB state ====================
// Re-created DB should have an "object store only"-chunk
for chunk_id in &chunk_ids {
for (partition_key, table_name, chunk_id) in &chunks {
let chunk = {
let partition = db.catalog.state().valid_partition(&partition_key).unwrap();
let partition = partition.read();
partition.chunk("cpu", *chunk_id).unwrap()
partition.chunk(table_name, *chunk_id).unwrap()
};
let chunk = chunk.read();
assert!(matches!(chunk.state(), ChunkState::ObjectStoreOnly(_)));
@ -2938,4 +2957,153 @@ mod tests {
// ==================== check: DB still writable ====================
write_lp(db.as_ref(), "cpu bar=1 10");
}
#[tokio::test]
async fn object_store_cleanup() {
// Test that stale parquet files are removed from object store
// ==================== setup ====================
let object_store = Arc::new(ObjectStore::new_in_memory(InMemory::new()));
// ==================== do: create DB ====================
// Create a DB given a server id, an object store and a db name
let test_db = TestDb::builder()
.object_store(Arc::clone(&object_store))
.build()
.await;
let db = Arc::new(test_db.db);
// ==================== do: write data to parquet ====================
// create the following chunks:
// 0: ReadBuffer + Parquet
// 1: only Parquet
// 2: dropped (not in current catalog but parquet file still present for time travel)
let mut paths_keep = vec![];
for i in 0..3i8 {
let (partition_key, table_name, chunk_id) = create_parquet_chunk(db.as_ref()).await;
let chunk = {
let partition = db.catalog.state().valid_partition(&partition_key).unwrap();
let partition = partition.read();
partition.chunk(table_name.clone(), chunk_id).unwrap()
};
let chunk = chunk.read();
if let ChunkState::WrittenToObjectStore(_, chunk) = chunk.state() {
paths_keep.push(chunk.table_path());
} else {
panic!("Wrong chunk state.");
}
// drop lock
drop(chunk);
if i == 1 {
db.unload_read_buffer(&partition_key, &table_name, chunk_id)
.await
.unwrap();
}
if i == 2 {
db.drop_chunk(&partition_key, &table_name, chunk_id)
.unwrap();
}
}
// ==================== do: create garbage ====================
let mut path: DirsAndFileName = paths_keep[0].clone().into();
path.file_name = Some(PathPart::from(
format!("prefix_{}", path.file_name.unwrap().encoded()).as_ref(),
));
let path_delete = object_store.path_from_dirs_and_filename(path);
create_empty_file(&object_store, &path_delete).await;
let path_delete = path_delete.display();
// ==================== check: all files are there ====================
let all_files = get_object_store_files(&object_store).await;
for path in &paths_keep {
assert!(all_files.contains(&path.display()));
}
// ==================== do: start background task loop ====================
let shutdown: CancellationToken = Default::default();
let shutdown_captured = shutdown.clone();
let db_captured = Arc::clone(&db);
let join_handle =
tokio::spawn(async move { db_captured.background_worker(shutdown_captured).await });
// ==================== check: after a while the dropped file should be gone ====================
let t_0 = Instant::now();
loop {
let all_files = get_object_store_files(&object_store).await;
if !all_files.contains(&path_delete) {
break;
}
assert!(t_0.elapsed() < Duration::from_secs(10));
tokio::time::sleep(Duration::from_millis(100)).await;
}
// ==================== do: stop background task loop ====================
shutdown.cancel();
join_handle.await.unwrap();
// ==================== check: some files are there ====================
let all_files = get_object_store_files(&object_store).await;
assert!(!all_files.contains(&path_delete));
for path in &paths_keep {
assert!(all_files.contains(&path.display()));
}
}
async fn create_parquet_chunk(db: &Db) -> (String, String, u32) {
write_lp(db, "cpu bar=1 10");
let partition_key = "1970-01-01T00";
let table_name = "cpu";
// Now mark the MB chunk as closed
let chunk_id = {
let mb_chunk = db
.rollover_partition(partition_key, table_name)
.await
.unwrap()
.unwrap();
mb_chunk.id()
};
// Move that MB chunk to RB chunk and drop it from MB
db.load_chunk_to_read_buffer(partition_key, table_name, chunk_id)
.await
.unwrap();
// Write the RB chunk to Object Store but keep it in RB
db.write_chunk_to_object_store(partition_key, table_name, chunk_id)
.await
.unwrap();
(partition_key.to_string(), table_name.to_string(), chunk_id)
}
async fn get_object_store_files(object_store: &ObjectStore) -> HashSet<String> {
object_store
.list(None)
.await
.unwrap()
.try_concat()
.await
.unwrap()
.iter()
.map(|p| p.display())
.collect()
}
async fn create_empty_file(object_store: &ObjectStore, path: &Path) {
let data = Bytes::default();
let len = data.len();
object_store
.put(
&path,
futures::stream::once(async move { Ok(data) }),
Some(len),
)
.await
.unwrap();
}
}