Merge pull request #8645 from influxdata/dom/reuse-sort-key

refactor(persist): load sort key once
Dom 2023-09-01 14:11:08 +01:00 committed by GitHub
commit 0f138dc2a5
2 changed files with 50 additions and 55 deletions

@@ -48,10 +48,10 @@ impl std::fmt::Debug for CompactedStream {
 /// data to compact, returning an updated sort key, if any.
 pub(super) async fn compact_persisting_batch(
     executor: &Executor,
-    sort_key: Option<SortKey>,
+    sort_key: Option<&SortKey>,
     table_name: TableName,
     batch: QueryAdaptor,
-) -> Result<CompactedStream, ()> {
+) -> CompactedStream {
     assert!(!batch.record_batches().is_empty());

     // Get sort key from the catalog or compute it from
@@ -64,13 +64,13 @@ pub(super) async fn compact_persisting_batch(
             //
             // If there are any new columns, add them to the end of the sort key in the catalog and
             // return that to be updated in the catalog.
-            adjust_sort_key_columns(&sk, &batch.schema().primary_key())
+            adjust_sort_key_columns(sk, &batch.schema().primary_key())
         }
         None => {
             let sort_key = compute_sort_key(batch.schema(), batch.record_batches().iter());
             // Use the sort key computed from the cardinality as the sort key for this parquet
             // file's metadata, also return the sort key to be stored in the catalog
-            (sort_key.clone(), Some(sort_key))
+            (sort_key.clone(), Some(sort_key.clone()))
         }
     };
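The signature change above is the heart of the refactor: `compact_persisting_batch` now borrows the caller's sort key instead of consuming it, so the persist worker can load the key once and keep using it afterwards. A minimal sketch of the ownership pattern, using stand-in types rather than the real IOx `SortKey` and `adjust_sort_key_columns` API:

```rust
/// Stand-in for the IOx SortKey (illustrative only, not the real type).
#[derive(Debug, Clone, PartialEq)]
struct SortKey(Vec<String>);

/// Sketch of the new calling convention: borrow an optional existing key,
/// return (data sort key, optional catalog update).
fn choose_sort_key(existing: Option<&SortKey>) -> (SortKey, Option<SortKey>) {
    match existing {
        // An existing key would be adjusted against the batch schema in the
        // real code; here we simply clone it, with no catalog update.
        Some(sk) => (sk.clone(), None),
        // No existing key: compute one, then clone it so one copy sorts the
        // data and the other is written back to the catalog.
        None => {
            let computed = SortKey(vec!["tag1".into(), "time".into()]);
            (computed.clone(), Some(computed))
        }
    }
}

fn main() {
    let key = SortKey(vec!["tag3".into(), "time".into()]);
    // The caller passes `Some(&key)` and, crucially, still owns `key` after
    // the call; with the old `Option<SortKey>` parameter it had to clone.
    let (data_key, update) = choose_sort_key(Some(&key));
    assert_eq!(data_key, key);
    assert!(update.is_none());
}
```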
@@ -93,11 +93,11 @@ pub(super) async fn compact_persisting_batch(
     // Execute the plan and return the compacted stream
     let output_stream = ctx.execute_stream(physical_plan).await.unwrap();

-    Ok(CompactedStream {
+    CompactedStream {
         stream: output_stream,
         catalog_sort_key_update,
         data_sort_key,
-    })
+    }
 }

 #[cfg(test)]
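Dropping the `Result<CompactedStream, ()>` wrapper makes the signature honest: the error variant was never constructed (the body ended in `Ok(...)`, and internal failures already panic via `unwrap`), so every caller carried a meaningless `.unwrap()`. A short sketch of the before/after shape, synchronous here for brevity even though the real function is async:

```rust
struct CompactedStream;

// Before: a Result whose error type `()` could never actually be produced.
fn compact_old() -> Result<CompactedStream, ()> {
    Ok(CompactedStream)
}

// After: the return type states that this path cannot fail.
fn compact_new() -> CompactedStream {
    CompactedStream
}

fn main() {
    // Every call site previously unwrapped an impossible error...
    let _s = compact_old().unwrap();
    // ...and now simply takes the value, as the test updates below show.
    let _s = compact_new();
}
```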
@@ -135,9 +135,8 @@ mod tests {
         // compact
         let exc = Executor::new_testing();
         let CompactedStream { stream, .. } =
-            compact_persisting_batch(&exc, Some(SortKey::empty()), "test_table".into(), batch)
-                .await
-                .unwrap();
+            compact_persisting_batch(&exc, Some(&SortKey::empty()), "test_table".into(), batch)
+                .await;

         let output_batches = datafusion::physical_plan::common::collect(stream)
             .await
@@ -175,9 +174,8 @@ mod tests {
             stream,
             data_sort_key,
             catalog_sort_key_update,
-        } = compact_persisting_batch(&exc, Some(SortKey::empty()), "test_table".into(), batch)
-            .await
-            .unwrap();
+        } = compact_persisting_batch(&exc, Some(&SortKey::empty()), "test_table".into(), batch)
+            .await;

         let output_batches = datafusion::physical_plan::common::collect(stream)
             .await
@@ -225,9 +223,8 @@ mod tests {
             stream,
             data_sort_key,
             catalog_sort_key_update,
-        } = compact_persisting_batch(&exc, Some(SortKey::empty()), "test_table".into(), batch)
-            .await
-            .unwrap();
+        } = compact_persisting_batch(&exc, Some(&SortKey::empty()), "test_table".into(), batch)
+            .await;

         let output_batches = datafusion::physical_plan::common::collect(stream)
             .await
@@ -282,12 +279,11 @@ mod tests {
             catalog_sort_key_update,
         } = compact_persisting_batch(
             &exc,
-            Some(SortKey::from_columns(["tag3", "tag1", "time"])),
+            Some(&SortKey::from_columns(["tag3", "tag1", "time"])),
             "test_table".into(),
             batch,
         )
-        .await
-        .unwrap();
+        .await;

         let output_batches = datafusion::physical_plan::common::collect(stream)
             .await
@@ -342,12 +338,11 @@ mod tests {
             catalog_sort_key_update,
         } = compact_persisting_batch(
             &exc,
-            Some(SortKey::from_columns(["tag3", "time"])),
+            Some(&SortKey::from_columns(["tag3", "time"])),
             "test_table".into(),
             batch,
         )
-        .await
-        .unwrap();
+        .await;

         let output_batches = datafusion::physical_plan::common::collect(stream)
             .await
@@ -405,12 +400,11 @@ mod tests {
             catalog_sort_key_update,
         } = compact_persisting_batch(
             &exc,
-            Some(SortKey::from_columns(["tag3", "tag1", "tag4", "time"])),
+            Some(&SortKey::from_columns(["tag3", "tag1", "tag4", "time"])),
             "test_table".into(),
             batch,
         )
-        .await
-        .unwrap();
+        .await;

         let output_batches = datafusion::physical_plan::common::collect(stream)
             .await
@@ -461,9 +455,8 @@ mod tests {
         // compact
         let exc = Executor::new_testing();
-        let stream = compact_persisting_batch(&exc, Some(sort_key), "test_table".into(), batch)
-            .await
-            .unwrap();
+        let stream =
+            compact_persisting_batch(&exc, Some(&sort_key), "test_table".into(), batch).await;

         let output_batches = datafusion::physical_plan::common::collect(stream.stream)
             .await
             .unwrap();
@@ -501,9 +494,8 @@ mod tests {
         // compact
         let exc = Executor::new_testing();
-        let stream = compact_persisting_batch(&exc, Some(sort_key), "test_table".into(), batch)
-            .await
-            .unwrap();
+        let stream =
+            compact_persisting_batch(&exc, Some(&sort_key), "test_table".into(), batch).await;

         let output_batches = datafusion::physical_plan::common::collect(stream.stream)
             .await
             .unwrap();
@@ -549,9 +541,8 @@ mod tests {
         // compact
         let exc = Executor::new_testing();
-        let stream = compact_persisting_batch(&exc, Some(sort_key), "test_table".into(), batch)
+        let stream = compact_persisting_batch(&exc, Some(&sort_key), "test_table".into(), batch)
             .await
-            .unwrap()
             .stream;

         let output_batches = datafusion::physical_plan::common::collect(stream)
             .await
@@ -595,9 +586,8 @@ mod tests {
         // compact
         let exc = Executor::new_testing();
-        let stream = compact_persisting_batch(&exc, Some(sort_key), "test_table".into(), batch)
+        let stream = compact_persisting_batch(&exc, Some(&sort_key), "test_table".into(), batch)
            .await
-            .unwrap()
             .stream;

         let output_batches = datafusion::physical_plan::common::collect(stream)
             .await
@@ -645,9 +635,8 @@ mod tests {
         // compact
         let exc = Executor::new_testing();
-        let stream = compact_persisting_batch(&exc, Some(sort_key), "test_table".into(), batch)
+        let stream = compact_persisting_batch(&exc, Some(&sort_key), "test_table".into(), batch)
             .await
-            .unwrap()
             .stream;

         let output_batches = datafusion::physical_plan::common::collect(stream)
             .await

@@ -169,23 +169,32 @@ async fn compact_and_upload<O>(
 where
     O: Send + Sync,
 {
-    // load sort key
+    // Read the partition sort key from the catalog.
+    //
+    // Sort keys may be updated by any ingester at any time, and updates to the
+    // sort key MUST be serialised.
     let sort_key = ctx.sort_key().get().await;
-    // fetch column map
-    // THIS MUST BE DONE AFTER THE SORT KEY IS LOADED
-    let (sort_key, columns) = fetch_column_map(ctx, worker_state, sort_key).await?;
-    let compacted = compact(ctx, worker_state, sort_key).await;
+
+    // Fetch the "column name -> column ID" map.
+    //
+    // This MUST happen after the sort key has loaded, to ensure all the columns
+    // defined in the sort key are present in the map. If the values were
+    // fetched in reverse order, a race exists where the sort key could be
+    // updated to include a column that does not exist in the column map.
+    let column_map = fetch_column_map(ctx, worker_state, sort_key.as_ref()).await?;
+    let compacted = compact(ctx, worker_state, sort_key.as_ref()).await;

     let (sort_key_update, parquet_table_data) =
-        upload(ctx, worker_state, compacted, &columns).await;
+        upload(ctx, worker_state, compacted, &column_map).await;

-    if let Some(update) = sort_key_update {
+    if let Some(sort_key_update) = sort_key_update {
         update_catalog_sort_key(
             ctx,
             worker_state,
-            update,
+            sort_key,        // Old sort key prior to this persist job
+            sort_key_update, // New sort key updated by this persist job
             parquet_table_data.object_store_id,
-            &columns,
+            &column_map,
         )
         .await?
     }
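The comment block added above documents the load-ordering invariant this PR leans on. A toy model of why the order matters, with assumed stand-in types (the real code goes through `Context` and a deferred sort-key loader):

```rust
/// Toy stand-ins for catalog state (assumed names, not the IOx API).
#[derive(Clone)]
struct SortKey(Vec<String>);
struct ColumnMap(Vec<String>);

struct Catalog {
    sort_key: Option<SortKey>,
    columns: Vec<String>,
}

impl Catalog {
    fn load_sort_key(&self) -> Option<SortKey> {
        self.sort_key.clone()
    }
    fn load_column_map(&self) -> ColumnMap {
        ColumnMap(self.columns.clone())
    }
}

/// Columns are only ever added, and a sort key can only name columns that
/// already exist when it is written. Snapshotting the sort key FIRST
/// therefore guarantees the later column-map snapshot is a superset of the
/// columns the key names. In the reverse order, another ingester could add
/// a column and publish a new sort key between the two reads, leaving the
/// key referencing a column absent from the map.
fn load_in_safe_order(catalog: &Catalog) -> (Option<SortKey>, ColumnMap) {
    let sort_key = catalog.load_sort_key(); // 1: sort key first
    let column_map = catalog.load_column_map(); // 2: columns second
    (sort_key, column_map)
}

fn main() {
    let catalog = Catalog {
        sort_key: Some(SortKey(vec!["tag1".into(), "time".into()])),
        columns: vec!["tag1".into(), "time".into(), "field1".into()],
    };
    let (sort_key, map) = load_in_safe_order(&catalog);
    if let Some(sk) = sort_key {
        // Every column named by the sort key is present in the map.
        assert!(sk.0.iter().all(|c| map.0.contains(c)));
    }
}
```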
@@ -198,7 +207,7 @@ where
 async fn compact<O>(
     ctx: &Context,
     worker_state: &SharedWorkerState<O>,
-    sort_key: Option<SortKey>,
+    sort_key: Option<&SortKey>,
 ) -> CompactedStream
 where
     O: Send + Sync,
@@ -227,7 +236,6 @@ where
         ctx.data().query_adaptor(),
     )
     .await
-    .expect("unable to compact persisting batch")
 }

 /// Upload the compacted data in `compacted`, returning the new sort key value
@@ -326,8 +334,8 @@ async fn fetch_column_map<O>(
     // THIS IS A MUST TO GUARANTEE THE RETURNED COLUMN MAP CONTAINS ALL COLUMNS IN THE SORT KEY
     // The purpose to put the sort_key as a param here is to make sure the caller has already loaded the sort key
     // and the same sort_key is returned
-    sort_key: Option<SortKey>,
-) -> Result<(Option<SortKey>, ColumnsByName), PersistError>
+    sort_key: Option<&SortKey>,
+) -> Result<ColumnsByName, PersistError>
 where
     O: Send + Sync,
 {
@@ -354,7 +362,7 @@ where
         }
     }

-    Ok((sort_key, column_map))
+    Ok(column_map)
 }

 /// Update the sort key value stored in the catalog for this [`Context`].
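With the sort key borrowed rather than owned, `fetch_column_map` no longer has to hand the key back to prove it was loaded first; the borrow itself guarantees the caller already holds it. A minimal sketch of the simplified contract, with illustrative names:

```rust
struct SortKey;
struct ColumnsByName;

/// Taking `Option<&SortKey>` still forces the caller to load the key before
/// calling (preserving the ordering guarantee documented above), but
/// ownership never moves, so nothing needs to be returned back.
fn fetch_column_map(_sort_key: Option<&SortKey>) -> ColumnsByName {
    ColumnsByName
}

fn main() {
    let sort_key = Some(SortKey);
    let _map = fetch_column_map(sort_key.as_ref());
    // The caller still owns `sort_key` here and can pass it onwards, e.g.
    // to the catalog update step, without the old round-trip through the
    // returned tuple.
    let _still_owned = sort_key;
}
```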
@@ -367,6 +375,7 @@ where
 async fn update_catalog_sort_key<O>(
     ctx: &mut Context,
     worker_state: &SharedWorkerState<O>,
+    old_sort_key: Option<SortKey>,
     new_sort_key: SortKey,
     object_store_id: Uuid,
     columns: &ColumnsByName,
@@ -374,11 +383,8 @@ async fn update_catalog_sort_key<O>(
 where
     O: Send + Sync,
 {
-    let old_sort_key = ctx
-        .sort_key()
-        .get()
-        .await
-        .map(|v| v.to_columns().map(|v| v.to_string()).collect::<Vec<_>>());
+    let old_sort_key =
+        old_sort_key.map(|v| v.to_columns().map(|v| v.to_string()).collect::<Vec<_>>());

     debug!(
         %object_store_id,
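Finally, `update_catalog_sort_key` receives the previously loaded key as its new `old_sort_key` parameter instead of calling `ctx.sort_key().get().await` a second time, so the whole persist job observes a single snapshot of the key. A sketch of the "load once, thread through" pattern with assumed types:

```rust
#[derive(Clone, Debug)]
struct SortKey(Vec<String>);

/// The old key arrives as a parameter, threaded from the single load at the
/// top of compact_and_upload, rather than being fetched again here.
fn update_catalog_sort_key(old_sort_key: Option<SortKey>, new_sort_key: SortKey) {
    let old_cols = old_sort_key.map(|k| k.0);
    // The real function performs the catalog update and debug logging; this
    // sketch only shows the old/new values it would work with.
    println!("sort key update: {:?} -> {:?}", old_cols, new_sort_key.0);
}

fn main() {
    let loaded = Some(SortKey(vec!["tag3".into(), "time".into()]));
    let updated = SortKey(vec!["tag3".into(), "tag1".into(), "time".into()]);
    update_catalog_sort_key(loaded, updated);
}
```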