refactor: `catalog_checkpoint_interval` => `catalog_transactions_until_checkpoint`

pull/24376/head
Marco Neumann 2021-06-14 10:34:32 +02:00
parent 2eb2aca091
commit f4693e36c0
8 changed files with 21 additions and 16 deletions

View File

@ -153,7 +153,7 @@ pub struct LifecycleRules {
pub worker_backoff_millis: Option<NonZeroU64>,
/// After how many transactions should IOx write a new checkpoint?
pub catalog_checkpoint_interval: Option<NonZeroU64>,
pub catalog_transactions_until_checkpoint: Option<NonZeroU64>,
}
/// This struct specifies the rules for the order to sort partitions

View File

@ -114,7 +114,7 @@ message LifecycleRules {
// After how many transactions should IOx write a new checkpoint?
//
// If 0 / absent, this default to 100.
uint64 catalog_checkpoint_interval = 11;
uint64 catalog_transactions_until_checkpoint = 11;
}
message DatabaseRules {

View File

@ -35,8 +35,8 @@ impl From<LifecycleRules> for management::LifecycleRules {
persist: config.persist,
immutable: config.immutable,
worker_backoff_millis: config.worker_backoff_millis.map_or(0, NonZeroU64::get),
catalog_checkpoint_interval: config
.catalog_checkpoint_interval
catalog_transactions_until_checkpoint: config
.catalog_transactions_until_checkpoint
.map_or(100, NonZeroU64::get),
}
}
@ -57,7 +57,9 @@ impl TryFrom<management::LifecycleRules> for LifecycleRules {
persist: proto.persist,
immutable: proto.immutable,
worker_backoff_millis: NonZeroU64::new(proto.worker_backoff_millis),
catalog_checkpoint_interval: NonZeroU64::new(proto.catalog_checkpoint_interval),
catalog_transactions_until_checkpoint: NonZeroU64::new(
proto.catalog_transactions_until_checkpoint,
),
})
}
}
@ -145,7 +147,7 @@ mod tests {
persist: true,
immutable: true,
worker_backoff_millis: 1000,
catalog_checkpoint_interval: 10,
catalog_transactions_until_checkpoint: 10,
};
let config: LifecycleRules = protobuf.clone().try_into().unwrap();

View File

@ -656,7 +656,7 @@ impl Db {
.rules
.read()
.lifecycle_rules
.catalog_checkpoint_interval
.catalog_transactions_until_checkpoint
.map_or(false, |interval| {
transaction.revision_counter() % interval.get() == 0
});
@ -3030,7 +3030,7 @@ mod tests {
.object_store(Arc::clone(&object_store))
.server_id(server_id)
.db_name(db_name)
.catalog_checkpoint_interval(NonZeroU64::try_from(2).unwrap())
.catalog_transactions_until_checkpoint(NonZeroU64::try_from(2).unwrap())
.build()
.await;
let db = Arc::new(test_db.db);

View File

@ -35,7 +35,7 @@ pub struct TestDbBuilder {
db_name: Option<DatabaseName<'static>>,
worker_cleanup_avg_sleep: Option<Duration>,
write_buffer: Option<Arc<dyn WriteBuffer>>,
catalog_checkpoint_interval: Option<NonZeroU64>,
catalog_transactions_until_checkpoint: Option<NonZeroU64>,
}
impl TestDbBuilder {
@ -74,7 +74,8 @@ impl TestDbBuilder {
.unwrap_or_else(|| Duration::from_secs(1));
// enable checkpointing
rules.lifecycle_rules.catalog_checkpoint_interval = self.catalog_checkpoint_interval;
rules.lifecycle_rules.catalog_transactions_until_checkpoint =
self.catalog_transactions_until_checkpoint;
TestDb {
metric_registry: metrics::TestMetricRegistry::new(metrics_registry),
@ -115,8 +116,8 @@ impl TestDbBuilder {
self
}
pub fn catalog_checkpoint_interval(mut self, interval: NonZeroU64) -> Self {
self.catalog_checkpoint_interval = Some(interval);
pub fn catalog_transactions_until_checkpoint(mut self, interval: NonZeroU64) -> Self {
self.catalog_transactions_until_checkpoint = Some(interval);
self
}
}

View File

@ -106,7 +106,7 @@ async fn setup(object_store: Arc<ObjectStore>, done: &Mutex<bool>) {
async fn create_persisted_db(object_store: Arc<ObjectStore>) -> TestDb {
TestDb::builder()
.object_store(object_store)
.catalog_checkpoint_interval(NonZeroU64::try_from(CHECKPOINT_INTERVAL).unwrap())
.catalog_transactions_until_checkpoint(NonZeroU64::try_from(CHECKPOINT_INTERVAL).unwrap())
.build()
.await
}

View File

@ -116,7 +116,7 @@ struct Create {
/// After how many transactions should IOx write a new checkpoint?
#[structopt(long, default_value = "100", parse(try_from_str))]
catalog_checkpoint_interval: NonZeroU64,
catalog_transactions_until_checkpoint: NonZeroU64,
}
/// Get list of databases
@ -185,7 +185,9 @@ pub async fn command(url: String, config: Config) -> Result<()> {
persist: command.persist,
immutable: command.immutable,
worker_backoff_millis: Default::default(),
catalog_checkpoint_interval: command.catalog_checkpoint_interval.get(),
catalog_transactions_until_checkpoint: command
.catalog_transactions_until_checkpoint
.get(),
}),
// Default to hourly partitions

View File

@ -214,7 +214,7 @@ async fn test_create_get_update_database() {
order: Order::Asc as _,
sort: Some(lifecycle_rules::sort_order::Sort::CreatedAtTime(Empty {})),
}),
catalog_checkpoint_interval: 13,
catalog_transactions_until_checkpoint: 13,
..Default::default()
}),
routing_rules: None,