feat: Store metadata in catalog, but don't fetch by default

pull/24376/head
Carol (Nichols || Goulding) 2022-04-04 16:53:05 -04:00
parent 7ddbf7c025
commit ee56ebf0e3
4 changed files with 41 additions and 18 deletions

View File

@@ -720,7 +720,7 @@ pub fn tombstones_to_delete_predicates_iter(
}
/// Data for a parquet file reference that has been inserted in the catalog.
-#[derive(Debug, Clone, PartialEq, sqlx::FromRow)]
+#[derive(Debug, Clone, Copy, PartialEq, sqlx::FromRow)]
pub struct ParquetFile {
/// the id of the file in the catalog
pub id: ParquetFileId,
@@ -746,8 +746,6 @@ pub struct ParquetFile {
pub to_delete: Option<Timestamp>,
/// file size in bytes
pub file_size_bytes: i64,
-/// thrift-encoded parquet metadata
-pub parquet_metadata: Vec<u8>,
/// the number of rows of data in this file
pub row_count: i64,
/// the compaction level of the file

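The hunks above are linked: dropping the thrift-encoded `parquet_metadata` blob is what allows `Copy` to join the derive list, since `Copy` is only derivable when every field is itself `Copy`, and `Vec<u8>` owns heap memory. A minimal sketch of the mechanism (illustrative field names, not the real `ParquetFile`):

```rust
// Illustrative only: `Copy` can be derived once every field is `Copy`.
// A `Vec<u8>` field would block the derive and force explicit `.clone()`.
#[derive(Debug, Clone, Copy, PartialEq)]
struct FileRef {
    id: i64,
    file_size_bytes: i64,
    row_count: i64,
}

fn main() {
    let f = FileRef { id: 1, file_size_bytes: 1024, row_count: 10 };
    let g = f; // implicit bitwise copy; `f` remains usable
    assert_eq!(f, g);
}
```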
View File

@@ -1770,13 +1770,13 @@ pub(crate) mod test_helpers {
.list_by_sequencer_greater_than(sequencer.id, SequenceNumber::new(1))
.await
.unwrap();
-assert_eq!(vec![parquet_file.clone(), other_file.clone()], files);
+assert_eq!(vec![parquet_file, other_file], files);
let files = repos
.parquet_files()
.list_by_sequencer_greater_than(sequencer.id, SequenceNumber::new(150))
.await
.unwrap();
-assert_eq!(vec![other_file.clone()], files);
+assert_eq!(vec![other_file], files);
// verify that to_delete is initially set to null and the file does not get deleted
assert!(parquet_file.to_delete.is_none());
@@ -1890,7 +1890,7 @@
.list_by_namespace_not_to_delete(namespace2.id)
.await
.unwrap();
-assert_eq!(vec![f1.clone(), f2.clone()], files);
+assert_eq!(vec![f1, f2], files);
let f3_params = ParquetFileParams {
object_store_id: Uuid::new_v4(),
@@ -1906,7 +1906,7 @@
.list_by_namespace_not_to_delete(namespace2.id)
.await
.unwrap();
-assert_eq!(vec![f1.clone(), f2.clone(), f3.clone()], files);
+assert_eq!(vec![f1, f2, f3], files);
repos.parquet_files().flag_for_delete(f2.id).await.unwrap();
let files = repos
@@ -1914,7 +1914,7 @@
.list_by_namespace_not_to_delete(namespace2.id)
.await
.unwrap();
-assert_eq!(vec![f1.clone(), f3.clone()], files);
+assert_eq!(vec![f1, f3], files);
let files = repos
.parquet_files()
@@ -2519,7 +2519,7 @@
let nonexistent_parquet_file_id = ParquetFileId::new(level_0_file.id.get() + 1);
// Level 0 parquet files should contain both existing files at this point
-let expected = vec![parquet_file.clone(), level_0_file.clone()];
+let expected = vec![parquet_file, level_0_file];
let level_0 = repos.parquet_files().level_0(sequencer.id).await.unwrap();
let mut level_0_ids: Vec<_> = level_0.iter().map(|pf| pf.id).collect();
level_0_ids.sort();

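With `ParquetFile` now `Copy`, the test helpers drop their `.clone()` calls: moving a value into an expected `vec![…]` copies it implicitly and leaves the original binding usable, which is why `parquet_file.to_delete` can still be read after the assertions. A small sketch of that pattern (hypothetical type, not the real test):

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
struct FileRef { id: i64, to_delete: Option<i64> }

fn main() {
    let parquet_file = FileRef { id: 1, to_delete: None };
    let other_file = FileRef { id: 2, to_delete: None };
    // Previously: vec![parquet_file.clone(), other_file.clone()]
    let files = vec![parquet_file, other_file];
    assert_eq!(vec![parquet_file, other_file], files);
    // The bindings were copied, not moved, so they remain usable here.
    assert!(parquet_file.to_delete.is_none());
}
```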
View File

@@ -961,12 +961,11 @@ impl ParquetFileRepo for MemTxn {
row_count,
to_delete: None,
file_size_bytes,
-parquet_metadata,
compaction_level,
created_at,
};
stage.parquet_files.push(parquet_file);
-Ok(stage.parquet_files.last().unwrap().clone())
+Ok(*stage.parquet_files.last().unwrap())
}
async fn flag_for_delete(&mut self, id: ParquetFileId) -> Result<()> {

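In the in-memory catalog, the same property lets `create` return the stored value by dereferencing instead of cloning: `*stage.parquet_files.last().unwrap()` copies the `ParquetFile` out of the backing `Vec`. Sketched with a stand-in type:

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
struct FileRef { id: i64 }

fn push_and_return(stage: &mut Vec<FileRef>, f: FileRef) -> FileRef {
    stage.push(f);
    // Dereferencing copies the value out; with a non-Copy element type
    // this line would not compile and a `.clone()` would be required.
    *stage.last().unwrap()
}

fn main() {
    let mut stage = Vec::new();
    assert_eq!(push_and_return(&mut stage, FileRef { id: 7 }), FileRef { id: 7 });
}
```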
View File

@@ -1502,9 +1502,13 @@ RETURNING *;
sequencer_id: SequencerId,
sequence_number: SequenceNumber,
) -> Result<Vec<ParquetFile>> {
+// Deliberately doesn't use `SELECT *` to avoid the performance hit of fetching the large
+// `parquet_metadata` column!!
sqlx::query_as::<_, ParquetFile>(
r#"
-SELECT *
+SELECT id, sequencer_id, namespace_id, table_id, partition_id, object_store_id,
+min_sequence_number, max_sequence_number, min_time, max_time, to_delete, file_size_bytes,
+row_count, compaction_level, created_at
FROM parquet_file
WHERE sequencer_id = $1
AND max_sequence_number > $2
@@ -1522,9 +1526,15 @@ ORDER BY id;
&mut self,
namespace_id: NamespaceId,
) -> Result<Vec<ParquetFile>> {
+// Deliberately doesn't use `SELECT *` to avoid the performance hit of fetching the large
+// `parquet_metadata` column!!
sqlx::query_as::<_, ParquetFile>(
r#"
-SELECT parquet_file.*
+SELECT parquet_file.id, parquet_file.sequencer_id, parquet_file.namespace_id,
+parquet_file.table_id, parquet_file.partition_id, parquet_file.object_store_id,
+parquet_file.min_sequence_number, parquet_file.max_sequence_number, parquet_file.min_time,
+parquet_file.max_time, parquet_file.to_delete, parquet_file.file_size_bytes,
+parquet_file.row_count, parquet_file.compaction_level, parquet_file.created_at
FROM parquet_file
INNER JOIN table_name on table_name.id = parquet_file.table_id
WHERE table_name.namespace_id = $1
@@ -1538,9 +1548,13 @@ WHERE table_name.namespace_id = $1
}
async fn list_by_table_not_to_delete(&mut self, table_id: TableId) -> Result<Vec<ParquetFile>> {
+// Deliberately doesn't use `SELECT *` to avoid the performance hit of fetching the large
+// `parquet_metadata` column!!
sqlx::query_as::<_, ParquetFile>(
r#"
-SELECT *
+SELECT id, sequencer_id, namespace_id, table_id, partition_id, object_store_id,
+min_sequence_number, max_sequence_number, min_time, max_time, to_delete, file_size_bytes,
+row_count, compaction_level, created_at
FROM parquet_file
WHERE table_id = $1 AND to_delete IS NULL;
"#,
@@ -1568,10 +1582,14 @@ RETURNING *;
async fn level_0(&mut self, sequencer_id: SequencerId) -> Result<Vec<ParquetFile>> {
// this intentionally limits the returned files to 10,000 as it is used to make
// a decision on the highest priority partitions. If compaction has never been
-// run this could end up returning millions of results and taking too long to run
+// run this could end up returning millions of results and taking too long to run.
+// Deliberately doesn't use `SELECT *` to avoid the performance hit of fetching the large
+// `parquet_metadata` column!!
sqlx::query_as::<_, ParquetFile>(
r#"
-SELECT *
+SELECT id, sequencer_id, namespace_id, table_id, partition_id, object_store_id,
+min_sequence_number, max_sequence_number, min_time, max_time, to_delete, file_size_bytes,
+row_count, compaction_level, created_at
FROM parquet_file
WHERE parquet_file.sequencer_id = $1
AND parquet_file.compaction_level = 0
@@ -1591,9 +1609,13 @@ WHERE parquet_file.sequencer_id = $1
min_time: Timestamp,
max_time: Timestamp,
) -> Result<Vec<ParquetFile>> {
+// Deliberately doesn't use `SELECT *` to avoid the performance hit of fetching the large
+// `parquet_metadata` column!!
sqlx::query_as::<_, ParquetFile>(
r#"
-SELECT *
+SELECT id, sequencer_id, namespace_id, table_id, partition_id, object_store_id,
+min_sequence_number, max_sequence_number, min_time, max_time, to_delete, file_size_bytes,
+row_count, compaction_level, created_at
FROM parquet_file
WHERE parquet_file.sequencer_id = $1
AND parquet_file.table_id = $2
@@ -1618,9 +1640,13 @@ WHERE parquet_file.sequencer_id = $1
&mut self,
partition_id: PartitionId,
) -> Result<Vec<ParquetFile>> {
+// Deliberately doesn't use `SELECT *` to avoid the performance hit of fetching the large
+// `parquet_metadata` column!!
sqlx::query_as::<_, ParquetFile>(
r#"
-SELECT *
+SELECT id, sequencer_id, namespace_id, table_id, partition_id, object_store_id,
+min_sequence_number, max_sequence_number, min_time, max_time, to_delete, file_size_bytes,
+row_count, compaction_level, created_at
FROM parquet_file
WHERE parquet_file.partition_id = $1
AND parquet_file.to_delete IS NULL;
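All of the Postgres queries above follow the same pattern: enumerate exactly the columns that back the struct's fields, so the wide `parquet_metadata` blob never crosses the wire on list paths. This works because `#[derive(sqlx::FromRow)]` maps columns to fields by name. A hedged sketch of the pattern (simplified struct and query, not the real IOx definitions):

```rust
use sqlx::PgPool;

// Simplified stand-in for the catalog's `ParquetFile` row type.
#[derive(Debug, sqlx::FromRow)]
struct FileRow {
    id: i64,
    file_size_bytes: i64,
    row_count: i64,
}

async fn list_not_to_delete(pool: &PgPool) -> Result<Vec<FileRow>, sqlx::Error> {
    // Explicit column list instead of `SELECT *`: every selected column
    // has a same-named field, and nothing extra is fetched.
    sqlx::query_as::<_, FileRow>(
        r#"
SELECT id, file_size_bytes, row_count
FROM parquet_file
WHERE to_delete IS NULL;
        "#,
    )
    .fetch_all(pool)
    .await
}
```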