refactor: address review comments

pull/24376/head
Nga Tran 2021-12-01 13:11:17 -05:00
parent 254d35a6c2
commit 60807c2d95
2 changed files with 11 additions and 21 deletions

View File

@ -574,16 +574,12 @@ impl Db {
// We should only report persisted chunks or chunks that are currently being persisted, because the
// preserved catalog does not care about purely in-mem chunks.
- // Capture ID of to be created chunk for the in-progress compacting OS chunks
- let (to_be_created_chunk_id, compacting_os) =
-     match chunk.in_lifecycle_compacting_object_store() {
-         Some(id) => (Some(id), true),
-         _ => (None, false),
-     };
+ // Capture ID of to-be-created chunk for the in-progress compacting OS chunks
+ let to_be_created_chunk_id = chunk.in_lifecycle_compacting_object_store();
if matches!(chunk.stage(), ChunkStage::Persisted { .. })
|| chunk.is_in_lifecycle(ChunkLifecycleAction::Persisting)
- || compacting_os
+ || to_be_created_chunk_id.is_some()
{
affected_persisted_chunks.push(ChunkAddrWithoutDatabase {
table_name: Arc::clone(&chunk.addr().table_name),
@ -591,11 +587,11 @@ impl Db {
chunk_id: chunk.addr().chunk_id,
});
- if compacting_os {
+ if let Some(chunk_id) = to_be_created_chunk_id {
affected_persisted_chunks.push(ChunkAddrWithoutDatabase {
table_name: Arc::clone(&chunk.addr().table_name),
partition_key: Arc::clone(&chunk.addr().partition_key),
- chunk_id: to_be_created_chunk_id.unwrap(),
+ chunk_id,
});
}
}
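The hunks above drop the `(Option, bool)` pair built by the `match` and keep the `Option` returned by `in_lifecycle_compacting_object_store()` itself, so the later branch can use `if let` instead of `unwrap()`. A minimal standalone sketch of that pattern, assuming simplified stand-ins (`ChunkId`, `lifecycle_compaction_target`, and `affected_chunks` are hypothetical here, not the real IOx types):

// Sketch of the refactor above: keep the Option returned by the lifecycle
// query instead of flattening it into an (Option, bool) pair.
// `ChunkId` and `lifecycle_compaction_target` are simplified stand-ins.
#[derive(Debug, Clone, Copy)]
struct ChunkId(u32);

fn lifecycle_compaction_target(compacting: bool) -> Option<ChunkId> {
    // Pretend this asks the lifecycle state machine for the chunk that an
    // in-progress object-store compaction will create.
    compacting.then(|| ChunkId(42))
}

fn affected_chunks(is_persisted: bool, compacting: bool) -> Vec<ChunkId> {
    let mut affected = Vec::new();

    // Before: a `match` produced (Option<ChunkId>, bool) and later unwrapped.
    // After: keep the Option and let `is_some()` / `if let` do both jobs.
    let to_be_created = lifecycle_compaction_target(compacting);

    if is_persisted || to_be_created.is_some() {
        affected.push(ChunkId(0)); // the existing chunk's address
        if let Some(id) = to_be_created {
            affected.push(id); // the chunk the compaction will create
        }
    }
    affected
}

fn main() {
    println!("{:?}", affected_chunks(false, true)); // [ChunkId(0), ChunkId(42)]
}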

View File

@ -149,7 +149,7 @@ pub(crate) fn compact_object_store_chunks(
compacting_os_chunks.partition,
delete_predicates_before,
)
- .await?;
+ .await;
if let Some(compacted_and_persisted_chunk) = compacted_and_persisted_chunk {
compacted_rows = compacted_and_persisted_chunk.rows();
@ -168,7 +168,7 @@ pub(crate) fn compact_object_store_chunks(
rows_per_sec=?throughput,
"object store chunk(s) compacted");
- Ok(dbchunk) // todo: consider to return Ok(None) when appropriate
+ Ok(dbchunk)
};
Ok((tracker, fut.track(registration)))
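For context on the `.await?` to `.await` change above: a later hunk makes `update_in_memory_catalog` return `Option<Arc<DbChunk>>` instead of `Result<Option<Arc<DbChunk>>>`, so there is no error left to propagate at this call site. A minimal caller-side sketch of that difference, with hypothetical names (`Chunk`, `lookup_chunk_fallible`, `lookup_chunk`) and the async plumbing omitted:

// Caller-side sketch: dropping `?` when a helper becomes infallible.
// `Chunk`, `lookup_chunk_fallible`, and `lookup_chunk` are hypothetical.
#[derive(Debug)]
struct Chunk;

fn lookup_chunk_fallible() -> Result<Option<Chunk>, String> {
    Ok(Some(Chunk))
}

fn lookup_chunk() -> Option<Chunk> {
    Some(Chunk)
}

fn caller() -> Result<(), String> {
    // Before: the helper could fail, so `?` propagated the error.
    let _maybe = lookup_chunk_fallible()?;

    // After: the helper is infallible, so there is no error to propagate;
    // applying `?` to a plain Option here would not even type-check,
    // because `caller` returns Result, not Option.
    let _maybe = lookup_chunk();
    Ok(())
}

fn main() {
    caller().unwrap();
}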
@ -427,11 +427,7 @@ async fn update_preserved_catalog(
// Add new chunk if compaction returns some data
if let Some(parquet_chunk) = parquet_chunk {
- let catalog_parquet_info = CatalogParquetInfo {
-     path: parquet_chunk.path().clone(),
-     file_size_bytes: parquet_chunk.file_size_bytes(),
-     metadata: parquet_chunk.parquet_metadata(),
- };
+ let catalog_parquet_info = CatalogParquetInfo::from_chunk(parquet_chunk);
transaction.add_parquet(&catalog_parquet_info);
}
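This hunk swaps a field-by-field struct literal for a `CatalogParquetInfo::from_chunk` constructor. A simplified sketch of what such a constructor centralizes, using stand-in types rather than the real `ParquetChunk` and `CatalogParquetInfo` definitions from the IOx crates:

use std::sync::Arc;

// Simplified stand-ins; the real types carry richer path and metadata types.
#[derive(Clone, Debug)]
struct ParquetFilePath(String);

#[derive(Debug)]
struct ParquetChunk {
    path: ParquetFilePath,
    file_size_bytes: usize,
    metadata: Arc<Vec<u8>>, // stands in for the parsed parquet metadata
}

impl ParquetChunk {
    fn path(&self) -> &ParquetFilePath {
        &self.path
    }
    fn file_size_bytes(&self) -> usize {
        self.file_size_bytes
    }
    fn parquet_metadata(&self) -> Arc<Vec<u8>> {
        Arc::clone(&self.metadata)
    }
}

#[derive(Debug)]
struct CatalogParquetInfo {
    path: ParquetFilePath,
    file_size_bytes: usize,
    metadata: Arc<Vec<u8>>,
}

impl CatalogParquetInfo {
    // One place that knows how to project a chunk into its catalog record,
    // instead of every call site repeating the field-by-field literal.
    fn from_chunk(chunk: &ParquetChunk) -> Self {
        Self {
            path: chunk.path().clone(),
            file_size_bytes: chunk.file_size_bytes(),
            metadata: chunk.parquet_metadata(),
        }
    }
}

fn main() {
    let chunk = ParquetChunk {
        path: ParquetFilePath("db/table/partition/chunk.parquet".into()),
        file_size_bytes: 1024,
        metadata: Arc::new(vec![0xde, 0xad]),
    };
    println!("{:?}", CatalogParquetInfo::from_chunk(&chunk));
}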
@ -447,7 +443,7 @@ async fn update_in_memory_catalog(
parquet_chunk: Option<Arc<ParquetChunk>>,
partition: Arc<RwLock<Partition>>,
delete_predicates_before: HashSet<Arc<DeletePredicate>>,
- ) -> Result<Option<Arc<DbChunk>>> {
+ ) -> Option<Arc<DbChunk>> {
// Acquire write lock to drop the old chunks while also getting delete predicates added during compaction
let mut partition = partition.write();
@ -491,7 +487,7 @@ async fn update_in_memory_catalog(
// drop partition lock
std::mem::drop(partition);
- Ok(dbchunk)
+ dbchunk
}
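A minimal sketch of the signature change in this hunk, complementing the caller-side sketch earlier: once the function has no failure path, returning the `Option` directly removes the `Ok(..)` wrapping and the `Result` layer callers had to peel off with `?`. Names below are placeholders, not the real `DbChunk` plumbing:

use std::sync::Arc;

#[derive(Debug)]
struct DbChunkStandIn; // placeholder for the real DbChunk

// Before: every exit had to be wrapped in Ok(..) even though nothing failed.
fn rebuild_chunk_fallible(compacted: bool) -> Result<Option<Arc<DbChunkStandIn>>, String> {
    if !compacted {
        return Ok(None);
    }
    Ok(Some(Arc::new(DbChunkStandIn)))
}

// After: the Result layer is gone; the body returns the Option directly.
fn rebuild_chunk(compacted: bool) -> Option<Arc<DbChunkStandIn>> {
    if !compacted {
        return None;
    }
    Some(Arc::new(DbChunkStandIn))
}

fn main() {
    // Callers of the old version needed `?` or explicit error handling;
    // callers of the new one just get the Option back.
    let _old = rebuild_chunk_fallible(true).unwrap();
    let _new = rebuild_chunk(true);
}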
////////////////////////////////////////////////////////////
@ -895,9 +891,7 @@ mod tests {
// todo: add tests
// . compact with deletes happening during compaction
// . compact with deletes/duplicates that lead to empty result
// --- this needs more coding before testing
// . verify checkpoints
// . replay
- // . en-to-end tests to not only verify row num but also data
+ // . end-to-end tests to not only verify row num but also data
}