diff --git a/server/src/db.rs b/server/src/db.rs index 80ff678968..10cb9b600b 100644 --- a/server/src/db.rs +++ b/server/src/db.rs @@ -676,7 +676,9 @@ impl Db { let partition = partition.upgrade(); // invoke compact - let (_, fut) = lifecycle::compact_object_store::compact_object_store_chunks(partition, chunks).context(LifecycleError)?; + let (_, fut) = + lifecycle::compact_object_store::compact_object_store_chunks(partition, chunks) + .context(LifecycleError)?; fut }; diff --git a/server/src/db/lifecycle.rs b/server/src/db/lifecycle.rs index 3750d7df2c..4d41bfd7fb 100644 --- a/server/src/db/lifecycle.rs +++ b/server/src/db/lifecycle.rs @@ -205,7 +205,7 @@ impl LockablePartition for LockableCatalogPartition { fn compact_object_store_chunks( partition: LifecycleWriteGuard<'_, Partition, Self>, chunks: Vec>, - ) -> Result, Self::Error> { + ) -> Result, Self::Error> { info!(table=%partition.table_name(), partition=%partition.partition_key(), "compacting object store chunks"); let (tracker, fut) = compact_object_store::compact_object_store_chunks(partition, chunks)?; let _ = diff --git a/server/src/db/lifecycle/compact_object_store.rs b/server/src/db/lifecycle/compact_object_store.rs index d7380b9269..66a3806eb5 100644 --- a/server/src/db/lifecycle/compact_object_store.rs +++ b/server/src/db/lifecycle/compact_object_store.rs @@ -4,7 +4,14 @@ use super::{ error::{ChunksNotContiguous, ChunksNotInPartition, EmptyChunks}, LockableCatalogChunk, LockableCatalogPartition, Result, }; -use crate::{Db, db::{DbChunk, catalog::{chunk::CatalogChunk, partition::Partition}, lifecycle::merge_schemas}}; +use crate::{ + db::{ + catalog::{chunk::CatalogChunk, partition::Partition}, + lifecycle::merge_schemas, + DbChunk, + }, + Db, +}; use data_types::{chunk_metadata::ChunkOrder, delete_predicate::DeletePredicate, job::Job}; use datafusion::physical_plan::SendableRecordBatchStream; use futures::Future; @@ -35,7 +42,7 @@ use tracker::{TaskRegistration, TaskTracker, TrackedFuture, 
TrackedFutureExt}; pub(crate) fn compact_object_store_chunks( partition: LifecycleWriteGuard<'_, Partition, LockableCatalogPartition>, chunks: Vec>, -) ->Result<( +) -> Result<( TaskTracker, TrackedFuture>>> + Send>, )> { @@ -51,13 +58,13 @@ pub(crate) fn compact_object_store_chunks( chunks: chunk_ids.clone(), }); - // Verify input while marking and snapshoting the chunks for compacting + // Step 1: Verify input while marking and snapshotting the chunks for compacting let ( _time_of_first_write, _time_of_last_write, input_rows, _delete_predicates_before, - _query_chunks, + _os_chunks, _min_order, ) = mark_chunks_to_compact(partition, chunks, &registration)?; @@ -65,15 +72,31 @@ pub(crate) fn compact_object_store_chunks( // track future runtime let fut_now = std::time::Instant::now(); - // Compact & persist the chunks into one chunk - // let (_compacted_stream, _schema, sort_key) = compact_chunks(&db, &query_chunks).await?; - let compacted_rows = 0; // todo: set it here + // Step 2: Compact & persist the os_chunks into one os_chunk + // Todo: This will be done in a sub-function that: + // . Build a compact plan that scans all os_chunks + // . Execute it to get the compacted output + // . The compacted output will be written to OS directly without going through RUB + // and return a chunk named os_chunk + // - Extra note: since each os chunk includes 2 checkpoints: chunk and DB, + // these 2 checkpoints of the newly created os_chunk will be MAX of + // the corresponding checkpoints in each chunk of the os_chunks + let compacted_rows = 0; // todo: will be the number of rows in the output os_chunk - // Persist the newly created chunk - // todo + // Step 3: Update the preserved catalog to use the newly created os_chunk + // Todo: This will be done in a sub-function that creates a single transaction that: + // . Drop all os_chunks from the preserved catalog + // . 
Add the newly created os_chunk into the preserved catalog - // Drop old chunks and make the new chunk available in one transaction - // todo + // Step 4: Update the in-memory catalogs to use the newly created os_chunk + // . Drop all os_chunks from the in-memory catalog + // . Add the newly created os_chunk to the in-memory catalog + // This step can be done outside a transaction because the in-memory catalog + // was designed to be fault tolerant + + // - Extra note: If there is a risk that the parquet files of os_chunks are + // permanently deleted from the Object Store between step 3 and step 4, + // we might need to put steps 3 and 4 in the same transaction // Log the summary let elapsed = now.elapsed(); @@ -87,22 +110,7 @@ pub(crate) fn compact_object_store_chunks( rows_per_sec=?throughput, "object store chunk(s) compacted"); - // todo: remove these temp lines when the todos above done - // Temp chunk to avoid compile error - // let metric_registry = Arc::clone(&db.metric_registry); - // let chunk = collect_rub(compacted_stream, &partition_addr, metric_registry.as_ref()).await.unwrap().unwrap(); - // let (_, new_chunk) = partition.create_rub_chunk( - // chunk, - // time_of_first_write, - // time_of_last_write, - // schema, - // vec![], - // min_order, - // None, - // ); - // let chunk = DbChunk::snapshot(&new_chunk.read()); - // Ok(Some(chunk)) - Ok(None) + Ok(None) // todo: will be a real chunk when all todos are done }; Ok((tracker, fut.track(registration))) @@ -120,7 +128,7 @@ pub(crate) fn compact_object_store_chunks( /// . all delete predicates of the provided chunks /// . snapshot of the provided chunks /// . min(order) of the provided chunks -#[warn(clippy::type_complexity)] +#[allow(clippy::type_complexity)] fn mark_chunks_to_compact( partition: LifecycleWriteGuard<'_, Partition, LockableCatalogPartition>, chunks: Vec>, @@ -215,6 +223,7 @@ fn mark_chunks_to_compact( /// Deleted and duplicated data will be eliminated during the scan /// . 
Output schema of the compact plan /// . Sort Key of the output data +#[allow(dead_code)] async fn compact_chunks( db: &Db, query_chunks: &[Arc],