fix: ParquetFileRepo create should be responsible for setting INITIAL_COMPACTION_LEVEL
When created in the catalog, parquet files should always have compaction level 0. Updating the compaction level should always happen in the compactor. Only the catalog should need to know about the initial compaction level value.pull/24376/head
parent
ff31407dce
commit
ecd06c6ec3
|
@ -679,15 +679,10 @@ pub struct ParquetFileParams {
|
|||
pub parquet_metadata: Vec<u8>,
|
||||
/// the number of rows of data in this file
|
||||
pub row_count: i64,
|
||||
/// the compaction level of the file
|
||||
pub compaction_level: i16,
|
||||
/// the creation time of the parquet file
|
||||
pub created_at: Timestamp,
|
||||
}
|
||||
|
||||
/// The starting compaction level for parquet files is zero.
|
||||
pub const INITIAL_COMPACTION_LEVEL: i16 = 0;
|
||||
|
||||
/// Data for a processed tombstone reference in the catalog.
|
||||
#[derive(Debug, Copy, Clone, PartialEq, sqlx::FromRow)]
|
||||
pub struct ProcessedTombstone {
|
||||
|
|
|
@ -1904,7 +1904,6 @@ mod tests {
|
|||
file_size_bytes: 0,
|
||||
parquet_metadata: vec![],
|
||||
row_count: 0,
|
||||
compaction_level: 0,
|
||||
created_at: Timestamp::new(1),
|
||||
};
|
||||
repos
|
||||
|
|
|
@ -406,6 +406,9 @@ pub trait TombstoneRepo: Send + Sync {
|
|||
) -> Result<Vec<Tombstone>>;
|
||||
}
|
||||
|
||||
/// The starting compaction level for parquet files is zero.
|
||||
pub const INITIAL_COMPACTION_LEVEL: i16 = 0;
|
||||
|
||||
/// Functions for working with parquet file pointers in the catalog
|
||||
#[async_trait]
|
||||
pub trait ParquetFileRepo: Send + Sync {
|
||||
|
@ -755,7 +758,6 @@ pub(crate) mod test_helpers {
|
|||
file_size_bytes: 0,
|
||||
parquet_metadata: vec![],
|
||||
row_count: 0,
|
||||
compaction_level: 0,
|
||||
created_at: Timestamp::new(1),
|
||||
};
|
||||
let p1 = repos
|
||||
|
@ -1275,7 +1277,6 @@ pub(crate) mod test_helpers {
|
|||
file_size_bytes: 1337,
|
||||
parquet_metadata: b"md1".to_vec(),
|
||||
row_count: 0,
|
||||
compaction_level: 0,
|
||||
created_at: Timestamp::new(1),
|
||||
};
|
||||
let parquet_file = repos
|
||||
|
@ -1480,7 +1481,6 @@ pub(crate) mod test_helpers {
|
|||
file_size_bytes: 1337,
|
||||
parquet_metadata: b"md1".to_vec(),
|
||||
row_count: 0,
|
||||
compaction_level: 0,
|
||||
created_at: Timestamp::new(1),
|
||||
};
|
||||
let other_parquet = ParquetFileParams {
|
||||
|
|
|
@ -6,7 +6,7 @@ use crate::{
|
|||
sealed::TransactionFinalize, Catalog, ColumnRepo, ColumnUpsertRequest, Error,
|
||||
KafkaTopicRepo, NamespaceRepo, ParquetFileRepo, PartitionRepo, ProcessedTombstoneRepo,
|
||||
QueryPoolRepo, RepoCollection, Result, SequencerRepo, TablePersistInfo, TableRepo,
|
||||
TombstoneRepo, Transaction,
|
||||
TombstoneRepo, Transaction, INITIAL_COMPACTION_LEVEL,
|
||||
},
|
||||
metrics::MetricDecorator,
|
||||
};
|
||||
|
@ -764,7 +764,6 @@ impl ParquetFileRepo for MemTxn {
|
|||
file_size_bytes,
|
||||
parquet_metadata,
|
||||
row_count,
|
||||
compaction_level,
|
||||
created_at,
|
||||
} = parquet_file_params;
|
||||
|
||||
|
@ -790,7 +789,7 @@ impl ParquetFileRepo for MemTxn {
|
|||
to_delete: false,
|
||||
file_size_bytes,
|
||||
parquet_metadata,
|
||||
compaction_level,
|
||||
compaction_level: INITIAL_COMPACTION_LEVEL,
|
||||
created_at,
|
||||
};
|
||||
stage.parquet_files.push(parquet_file);
|
||||
|
|
|
@ -5,7 +5,7 @@ use crate::{
|
|||
sealed::TransactionFinalize, Catalog, ColumnRepo, ColumnUpsertRequest, Error,
|
||||
KafkaTopicRepo, NamespaceRepo, ParquetFileRepo, PartitionRepo, ProcessedTombstoneRepo,
|
||||
QueryPoolRepo, RepoCollection, Result, SequencerRepo, TablePersistInfo, TableRepo,
|
||||
TombstoneRepo, Transaction,
|
||||
TombstoneRepo, Transaction, INITIAL_COMPACTION_LEVEL,
|
||||
},
|
||||
metrics::MetricDecorator,
|
||||
};
|
||||
|
@ -1171,7 +1171,6 @@ impl ParquetFileRepo for PostgresTxn {
|
|||
file_size_bytes,
|
||||
parquet_metadata,
|
||||
row_count,
|
||||
compaction_level,
|
||||
created_at,
|
||||
} = parquet_file_params;
|
||||
|
||||
|
@ -1193,7 +1192,7 @@ RETURNING *
|
|||
.bind(file_size_bytes) // $9
|
||||
.bind(parquet_metadata) // $10
|
||||
.bind(row_count) // $11
|
||||
.bind(compaction_level) // $12
|
||||
.bind(INITIAL_COMPACTION_LEVEL) // $12
|
||||
.bind(created_at) // $13
|
||||
.fetch_one(&mut self.inner)
|
||||
.await
|
||||
|
|
|
@ -315,7 +315,6 @@ impl TestPartition {
|
|||
file_size_bytes: file_size_bytes as i64,
|
||||
parquet_metadata: parquet_metadata_bin,
|
||||
row_count: row_count as i64,
|
||||
compaction_level: 0,
|
||||
created_at: Timestamp::new(1),
|
||||
};
|
||||
let parquet_file = repos
|
||||
|
|
|
@ -92,7 +92,6 @@ use data_types::{
|
|||
};
|
||||
use data_types2::{
|
||||
NamespaceId, ParquetFileParams, PartitionId, SequenceNumber, SequencerId, TableId, Timestamp,
|
||||
INITIAL_COMPACTION_LEVEL,
|
||||
};
|
||||
use generated_types::influxdata::iox::{
|
||||
ingester::v1 as proto, preserved_catalog::v1 as preserved_catalog,
|
||||
|
@ -648,7 +647,6 @@ impl IoxMetadata {
|
|||
file_size_bytes: file_size_bytes as i64,
|
||||
parquet_metadata: metadata.thrift_bytes().to_vec(),
|
||||
row_count: self.row_count,
|
||||
compaction_level: INITIAL_COMPACTION_LEVEL,
|
||||
created_at: Timestamp::new(self.creation_timestamp.timestamp_nanos()),
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue