feat: Add a way to set the row count on Parquet file catalog entries

Setting the row count is only allowed when no record batch or line protocol
is specified, so that there is no way to create a Parquet file whose data
has a mismatched row count.
pull/24376/head
Carol (Nichols || Goulding) 2022-08-31 14:36:40 -04:00
parent c21ac9050b
commit a9d664d0bf
No known key found for this signature in database
GPG Key ID: E907EE5A736F87D4
1 changed file with 22 additions and 1 deletion

View File

@ -488,6 +488,7 @@ impl TestPartition {
compaction_level,
to_delete,
object_store_id,
row_count,
} = builder;
let record_batch = record_batch.expect("A record batch is required");
@ -499,6 +500,10 @@ impl TestPartition {
"Table name of line protocol and partition should have matched",
);
assert!(
row_count.is_none(),
"Cannot have both a record batch and a manually set row_count!"
);
let row_count = record_batch.num_rows();
assert!(row_count > 0, "Parquet file must have at least 1 row");
let (record_batch, sort_key) = sort_batch(record_batch, schema.clone());
@ -537,6 +542,7 @@ impl TestPartition {
compaction_level,
to_delete,
object_store_id,
row_count: None, // will be computed from the record batch again
};
let result = self.create_parquet_file_catalog_record(builder).await;
@ -561,6 +567,7 @@ impl TestPartition {
compaction_level,
to_delete,
object_store_id,
row_count,
..
} = builder;
@ -575,11 +582,16 @@ impl TestPartition {
.id
}));
assert!(
row_count.is_none(),
"Cannot have both a record batch and a manually set row_count!"
);
(record_batch.num_rows(), column_set)
} else {
let column_set =
ColumnSet::new(table_catalog_schema.columns.values().map(|col| col.id));
(0, column_set)
(row_count.unwrap_or(0), column_set)
};
let parquet_file_params = ParquetFileParams {
@ -638,6 +650,7 @@ pub struct TestParquetFileBuilder {
compaction_level: CompactionLevel,
to_delete: bool,
object_store_id: Uuid,
row_count: Option<usize>,
}
impl Default for TestParquetFileBuilder {
@ -654,6 +667,7 @@ impl Default for TestParquetFileBuilder {
compaction_level: CompactionLevel::Initial,
to_delete: false,
object_store_id: Uuid::new_v4(),
row_count: None,
}
}
}
@ -727,6 +741,13 @@ impl TestParquetFileBuilder {
self.to_delete = to_delete;
self
}
/// Specify the number of rows in this parquet file. If line protocol/record batch are also
/// set, this will panic! Only use this when you're not specifying any rows!
pub fn with_row_count(mut self, row_count: usize) -> Self {
self.row_count = Some(row_count);
self
}
}
async fn update_catalog_sort_key_if_needed(