refactor: address review comments

pull/24376/head
Nga Tran 2021-11-22 13:18:54 -05:00
parent 04ba0b5181
commit 731dee4e91
1 changed file with 41 additions and 102 deletions


@@ -96,15 +96,23 @@ pub(crate) fn compact_object_store_chunks(
let _guard = db.cleanup_lock.read().await;
// Step 3.1: Write the chunk as a parquet file into the object store
let iox_metadata = IoxMetadata {
creation_timestamp: db.time_provider.now(),
table_name: Arc::clone(&partition_addr.table_name),
partition_key: Arc::clone(&partition_addr.partition_key),
chunk_id: ChunkId::new(),
partition_checkpoint: compacting_os_chunks.partition_checkpoint.clone(),
database_checkpoint: compacting_os_chunks.database_checkpoint.clone(),
time_of_first_write: compacting_os_chunks.time_of_first_write,
time_of_last_write: compacting_os_chunks.time_of_last_write,
chunk_order: compacting_os_chunks.min_order,
};
let compacted_and_persisted_chunk = persist_stream_to_chunk(
&db,
&partition_addr,
compacted_stream.stream,
compacting_os_chunks.partition_checkpoint,
compacting_os_chunks.database_checkpoint,
compacting_os_chunks.time_of_first_write,
compacting_os_chunks.time_of_last_write,
compacting_os_chunks.min_order,
iox_metadata,
)
.await?;
compacted_rows = compacted_and_persisted_chunk.rows();
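
The hunk above moves the IoxMetadata construction out of persist_stream_to_chunk and into the caller, which already owns the checkpoints, write times, and chunk order, so the persist helper now takes one prebuilt value instead of six loose arguments (a later hunk drops its #[allow(clippy::too_many_arguments)] accordingly). A minimal sketch of the pattern, with a trimmed stand-in for the real IoxMetadata:

```rust
// Sketch only: a trimmed stand-in for `IoxMetadata`, which in IOx also
// carries checkpoints, chunk id, chunk order, and a creation timestamp.
struct IoxMetadata {
    table_name: String,
    partition_key: String,
    time_of_first_write: i64,
    time_of_last_write: i64,
}

// After the refactor the persist step receives the whole struct ...
fn persist_stream_to_chunk(iox_metadata: IoxMetadata) {
    println!(
        "persist {}/{} [{}, {}]",
        iox_metadata.table_name,
        iox_metadata.partition_key,
        iox_metadata.time_of_first_write,
        iox_metadata.time_of_last_write,
    );
}

// ... and the caller, which owns all the inputs anyway, assembles it once.
fn main() {
    let iox_metadata = IoxMetadata {
        table_name: "cpu".to_string(),
        partition_key: "1970-01-01T00".to_string(),
        time_of_first_write: 10,
        time_of_last_write: 40,
    };
    persist_stream_to_chunk(iox_metadata);
}
```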
@@ -159,7 +167,6 @@ pub(crate) fn compact_object_store_chunks(
/// . min(order) of the provided chunks
/// . max(database_checkpoint) of the provided chunks
/// . max(partition_checkpoint) of the provided chunks
#[allow(clippy::type_complexity)]
fn mark_chunks_to_compact(
partition: LifecycleWriteGuard<'_, Partition, LockableCatalogPartition>,
chunks: Vec<LifecycleWriteGuard<'_, CatalogChunk, LockableCatalogChunk>>,
@@ -174,18 +181,18 @@ fn mark_chunks_to_compact(
let partition_addr = partition.addr().clone();
// Mark and snapshot chunks, then drop locks
let mut time_of_first_write: Option<Time> = None;
let mut time_of_last_write: Option<Time> = None;
let mut time_of_first_write = Time::MAX;
let mut time_of_last_write = Time::MIN;
let mut chunk_orders = BTreeSet::new();
let mut input_rows = 0;
let mut delete_predicates: HashSet<Arc<DeletePredicate>> = HashSet::new();
let mut min_order = ChunkOrder::MAX;
// Todo: find a better way to initialize these
// initialize checkpoints
let database_checkpoint = DatabaseCheckpoint::new(Default::default());
let partition_checkpoint = PartitionCheckpoint::new(
Arc::from("table"),
Arc::from("part"),
Arc::clone(&partition_addr.table_name),
Arc::clone(&partition_addr.partition_key),
Default::default(),
Time::from_timestamp_nanos(0),
);
@@ -208,14 +215,10 @@ fn mark_chunks_to_compact(
input_rows += chunk.table_summary().total_count();
let candidate_first = chunk.time_of_first_write();
time_of_first_write = time_of_first_write
.map(|prev_first| prev_first.min(candidate_first))
.or(Some(candidate_first));
time_of_first_write = std::cmp::min(time_of_first_write, candidate_first);
let candidate_last = chunk.time_of_last_write();
time_of_last_write = time_of_last_write
.map(|prev_last| prev_last.max(candidate_last))
.or(Some(candidate_last));
time_of_last_write = std::cmp::max(time_of_last_write, candidate_last);
delete_predicates.extend(chunk.delete_predicates().iter().cloned());
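
Seeding the accumulators with Time::MAX / Time::MIN instead of Option<Time> lets the loop body collapse into plain std::cmp::min / std::cmp::max calls, and makes the expect() unwraps removed in the next hunk unnecessary. A self-contained sketch of the fold, with i64 standing in for Time:

```rust
// Sketch of the sentinel-based fold above, with i64 standing in for `Time`.
fn write_time_range(chunk_write_times: &[(i64, i64)]) -> Option<(i64, i64)> {
    // Start from the extremes so every real value replaces the sentinel.
    let mut first = i64::MAX;
    let mut last = i64::MIN;
    for &(chunk_first, chunk_last) in chunk_write_times {
        first = std::cmp::min(first, chunk_first);
        last = std::cmp::max(last, chunk_last);
    }
    // With at least one chunk the sentinels are always overwritten; the
    // real function separately rejects an empty chunk list.
    (first <= last).then_some((first, last))
}

fn main() {
    assert_eq!(write_time_range(&[(10, 20), (5, 15)]), Some((5, 20)));
    assert_eq!(write_time_range(&[]), None);
}
```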
@@ -236,11 +239,8 @@ fn mark_chunks_to_compact(
return ChunksNotContiguous {}.fail();
}
let time_of_first_write = time_of_first_write.expect("Should have had a first write somewhere");
let time_of_last_write = time_of_last_write.expect("Should have had a last write somewhere");
// drop partition lock
let _partition = partition.into_data().partition;
std::mem::drop(partition);
Ok(CompactingOsChunks {
time_of_first_write,
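
For context on the ChunksNotContiguous error above: the chunk orders collected into the BTreeSet must form an unbroken run, with no unselected chunk ordered between the selected ones. A rough, self-contained sketch of that rule (plain u32 stands in for ChunkOrder; the real check lives inside this function):

```rust
use std::collections::BTreeSet;

// Sketch of the contiguity rule: no unselected chunk's order may fall
// inside the [min, max] range spanned by the selected chunks.
fn orders_contiguous(selected: &BTreeSet<u32>, all_in_partition: &BTreeSet<u32>) -> bool {
    let (Some(&min), Some(&max)) = (selected.iter().next(), selected.iter().next_back()) else {
        return true; // an empty selection is trivially contiguous
    };
    all_in_partition
        .iter()
        .filter(|o| (min..=max).contains(*o))
        .all(|o| selected.contains(o))
}

fn main() {
    let all: BTreeSet<u32> = [1, 2, 3].into();
    assert!(orders_contiguous(&[1, 2].into(), &all));
    // Chunks 1 and 3 skip over chunk 2 -> not contiguous.
    assert!(!orders_contiguous(&[1, 3].into(), &all));
}
```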
@@ -311,46 +311,19 @@ struct CompactedStream {
}
/// Persist a provided stream to a new OS chunk
#[allow(clippy::too_many_arguments)]
async fn persist_stream_to_chunk<'a>(
db: &'a Db,
partition_addr: &'a PartitionAddr,
stream: SendableRecordBatchStream,
partition_checkpoint: PartitionCheckpoint,
database_checkpoint: DatabaseCheckpoint,
time_of_first_write: Time,
time_of_last_write: Time,
chunk_order: ChunkOrder,
iox_metadata: IoxMetadata,
) -> Result<Arc<ParquetChunk>> {
// Todo: ask Marco if this cleanup_lock is needed
// fetch shared (= read) guard preventing the cleanup job from deleting our files
let _guard = db.cleanup_lock.read().await;
// Create a new chunk for this stream data
//let table_name = Arc::from("cpu");
let table_name = Arc::from(partition_addr.table_name.to_string());
let partition_key = Arc::from(partition_addr.partition_key.to_string());
let chunk_id = ChunkId::new();
let metadata = IoxMetadata {
creation_timestamp: db.time_provider.now(),
table_name,
partition_key,
chunk_id,
partition_checkpoint: partition_checkpoint.clone(),
database_checkpoint: database_checkpoint.clone(),
time_of_first_write,
time_of_last_write,
chunk_order,
};
// Create storage to save this chunk's data
let storage = Storage::new(Arc::clone(&db.iox_object_store));
// Write the chunk stream data into a parquet file in the storage
let chunk_addr = ChunkAddr::new(partition_addr, chunk_id);
let chunk_addr = ChunkAddr::new(partition_addr, iox_metadata.chunk_id);
let (path, file_size_bytes, parquet_metadata) = storage
.write_to_object_store(chunk_addr, stream, metadata)
.write_to_object_store(chunk_addr, stream, iox_metadata)
.await
.context(WritingToObjectStore)?;
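
Besides deleting the duplicated metadata construction, the refactor replaces the Arc::from(partition_addr.table_name.to_string()) round-trip with Arc::clone(&partition_addr.table_name) at the call site: cloning an Arc only bumps a reference count, while the old form reallocated the string plus a fresh Arc. A small self-contained illustration of the difference:

```rust
use std::sync::Arc;

fn main() {
    // One shared allocation, as in `partition_addr.table_name`.
    let table_name: Arc<str> = Arc::from("cpu");

    // Old style: re-allocates a String, then a second Arc allocation.
    let copied: Arc<str> = Arc::from(table_name.to_string());

    // New style: bumps the reference count on the existing allocation.
    let shared: Arc<str> = Arc::clone(&table_name);

    assert_eq!(&*copied, &*shared);             // same contents ...
    assert!(!Arc::ptr_eq(&copied, &shared));    // ... different allocations
    assert!(Arc::ptr_eq(&table_name, &shared)); // clone shares the original
}
```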
@@ -378,45 +351,19 @@ async fn persist_stream_to_chunk<'a>(
#[cfg(test)]
mod tests {
use super::*;
use crate::{db::test_helpers::write_lp, utils::TestDb, Db};
use data_types::database_rules::LifecycleRules;
use crate::{db::test_helpers::write_lp, utils::make_db};
use lifecycle::{LockableChunk, LockablePartition};
use query::{QueryChunk, QueryDatabase};
use std::{
num::{NonZeroU32, NonZeroU64},
time::Duration,
};
// Todo: this was copied from persist.rs and should be revised to match the needs here
async fn test_db() -> (Arc<Db>, Arc<time::MockProvider>) {
let time_provider = Arc::new(time::MockProvider::new(Time::from_timestamp(3409, 45)));
let test_db = TestDb::builder()
.lifecycle_rules(LifecycleRules {
late_arrive_window_seconds: NonZeroU32::new(1).unwrap(),
worker_backoff_millis: NonZeroU64::new(u64::MAX).unwrap(),
..Default::default()
})
.time_provider(Arc::<time::MockProvider>::clone(&time_provider))
.build()
.await;
(test_db.db, time_provider)
}
use query::QueryChunk;
#[tokio::test]
async fn test_compact_os_no_chunks() {
test_helpers::maybe_start_logging();
let (db, time) = test_db().await;
let late_arrival = Duration::from_secs(1);
write_lp(db.as_ref(), "cpu,tag1=cupcakes bar=1 10").await;
time.inc(late_arrival);
let partition_keys = db.partition_keys().unwrap();
assert_eq!(partition_keys.len(), 1);
let db_partition = db.partition("cpu", &partition_keys[0]).unwrap();
let db = make_db().await.db;
let partition_key = "1970-01-01T00";
write_lp(&db, "cpu,tag1=cupcakes bar=1 10").await;
let db_partition = db.partition("cpu", partition_key).unwrap();
let partition = LockableCatalogPartition::new(Arc::clone(&db), Arc::clone(&db_partition));
let partition = partition.write();
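
The rewritten tests can hard-code partition_key = "1970-01-01T00" because every line-protocol write here uses a timestamp a few nanoseconds past the Unix epoch, and the default partition template buckets rows by hour. A hedged sketch of that derivation (hourly_partition_key is a hypothetical helper, not the crate's template code):

```rust
// Hypothetical helper illustrating the default hour-bucket partitioning;
// the real template lives in the database rules, not in this function.
fn hourly_partition_key(timestamp_nanos: i64) -> String {
    let hours_since_epoch = timestamp_nanos / 1_000_000_000 / 3_600;
    // Sketch only: valid just for timestamps inside the epoch's first day.
    assert!((0..24).contains(&hours_since_epoch));
    format!("1970-01-01T{:02}", hours_since_epoch)
}

fn main() {
    // The test writes use timestamps 10, 30, and 40 nanoseconds ...
    for ts in [10, 30, 40] {
        // ... all of which land in the same hourly partition.
        assert_eq!(hourly_partition_key(ts), "1970-01-01T00");
    }
}
```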
@@ -436,14 +383,11 @@ mod tests {
async fn test_compact_os_non_os_chunks() {
test_helpers::maybe_start_logging();
let (db, time) = test_db().await;
let late_arrival = Duration::from_secs(1);
write_lp(db.as_ref(), "cpu,tag1=cupcakes bar=1 10").await;
time.inc(late_arrival);
let db = make_db().await.db;
let partition_key = "1970-01-01T00";
write_lp(&db, "cpu,tag1=cupcakes bar=1 10").await;
let partition_keys = db.partition_keys().unwrap();
assert_eq!(partition_keys.len(), 1);
let db_partition = db.partition("cpu", &partition_keys[0]).unwrap();
let db_partition = db.partition("cpu", partition_key).unwrap();
// persisted and non-persisted chunks
let partition = LockableCatalogPartition::new(Arc::clone(&db), Arc::clone(&db_partition));
@@ -468,17 +412,14 @@ mod tests {
async fn test_compact_os_non_contiguous_chunks() {
test_helpers::maybe_start_logging();
let (db, time) = test_db().await;
let late_arrival = Duration::from_secs(1);
write_lp(db.as_ref(), "cpu,tag1=cupcakes bar=1 10").await;
time.inc(late_arrival);
let db = make_db().await.db;
let partition_key = "1970-01-01T00";
write_lp(&db, "cpu,tag1=cupcakes bar=1 10").await;
let partition_keys = db.partition_keys().unwrap();
assert_eq!(partition_keys.len(), 1);
let db_partition = db.partition("cpu", &partition_keys[0]).unwrap();
let db_partition = db.partition("cpu", partition_key).unwrap();
// persist chunk 1
db.persist_partition("cpu", partition_keys[0].as_str(), true)
db.persist_partition("cpu", partition_key, true)
.await
.unwrap()
.unwrap()
@@ -486,7 +427,7 @@ mod tests {
//
// persist chunk 2
write_lp(db.as_ref(), "cpu,tag1=chunk2,tag2=a bar=2 10").await;
db.persist_partition("cpu", partition_keys[0].as_str(), true)
db.persist_partition("cpu", partition_key, true)
.await
.unwrap()
.unwrap()
@@ -494,7 +435,7 @@ mod tests {
//
// persist chunk 3
write_lp(db.as_ref(), "cpu,tag1=chunk3,tag2=a bar=2 30").await;
db.persist_partition("cpu", partition_keys[0].as_str(), true)
db.persist_partition("cpu", partition_key, true)
.await
.unwrap()
.unwrap()
@@ -502,8 +443,6 @@ mod tests {
//
// Add a MUB
write_lp(db.as_ref(), "cpu,tag1=chunk4,tag2=a bar=2 40").await;
// todo: Need to ask Marco why there is no handle created here
time.inc(Duration::from_secs(40));
// compact 2 non-contiguous chunks: chunk 1 and chunk 3
let partition = LockableCatalogPartition::new(Arc::clone(&db), Arc::clone(&db_partition));