refactor: defer querying for table schema

Do not query for the table schema until it is needed.
pull/24376/head
Dom Dwyer 2022-10-13 14:56:35 +02:00
parent e556677192
commit 920f7edf75
1 changed files with 12 additions and 10 deletions

View File

@ -322,16 +322,6 @@ impl Persister for IngesterData {
assert_eq!(batch.table_id(), table_id);
assert_eq!(batch.partition_id(), partition_id);
// lookup column IDs from catalog
// TODO: this can be removed once the ingester uses column IDs internally as well
let table_schema = Backoff::new(&self.backoff_config)
.retry_all_errors("get table schema", || async {
let mut repos = self.catalog.repositories().await;
get_table_schema_by_id(table_id, repos.as_mut()).await
})
.await
.expect("retry forever");
// Read the maximum SequenceNumber in the batch.
let (_min, max_sequence_number) = batch.data.min_max_sequence_numbers();
@ -407,6 +397,18 @@ impl Persister for IngesterData {
);
}
// Read the table schema from the catalog to act as a map of column name
// -> column IDs.
//
// TODO: this can be removed once the ingester uses column IDs
let table_schema = Backoff::new(&self.backoff_config)
.retry_all_errors("get table schema", || async {
let mut repos = self.catalog.repositories().await;
get_table_schema_by_id(table_id, repos.as_mut()).await
})
.await
.expect("retry forever");
// Add the parquet file to the catalog until succeed
let parquet_file = iox_metadata.to_parquet_file(partition_id, file_size, &md, |name| {
table_schema.columns.get(name).expect("Unknown column").id