refactor: converters for `ParquetFile`<>`ParquetFileParams` (#6637)
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
parent df85c7b154
commit 5c462937ca
@@ -1108,6 +1108,29 @@ pub struct ParquetFile {
 }
 
 impl ParquetFile {
+    /// Create new file from given parameters and ID.
+    ///
+    /// [`to_delete`](Self::to_delete) will be set to `None`.
+    pub fn from_params(params: ParquetFileParams, id: ParquetFileId) -> Self {
+        Self {
+            id,
+            shard_id: params.shard_id,
+            namespace_id: params.namespace_id,
+            table_id: params.table_id,
+            partition_id: params.partition_id,
+            object_store_id: params.object_store_id,
+            max_sequence_number: params.max_sequence_number,
+            min_time: params.min_time,
+            max_time: params.max_time,
+            to_delete: None,
+            file_size_bytes: params.file_size_bytes,
+            row_count: params.row_count,
+            compaction_level: params.compaction_level,
+            created_at: params.created_at,
+            column_set: params.column_set,
+        }
+    }
+
     /// Estimate the memory consumption of this object and its contents
     pub fn size(&self) -> usize {
         std::mem::size_of_val(self) + self.column_set.size()
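For orientation, a minimal sketch (not part of the commit) of how a catalog backend would use the new constructor; the helper name `register_file`, and the assumption that the caller already holds a fully populated `ParquetFileParams`, are illustrative only:

// Sketch only: `params` is assumed to be an already-built ParquetFileParams
// (e.g. from a test fixture); the catalog decides the ID. `from_params`
// copies every field across and forces `to_delete` to `None`.
fn register_file(params: ParquetFileParams, next_id: i64) -> ParquetFile {
    ParquetFile::from_params(params, ParquetFileId::new(next_id))
}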
@@ -1146,6 +1169,26 @@ pub struct ParquetFileParams {
     pub column_set: ColumnSet,
 }
 
+impl From<ParquetFile> for ParquetFileParams {
+    fn from(value: ParquetFile) -> Self {
+        Self {
+            shard_id: value.shard_id,
+            namespace_id: value.namespace_id,
+            table_id: value.table_id,
+            partition_id: value.partition_id,
+            object_store_id: value.object_store_id,
+            max_sequence_number: value.max_sequence_number,
+            min_time: value.min_time,
+            max_time: value.max_time,
+            file_size_bytes: value.file_size_bytes,
+            row_count: value.row_count,
+            compaction_level: value.compaction_level,
+            created_at: value.created_at,
+            column_set: value.column_set,
+        }
+    }
+}
+
 /// Data for a processed tombstone reference in the catalog.
 #[derive(Debug, Copy, Clone, PartialEq, Eq, sqlx::FromRow)]
 pub struct ProcessedTombstone {
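The reverse conversion above drops the two fields that exist only on `ParquetFile` (`id` and `to_delete`). A hedged sketch of one place it is handy, re-deriving creation parameters from an existing catalog row; the helper name `params_for_copy` is illustrative, not from the commit:

// Sketch: turn a catalog row back into the parameter struct, e.g. to create a
// near-copy of an existing file record. `id` and `to_delete` are discarded
// because ParquetFileParams has no such fields.
fn params_for_copy(file: ParquetFile) -> ParquetFileParams {
    ParquetFileParams::from(file)
}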
@@ -1167,48 +1167,23 @@ impl ParquetFileRepo for MemTxn {
     async fn create(&mut self, parquet_file_params: ParquetFileParams) -> Result<ParquetFile> {
         let stage = self.stage();
 
-        let ParquetFileParams {
-            shard_id,
-            namespace_id,
-            table_id,
-            partition_id,
-            object_store_id,
-            max_sequence_number,
-            min_time,
-            max_time,
-            file_size_bytes,
-            row_count,
-            compaction_level,
-            created_at,
-            column_set,
-        } = parquet_file_params;
-
         if stage
             .parquet_files
             .iter()
-            .any(|f| f.object_store_id == object_store_id)
+            .any(|f| f.object_store_id == parquet_file_params.object_store_id)
         {
-            return Err(Error::FileExists { object_store_id });
+            return Err(Error::FileExists {
+                object_store_id: parquet_file_params.object_store_id,
+            });
         }
 
-        let parquet_file = ParquetFile {
-            id: ParquetFileId::new(stage.parquet_files.len() as i64 + 1),
-            shard_id,
-            namespace_id,
-            table_id,
-            partition_id,
-            object_store_id,
-            max_sequence_number,
-            min_time,
-            max_time,
-            row_count,
-            to_delete: None,
-            file_size_bytes,
-            compaction_level,
-            created_at,
-            column_set,
-        };
+        let parquet_file = ParquetFile::from_params(
+            parquet_file_params,
+            ParquetFileId::new(stage.parquet_files.len() as i64 + 1),
+        );
         let compaction_level = parquet_file.compaction_level;
+        let created_at = parquet_file.created_at;
+        let partition_id = parquet_file.partition_id;
         stage.parquet_files.push(parquet_file);
 
         // Update the new_file_at field its partition to the time of created_at
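Taken together, the two converters should round-trip: converting a file to params and back, with the original ID re-supplied, reproduces the file whenever `to_delete` was `None`. A sketch of that property, not part of the commit and assuming `ParquetFileId` is `Copy` (as the small ID newtypes in this crate generally are):

// Hypothetical round-trip check. The ID is copied out before `file` is moved
// into the From conversion; `to_delete` comes back as None either way.
fn round_trip(file: ParquetFile) -> ParquetFile {
    let id = file.id;
    ParquetFile::from_params(ParquetFileParams::from(file), id)
}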