feat: clean up and add comments for next steps

pull/24376/head
Nga Tran 2021-11-18 10:11:51 -05:00
parent a5c04e5fe4
commit ccef3b535a
3 changed files with 41 additions and 30 deletions

View File

@ -676,7 +676,9 @@ impl Db {
let partition = partition.upgrade();
// invoke compact
let (_, fut) = lifecycle::compact_object_store::compact_object_store_chunks(partition, chunks).context(LifecycleError)?;
let (_, fut) =
lifecycle::compact_object_store::compact_object_store_chunks(partition, chunks)
.context(LifecycleError)?;
fut
};

View File

@ -205,7 +205,7 @@ impl LockablePartition for LockableCatalogPartition {
fn compact_object_store_chunks(
partition: LifecycleWriteGuard<'_, Partition, Self>,
chunks: Vec<LifecycleWriteGuard<'_, CatalogChunk, Self::Chunk>>,
) -> Result<TaskTracker<Job>, Self::Error> {
) -> Result<TaskTracker<Job>, Self::Error> {
info!(table=%partition.table_name(), partition=%partition.partition_key(), "compacting object store chunks");
let (tracker, fut) = compact_object_store::compact_object_store_chunks(partition, chunks)?;
let _ =

View File

@ -4,7 +4,14 @@ use super::{
error::{ChunksNotContiguous, ChunksNotInPartition, EmptyChunks},
LockableCatalogChunk, LockableCatalogPartition, Result,
};
use crate::{Db, db::{DbChunk, catalog::{chunk::CatalogChunk, partition::Partition}, lifecycle::merge_schemas}};
use crate::{
db::{
catalog::{chunk::CatalogChunk, partition::Partition},
lifecycle::merge_schemas,
DbChunk,
},
Db,
};
use data_types::{chunk_metadata::ChunkOrder, delete_predicate::DeletePredicate, job::Job};
use datafusion::physical_plan::SendableRecordBatchStream;
use futures::Future;
@ -35,7 +42,7 @@ use tracker::{TaskRegistration, TaskTracker, TrackedFuture, TrackedFutureExt};
pub(crate) fn compact_object_store_chunks(
partition: LifecycleWriteGuard<'_, Partition, LockableCatalogPartition>,
chunks: Vec<LifecycleWriteGuard<'_, CatalogChunk, LockableCatalogChunk>>,
) ->Result<(
) -> Result<(
TaskTracker<Job>,
TrackedFuture<impl Future<Output = Result<Option<Arc<DbChunk>>>> + Send>,
)> {
@ -51,13 +58,13 @@ pub(crate) fn compact_object_store_chunks(
chunks: chunk_ids.clone(),
});
// Verify input while marking and snapshoting the chunks for compacting
// Step 1: Verify input while marking and snapshotting the chunks for compacting
let (
_time_of_first_write,
_time_of_last_write,
input_rows,
_delete_predicates_before,
_query_chunks,
_os_chunks,
_min_order,
) = mark_chunks_to_compact(partition, chunks, &registration)?;
@ -65,15 +72,31 @@ pub(crate) fn compact_object_store_chunks(
// track future runtime
let fut_now = std::time::Instant::now();
// Compact & persist the chunks into one chunk
// let (_compacted_stream, _schema, sort_key) = compact_chunks(&db, &query_chunks).await?;
let compacted_rows = 0; // todo: set it here
// Step 2: Compact & persist the os_chunks into one os_chunk
// Todo: This will be done in a sub-function that:
// . Build a compact plan that scans all os_chunks
// . Execute it to get the compacted output
// . The compacted output will be written to OS directly without going through RUB
// and be returned as a chunk named os_chunk
// - Extra note: since each os chunk includes 2 checkpoints: chunk and DB,
// these 2 checkpoints of the newly created os_chunk will be MAX of
// the corresponding checkpoints in each chunk of the os_chunks
let compacted_rows = 0; // todo: will be the number of rows in the output os_chunk
// Persist the newly created chunk
// todo
// Step 3: Update the preserved & in-memory catalogs to use the newly created os_chunk
// Todo: This will be done in a sub-function that creates a single transaction that:
// . Drop all os_chunks from the preserved catalog
// . Add the newly created os_chunk into the preserved catalog
// Drop old chunks and make the new chunk available in one transaction
// todo
// Step 4: Update the in-memory catalogs to use the newly created os_chunk
// . Drop all os_chunks from the in-memory catalog
// . Add the newly created os_chunk into the in-memory catalog
// This step can be done outside a transaction because the in-memory catalog
// was designed to be fault tolerant
// - Extra note: If there is a risk that the parquet files of os_chunks are
// permanently deleted from the Object Store between step 3 and step 4,
// we might need to put steps 3 and 4 in the same transaction
// Log the summary
let elapsed = now.elapsed();
@ -87,22 +110,7 @@ pub(crate) fn compact_object_store_chunks(
rows_per_sec=?throughput,
"object store chunk(s) compacted");
// todo: remove these temp lines when the todos above done
// Temp chunk to avoid compile error
// let metric_registry = Arc::clone(&db.metric_registry);
// let chunk = collect_rub(compacted_stream, &partition_addr, metric_registry.as_ref()).await.unwrap().unwrap();
// let (_, new_chunk) = partition.create_rub_chunk(
// chunk,
// time_of_first_write,
// time_of_last_write,
// schema,
// vec![],
// min_order,
// None,
// );
// let chunk = DbChunk::snapshot(&new_chunk.read());
// Ok(Some(chunk))
Ok(None)
Ok(None) // todo: will be a real chunk when all todos done
};
Ok((tracker, fut.track(registration)))
@ -120,7 +128,7 @@ pub(crate) fn compact_object_store_chunks(
/// . all delete predicates of the provided chunks
/// . snapshot of the provided chunks
/// . min(order) of the provided chunks
#[warn(clippy::type_complexity)]
#[allow(clippy::type_complexity)]
fn mark_chunks_to_compact(
partition: LifecycleWriteGuard<'_, Partition, LockableCatalogPartition>,
chunks: Vec<LifecycleWriteGuard<'_, CatalogChunk, LockableCatalogChunk>>,
@ -215,6 +223,7 @@ fn mark_chunks_to_compact(
/// Deleted and duplicated data will be eliminated during the scan
/// . Output schema of the compact plan
/// . Sort Key of the output data
#[allow(dead_code)]
async fn compact_chunks(
db: &Db,
query_chunks: &[Arc<DbChunk>],