refactor: address the cleanup-lock issue

pull/24376/head
Nga Tran 2021-11-22 10:15:43 -05:00
parent 3b2e5c445b
commit 09fd25d28e
1 changed file with 33 additions and 68 deletions

@@ -82,26 +82,39 @@ pub(crate) fn compact_object_store_chunks(
// track future runtime
let fut_now = std::time::Instant::now();
// Step 2: Compact & persist the os_chunks into one os_chunk
let compacted_and_persisted_chunk = compact_persist_os_chunks(
&db,
&partition_addr,
&compacting_os_chunks.os_chunks,
compacting_os_chunks.partition_checkpoint,
compacting_os_chunks.database_checkpoint,
compacting_os_chunks.time_of_first_write,
compacting_os_chunks.time_of_last_write,
compacting_os_chunks.min_order,
)
.await?;
let compacted_rows = compacted_and_persisted_chunk.parquet_chunk.rows();
let _schema = compacted_and_persisted_chunk.schema;
// Step 2: Compact the os chunks into a stream
let compacted_stream = compact_chunks(&db, &compacting_os_chunks.os_chunks).await?;
let compacted_rows;
let _schema = compacted_stream.schema;
let sort_key = compacted_stream.sort_key;
// Step 3: Update the preserved & in-memory catalogs to use the newly created os_chunk
// Todo: This will be done in a sub-function that creates a single transaction that:
// . Drop all os_chunks from the preserved catalog
// . Add the newly created os_chunk into the preserved catalog
// Extra: delete_predicates_after must be included here or below (detail will be figured out)
// Step 3: Start to persist files and update the preserved catalog accordingly
// This process needs to hold the cleanup lock to avoid the persisted file being deleted right after
// it is created and before the preserved catalog is updated to include it
{
// fetch shared (= read) guard preventing the cleanup job from deleting our files
let _guard = db.cleanup_lock.read().await;
// Step 3.1: Write the chunk as a parquet file into the object store
let compacted_and_persisted_chunk = persist_stream_to_chunk(
&db,
&partition_addr,
compacted_stream.stream,
compacting_os_chunks.partition_checkpoint,
compacting_os_chunks.database_checkpoint,
compacting_os_chunks.time_of_first_write,
compacting_os_chunks.time_of_last_write,
compacting_os_chunks.min_order,
)
.await?;
compacted_rows = compacted_and_persisted_chunk.rows();
// Step 3.2: Update the preserved catalogs to use the newly created os_chunk
// Todo: This will be done in a sub-function that creates a single transaction that:
// . Drop all os_chunks from the preserved catalog
// . Add the newly created os_chunk into the preserved catalog
// Extra: delete_predicates_after must be included here or below (detail will be figured out)
} // End of cleanup locking
// Step 4: Update the in-memory catalogs to use the newly created os_chunk
// . Drop all os_chunks from the in-memory catalog
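The core of this refactor is that the parquet write and the preserved-catalog update now run while a shared guard on the cleanup lock is held, so the background cleanup job cannot delete the freshly written file before the catalog references it. Below is a minimal sketch of that pattern, assuming the lock is a tokio::sync::RwLock<()>; the function names and placeholder steps are illustrative, not the actual IOx API.

use std::sync::Arc;
use tokio::sync::RwLock;

// Persist side: takes the shared (read) guard, so several persist jobs may run
// concurrently while the cleanup job is blocked.
async fn persist_under_cleanup_lock(cleanup_lock: Arc<RwLock<()>>) {
    let _guard = cleanup_lock.read().await;
    // 1. write the compacted parquet file to object store   (placeholder)
    // 2. register the new file in the preserved catalog     (placeholder)
    // `_guard` is dropped here, letting the cleanup job proceed.
}

// Cleanup side: takes the exclusive (write) guard, so it never observes a file
// that has been written but not yet registered in the catalog.
async fn cleanup_unreferenced_files(cleanup_lock: Arc<RwLock<()>>) {
    let _guard = cleanup_lock.write().await;
    // delete parquet files that are not referenced by the catalog (placeholder)
}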
@@ -120,7 +133,7 @@ pub(crate) fn compact_object_store_chunks(
(compacting_os_chunks.input_rows as u128 * 1_000_000_000) / elapsed.as_nanos();
info!(input_chunks=chunk_ids.len(),
%compacting_os_chunks.input_rows, %compacted_rows,
%compacted_and_persisted_chunk.sort_key,
%sort_key,
compaction_took = ?elapsed,
fut_execution_duration= ?fut_now.elapsed(),
rows_per_sec=?throughput,
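As a side note, the rows-per-second figure logged above is computed with integer arithmetic only: the row count is widened to u128 and multiplied by one billion before dividing by the elapsed nanoseconds. A small self-contained version of the same computation, for illustration only (it assumes a non-zero elapsed duration):

use std::time::Duration;

fn rows_per_sec(input_rows: u64, elapsed: Duration) -> u128 {
    // widen to u128 and multiply first so no precision is lost, then divide
    // by the elapsed time in nanoseconds
    (input_rows as u128 * 1_000_000_000) / elapsed.as_nanos()
}

// e.g. 1_000_000 rows compacted in 250 ms  ->  4_000_000 rows/sec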
@@ -255,54 +268,6 @@ struct CompactingOsChunks {
partition_checkpoint: PartitionCheckpoint,
}
// Compact & persist the os_chunks into one os_chunk
// . Build a compact plan that scans all os_chunks
// . Execute it to get the compacted output
// . The compacted output will be written to OS directly without going through RUB
// and returned as a chunk named os_chunk
// - Extra note: since each os chunk includes 2 checkpoints: chunk and DB,
// these 2 checkpoints of the newly created os_chunk will be the MAX of
// the corresponding checkpoints in each chunk of the os_chunks
#[allow(clippy::too_many_arguments)]
async fn compact_persist_os_chunks<'a>(
db: &'a Db,
partition_addr: &'a PartitionAddr,
os_chunks: &'a [Arc<DbChunk>],
partition_checkpoint: PartitionCheckpoint,
database_checkpoint: DatabaseCheckpoint,
time_of_first_write: Time,
time_of_last_write: Time,
chunk_order: ChunkOrder,
) -> Result<PersistedOutput> {
let compacted_stream = compact_chunks(db, os_chunks).await?;
let parquet_chunk = persist_stream_to_chunk(
db,
partition_addr,
compacted_stream.stream,
partition_checkpoint,
database_checkpoint,
time_of_first_write,
time_of_last_write,
chunk_order,
)
.await?;
Ok(PersistedOutput {
parquet_chunk,
schema: compacted_stream.schema,
sort_key: compacted_stream.sort_key,
})
}
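The doc comment above states that the compacted chunk's partition and database checkpoints are the MAX of the corresponding checkpoints of the input chunks. The following is a hypothetical sketch of that rule only; the real PartitionCheckpoint and DatabaseCheckpoint types in IOx carry more state than this single number.

#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct SimpleCheckpoint {
    // hypothetical stand-in for the sequence numbers tracked by the real checkpoints
    max_sequence_number: u64,
}

// The compacted chunk replaces all input chunks, so its checkpoint must be at
// least as recent as every input checkpoint: take the maximum.
fn merged_checkpoint(inputs: &[SimpleCheckpoint]) -> Option<SimpleCheckpoint> {
    inputs.iter().copied().max()
}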
/// Struct holding the output of a persisted chunk
#[derive(Debug, Clone)]
struct PersistedOutput {
parquet_chunk: Arc<ParquetChunk>,
schema: Arc<Schema>,
sort_key: String,
}
/// Create query plan to compact the given DbChunks and return its output stream
/// Return:
/// . stream of output record batches of the scanned chunks: Result<SendableRecordBatchStream>
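compact_chunks hands back a DataFusion SendableRecordBatchStream; downstream, persist_stream_to_chunk drains that stream into a parquet file in object storage. A minimal sketch of how such a stream is consumed (it only counts rows and simplifies error handling; it is not the actual persist implementation):

use datafusion::physical_plan::SendableRecordBatchStream;
use futures::StreamExt;

async fn count_rows(
    mut stream: SendableRecordBatchStream,
) -> Result<usize, Box<dyn std::error::Error>> {
    let mut rows = 0;
    // each item is a Result<RecordBatch, _>: propagate errors, otherwise tally rows
    while let Some(batch) = stream.next().await {
        rows += batch?.num_rows();
    }
    Ok(rows)
}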