From 8853296bef74c14158fd83d426f590549689d068 Mon Sep 17 00:00:00 2001
From: Dom Dwyer
Date: Fri, 1 Sep 2023 14:15:11 +0200
Subject: [PATCH 1/2] refactor(persist): load sort key once

Passes the already-loaded sort key into update_catalog_sort_key().
This makes the function inputs clear, and avoids loading the same key
twice (which should be a no-op now, but not in the future!).
---
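As a sketch of the call shape this patch moves to (hypothetical
stand-in types, not the real ingester API): the key is loaded exactly
once, each reader borrows it, and the catalog update finally consumes
it.

    // Hypothetical stand-ins, for illustration only.
    #[derive(Clone, Debug)]
    struct SortKey(Vec<String>);

    // Stand-in for the single `ctx.sort_key().get().await` load.
    fn load_sort_key() -> Option<SortKey> {
        Some(SortKey(vec!["host".into(), "time".into()]))
    }

    // Readers borrow the already-loaded key instead of re-loading it.
    fn compact(sort_key: Option<&SortKey>) {
        println!("compacting with sort key {sort_key:?}");
    }

    // The final consumer takes the old key by value.
    fn update_catalog(old: Option<SortKey>, new: SortKey) {
        println!("catalog sort key: {old:?} -> {new:?}");
    }

    fn main() {
        let sort_key = load_sort_key(); // loaded once, up front
        compact(sort_key.as_ref()); // borrowed, not re-loaded
        let new = SortKey(vec!["host".into(), "region".into(), "time".into()]);
        update_catalog(sort_key, new);
    }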
 ingester/src/persist/compact.rs | 28 ++++++++++-----------
 ingester/src/persist/worker.rs  | 43 +++++++++++++++++++--------------
 2 files changed, 39 insertions(+), 32 deletions(-)

diff --git a/ingester/src/persist/compact.rs b/ingester/src/persist/compact.rs
index 849b762cdc..596d54241a 100644
--- a/ingester/src/persist/compact.rs
+++ b/ingester/src/persist/compact.rs
@@ -48,7 +48,7 @@ impl std::fmt::Debug for CompactedStream {
 /// data to compact, returning an updated sort key, if any.
 pub(super) async fn compact_persisting_batch(
     executor: &Executor,
-    sort_key: Option<SortKey>,
+    sort_key: Option<&SortKey>,
     table_name: TableName,
     batch: QueryAdaptor,
 ) -> Result<CompactedStream, ()> {
@@ -64,13 +64,13 @@ pub(super) async fn compact_persisting_batch(
             //
             // If there are any new columns, add them to the end of the sort key in the catalog and
             // return that to be updated in the catalog.
-            adjust_sort_key_columns(&sk, &batch.schema().primary_key())
+            adjust_sort_key_columns(sk, &batch.schema().primary_key())
         }
         None => {
             let sort_key = compute_sort_key(batch.schema(), batch.record_batches().iter());
             // Use the sort key computed from the cardinality as the sort key for this parquet
             // file's metadata, also return the sort key to be stored in the catalog
-            (sort_key.clone(), Some(sort_key))
+            (sort_key.clone(), Some(sort_key.clone()))
         }
     };
 
@@ -135,7 +135,7 @@ mod tests {
         // compact
         let exc = Executor::new_testing();
         let CompactedStream { stream, .. } =
-            compact_persisting_batch(&exc, Some(SortKey::empty()), "test_table".into(), batch)
+            compact_persisting_batch(&exc, Some(&SortKey::empty()), "test_table".into(), batch)
                 .await
                 .unwrap();
 
@@ -175,7 +175,7 @@ mod tests {
             stream,
             data_sort_key,
             catalog_sort_key_update,
-        } = compact_persisting_batch(&exc, Some(SortKey::empty()), "test_table".into(), batch)
+        } = compact_persisting_batch(&exc, Some(&SortKey::empty()), "test_table".into(), batch)
             .await
             .unwrap();
 
@@ -225,7 +225,7 @@ mod tests {
             stream,
             data_sort_key,
             catalog_sort_key_update,
-        } = compact_persisting_batch(&exc, Some(SortKey::empty()), "test_table".into(), batch)
+        } = compact_persisting_batch(&exc, Some(&SortKey::empty()), "test_table".into(), batch)
            .await
            .unwrap();
 
@@ -282,7 +282,7 @@ mod tests {
             catalog_sort_key_update,
         } = compact_persisting_batch(
             &exc,
-            Some(SortKey::from_columns(["tag3", "tag1", "time"])),
+            Some(&SortKey::from_columns(["tag3", "tag1", "time"])),
             "test_table".into(),
             batch,
         )
@@ -342,7 +342,7 @@ mod tests {
             catalog_sort_key_update,
         } = compact_persisting_batch(
             &exc,
-            Some(SortKey::from_columns(["tag3", "time"])),
+            Some(&SortKey::from_columns(["tag3", "time"])),
             "test_table".into(),
             batch,
         )
@@ -405,7 +405,7 @@ mod tests {
             catalog_sort_key_update,
         } = compact_persisting_batch(
             &exc,
-            Some(SortKey::from_columns(["tag3", "tag1", "tag4", "time"])),
+            Some(&SortKey::from_columns(["tag3", "tag1", "tag4", "time"])),
             "test_table".into(),
             batch,
         )
@@ -461,7 +461,7 @@ mod tests {
 
         // compact
         let exc = Executor::new_testing();
-        let stream = compact_persisting_batch(&exc, Some(sort_key), "test_table".into(), batch)
+        let stream = compact_persisting_batch(&exc, Some(&sort_key), "test_table".into(), batch)
             .await
             .unwrap();
         let output_batches = datafusion::physical_plan::common::collect(stream.stream)
@@ -501,7 +501,7 @@ mod tests {
 
         // compact
         let exc = Executor::new_testing();
-        let stream = compact_persisting_batch(&exc, Some(sort_key), "test_table".into(), batch)
+        let stream = compact_persisting_batch(&exc, Some(&sort_key), "test_table".into(), batch)
             .await
             .unwrap();
         let output_batches = datafusion::physical_plan::common::collect(stream.stream)
@@ -549,7 +549,7 @@ mod tests {
 
         // compact
         let exc = Executor::new_testing();
-        let stream = compact_persisting_batch(&exc, Some(sort_key), "test_table".into(), batch)
+        let stream = compact_persisting_batch(&exc, Some(&sort_key), "test_table".into(), batch)
             .await
             .unwrap()
             .stream;
@@ -595,7 +595,7 @@ mod tests {
 
         // compact
         let exc = Executor::new_testing();
-        let stream = compact_persisting_batch(&exc, Some(sort_key), "test_table".into(), batch)
+        let stream = compact_persisting_batch(&exc, Some(&sort_key), "test_table".into(), batch)
             .await
             .unwrap()
             .stream;
@@ -645,7 +645,7 @@ mod tests {
 
         // compact
         let exc = Executor::new_testing();
-        let stream = compact_persisting_batch(&exc, Some(sort_key), "test_table".into(), batch)
+        let stream = compact_persisting_batch(&exc, Some(&sort_key), "test_table".into(), batch)
             .await
             .unwrap()
             .stream;
diff --git a/ingester/src/persist/worker.rs b/ingester/src/persist/worker.rs
index 0b9ecd16f6..c99e6075b8 100644
--- a/ingester/src/persist/worker.rs
+++ b/ingester/src/persist/worker.rs
@@ -169,23 +169,32 @@ async fn compact_and_upload<O>(
 where
     O: Send + Sync,
 {
-    // load sort key
+    // Read the partition sort key from the catalog.
+    //
+    // Sort keys may be updated by any ingester at any time, and updates to the
+    // sort key MUST be serialised.
     let sort_key = ctx.sort_key().get().await;
 
-    // fetch column map
-    // THIS MUST BE DONE AFTER THE SORT KEY IS LOADED
-    let (sort_key, columns) = fetch_column_map(ctx, worker_state, sort_key).await?;
-    let compacted = compact(ctx, worker_state, sort_key).await;
+    // Fetch the "column name -> column ID" map.
+    //
+    // This MUST happen after the sort key has been loaded, to ensure all the
+    // columns defined in the sort key are present in the map. If the values
+    // were fetched in reverse order, a race exists where the sort key could
+    // be updated to include a column that does not exist in the column map.
+    let column_map = fetch_column_map(ctx, worker_state, sort_key.as_ref()).await?;
+
+    let compacted = compact(ctx, worker_state, sort_key.as_ref()).await;
     let (sort_key_update, parquet_table_data) =
-        upload(ctx, worker_state, compacted, &columns).await;
+        upload(ctx, worker_state, compacted, &column_map).await;
 
-    if let Some(update) = sort_key_update {
+    if let Some(sort_key_update) = sort_key_update {
         update_catalog_sort_key(
             ctx,
             worker_state,
-            update,
+            sort_key,        // Old sort key prior to this persist job
+            sort_key_update, // New sort key updated by this persist job
             parquet_table_data.object_store_id,
-            &columns,
+            &column_map,
         )
         .await?
     }
@@ -198,7 +207,7 @@ where
 async fn compact<O>(
     ctx: &Context,
     worker_state: &SharedWorkerState<O>,
-    sort_key: Option<SortKey>,
+    sort_key: Option<&SortKey>,
 ) -> CompactedStream
 where
     O: Send + Sync,
 {
@@ -326,8 +335,8 @@ async fn fetch_column_map(
     // THIS IS A MUST TO GUARANTEE THE RETURNED COLUMN MAP CONTAINS ALL COLUMNS IN THE SORT KEY
     // The purpose to put the sort_key as a param here is to make sure the caller has already loaded the sort key
     // and the same sort_key is returned
-    sort_key: Option<SortKey>,
-) -> Result<(Option<SortKey>, ColumnsByName), PersistError>
+    sort_key: Option<&SortKey>,
+) -> Result<ColumnsByName, PersistError>
 where
     O: Send + Sync,
 {
@@ -354,7 +363,7 @@ where
         }
     }
 
-    Ok((sort_key, column_map))
+    Ok(column_map)
 }
 
 /// Update the sort key value stored in the catalog for this [`Context`].
@@ -367,6 +376,7 @@ where
 async fn update_catalog_sort_key<O>(
     ctx: &mut Context,
     worker_state: &SharedWorkerState<O>,
+    old_sort_key: Option<SortKey>,
     new_sort_key: SortKey,
     object_store_id: Uuid,
     columns: &ColumnsByName,
@@ -374,11 +384,8 @@
 where
     O: Send + Sync,
 {
-    let old_sort_key = ctx
-        .sort_key()
-        .get()
-        .await
-        .map(|v| v.to_columns().map(|v| v.to_string()).collect::<Vec<_>>());
+    let old_sort_key =
+        old_sort_key.map(|v| v.to_columns().map(|v| v.to_string()).collect::<Vec<_>>());
 
     debug!(
         %object_store_id,

From e5e12ebb1a319f89cfe295baf637a13951543333 Mon Sep 17 00:00:00 2001
From: Dom Dwyer
Date: Fri, 1 Sep 2023 14:20:06 +0200
Subject: [PATCH 2/2] refactor: remove Result from infallible return

The compact_persisting_batch() call is infallible, but prior to this
commit it would return a Result with the () unit type as its error
type. This is misleading - it can never return an error, so call sites
checking for errors are dead code.
---
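A minimal sketch of the refactor pattern (hypothetical functions, not
the ingester code): dropping Result from an infallible signature
removes a dead error path from every caller.

    // Before: the () error type implies a failure mode that cannot happen.
    fn compact_v1(values: Vec<u32>) -> Result<Vec<u32>, ()> {
        Ok(values.into_iter().filter(|v| *v != 0).collect())
    }

    // After: returning the value directly makes infallibility explicit.
    fn compact_v2(values: Vec<u32>) -> Vec<u32> {
        values.into_iter().filter(|v| *v != 0).collect()
    }

    fn main() {
        let _a = compact_v1(vec![0, 1, 2]).unwrap(); // dead error handling
        let _b = compact_v2(vec![0, 1, 2]); // nothing to unwrap
    }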
 ingester/src/persist/compact.rs | 37 ++++++++++++---------------------
 ingester/src/persist/worker.rs  |  1 -
 2 files changed, 13 insertions(+), 25 deletions(-)

diff --git a/ingester/src/persist/compact.rs b/ingester/src/persist/compact.rs
index 596d54241a..f5d3bd2e9e 100644
--- a/ingester/src/persist/compact.rs
+++ b/ingester/src/persist/compact.rs
@@ -51,7 +51,7 @@ pub(super) async fn compact_persisting_batch(
     sort_key: Option<&SortKey>,
     table_name: TableName,
     batch: QueryAdaptor,
-) -> Result<CompactedStream, ()> {
+) -> CompactedStream {
     assert!(!batch.record_batches().is_empty());
 
     // Get sort key from the catalog or compute it from
@@ -93,11 +93,11 @@ pub(super) async fn compact_persisting_batch(
     // Execute the plan and return the compacted stream
     let output_stream = ctx.execute_stream(physical_plan).await.unwrap();
 
-    Ok(CompactedStream {
+    CompactedStream {
         stream: output_stream,
         catalog_sort_key_update,
         data_sort_key,
-    })
+    }
 }
 
 #[cfg(test)]
@@ -136,8 +136,7 @@ mod tests {
         let exc = Executor::new_testing();
         let CompactedStream { stream, .. } =
             compact_persisting_batch(&exc, Some(&SortKey::empty()), "test_table".into(), batch)
-                .await
-                .unwrap();
+                .await;
 
         let output_batches = datafusion::physical_plan::common::collect(stream)
             .await
@@ -176,8 +175,7 @@ mod tests {
             data_sort_key,
             catalog_sort_key_update,
         } = compact_persisting_batch(&exc, Some(&SortKey::empty()), "test_table".into(), batch)
-            .await
-            .unwrap();
+            .await;
 
         let output_batches = datafusion::physical_plan::common::collect(stream)
             .await
@@ -226,8 +224,7 @@ mod tests {
             data_sort_key,
             catalog_sort_key_update,
         } = compact_persisting_batch(&exc, Some(&SortKey::empty()), "test_table".into(), batch)
-            .await
-            .unwrap();
+            .await;
 
         let output_batches = datafusion::physical_plan::common::collect(stream)
             .await
@@ -286,8 +283,7 @@ mod tests {
             "test_table".into(),
             batch,
         )
-        .await
-        .unwrap();
+        .await;
 
         let output_batches = datafusion::physical_plan::common::collect(stream)
             .await
@@ -346,8 +342,7 @@ mod tests {
             "test_table".into(),
             batch,
         )
-        .await
-        .unwrap();
+        .await;
 
         let output_batches = datafusion::physical_plan::common::collect(stream)
             .await
@@ -409,8 +404,7 @@ mod tests {
             "test_table".into(),
             batch,
         )
-        .await
-        .unwrap();
+        .await;
 
         let output_batches = datafusion::physical_plan::common::collect(stream)
             .await
@@ -461,9 +455,8 @@ mod tests {
 
         // compact
         let exc = Executor::new_testing();
-        let stream = compact_persisting_batch(&exc, Some(&sort_key), "test_table".into(), batch)
-            .await
-            .unwrap();
+        let stream =
+            compact_persisting_batch(&exc, Some(&sort_key), "test_table".into(), batch).await;
         let output_batches = datafusion::physical_plan::common::collect(stream.stream)
             .await
             .unwrap();
@@ -501,9 +494,8 @@ mod tests {
 
         // compact
        let exc = Executor::new_testing();
-        let stream = compact_persisting_batch(&exc, Some(&sort_key), "test_table".into(), batch)
-            .await
-            .unwrap();
+        let stream =
+            compact_persisting_batch(&exc, Some(&sort_key), "test_table".into(), batch).await;
         let output_batches = datafusion::physical_plan::common::collect(stream.stream)
             .await
             .unwrap();
@@ -551,7 +543,6 @@ mod tests {
         let exc = Executor::new_testing();
         let stream = compact_persisting_batch(&exc, Some(&sort_key), "test_table".into(), batch)
             .await
-            .unwrap()
             .stream;
         let output_batches = datafusion::physical_plan::common::collect(stream)
             .await
@@ -597,7 +588,6 @@ mod tests {
         let exc = Executor::new_testing();
         let stream = compact_persisting_batch(&exc, Some(&sort_key), "test_table".into(), batch)
             .await
-            .unwrap()
             .stream;
         let output_batches = datafusion::physical_plan::common::collect(stream)
             .await
@@ -647,7 +637,6 @@ mod tests {
         let exc = Executor::new_testing();
         let stream = compact_persisting_batch(&exc, Some(&sort_key), "test_table".into(), batch)
             .await
-            .unwrap()
             .stream;
         let output_batches = datafusion::physical_plan::common::collect(stream)
             .await
diff --git a/ingester/src/persist/worker.rs b/ingester/src/persist/worker.rs
index c99e6075b8..946a324599 100644
--- a/ingester/src/persist/worker.rs
+++ b/ingester/src/persist/worker.rs
@@ -236,7 +236,6 @@ where
         ctx.data().query_adaptor(),
     )
     .await
-    .expect("unable to compact persisting batch")
 }
 
 /// Upload the compacted data in `compacted`, returning the new sort key value
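
A sketch of the ordering invariant documented in compact_and_upload()
(hypothetical synchronous stand-ins; the real code reads both values
from the catalog asynchronously): snapshotting the sort key before the
column map guarantees every sort key column is present in the map, even
if another ingester adds columns between the two reads.

    use std::collections::HashMap;

    // Hypothetical stand-in for the catalog state.
    struct Catalog {
        sort_key: Vec<String>,
        columns: HashMap<String, u64>, // column name -> column ID
    }

    // Safe order: read the sort key first, then the column map. Columns
    // are only ever added, so the later column map is a superset of the
    // one that existed when the sort key was read.
    fn snapshot(catalog: &Catalog) -> (Vec<String>, HashMap<String, u64>) {
        let sort_key = catalog.sort_key.clone(); // first read
        let column_map = catalog.columns.clone(); // second read
        assert!(
            sort_key.iter().all(|c| column_map.contains_key(c)),
            "sort key column missing from column map"
        );
        (sort_key, column_map)
    }

    fn main() {
        let catalog = Catalog {
            sort_key: vec!["host".into(), "time".into()],
            columns: HashMap::from([("host".into(), 1), ("time".into(), 2)]),
        };
        let (sort_key, column_map) = snapshot(&catalog);
        println!("{sort_key:?} / {} columns", column_map.len());
    }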