refactor: Make TestDbBuilder reusable for rebuilding databases

Many tests want to exercise behavior that happens on reloading or
replay of a database. This changes the way the TestDbBuilder works to
make it possible to call `build` on a builder multiple times to create
databases with the same options to simulate reload of a database without
having to keep track of which pieces need to match.
pull/24376/head
Carol (Nichols || Goulding) 2021-10-22 14:31:31 -04:00
parent fe155e15fb
commit e838a22f92
1 changed files with 47 additions and 56 deletions

View File

@ -35,16 +35,39 @@ impl TestDb {
} }
} }
#[derive(Debug, Default)] #[derive(Debug)]
pub struct TestDbBuilder { pub struct TestDbBuilder {
server_id: Option<ServerId>, server_id: ServerId,
object_store: Option<Arc<ObjectStore>>, object_store: Arc<ObjectStore>,
db_name: Option<DatabaseName<'static>>, db_name: DatabaseName<'static>,
worker_cleanup_avg_sleep: Option<Duration>, worker_cleanup_avg_sleep: Duration,
write_buffer_producer: Option<Arc<dyn WriteBufferWriting>>, write_buffer_producer: Option<Arc<dyn WriteBufferWriting>>,
lifecycle_rules: Option<LifecycleRules>, lifecycle_rules: LifecycleRules,
partition_template: Option<PartitionTemplate>, partition_template: PartitionTemplate,
time_provider: Option<Arc<dyn TimeProvider>>, time_provider: Arc<dyn TimeProvider>,
}
impl Default for TestDbBuilder {
fn default() -> Self {
Self {
server_id: ServerId::try_from(1).unwrap(),
object_store: Arc::new(ObjectStore::new_in_memory()),
db_name: DatabaseName::new("placeholder").unwrap(),
// make background loop spin a bit faster for tests
worker_cleanup_avg_sleep: Duration::from_secs(1),
write_buffer_producer: None,
// default to quick lifecycle rules for faster tests
lifecycle_rules: LifecycleRules {
late_arrive_window_seconds: NonZeroU32::try_from(1).unwrap(),
..Default::default()
},
// default to hourly
partition_template: PartitionTemplate {
parts: vec![TemplatePart::TimeFormat("%Y-%m-%dT%H".to_string())],
},
time_provider: Arc::new(time::SystemProvider::new()),
}
}
} }
impl TestDbBuilder { impl TestDbBuilder {
@ -52,37 +75,23 @@ impl TestDbBuilder {
Self::default() Self::default()
} }
pub async fn build(self) -> TestDb { pub async fn build(&self) -> TestDb {
let server_id = self let server_id = self.server_id;
.server_id let object_store = Arc::clone(&self.object_store);
.unwrap_or_else(|| ServerId::try_from(1).unwrap()); let db_name = self.db_name.clone();
let db_name = self let time_provider = Arc::clone(&self.time_provider);
.db_name
.unwrap_or_else(|| DatabaseName::new("placeholder").unwrap());
let time_provider = self
.time_provider
.clone()
.take()
.unwrap_or_else(|| Arc::new(time::SystemProvider::new()));
let object_store = self
.object_store
.unwrap_or_else(|| Arc::new(ObjectStore::new_in_memory()));
let iox_object_store = let iox_object_store =
IoxObjectStore::find_existing(Arc::clone(&object_store), server_id, &db_name) IoxObjectStore::find_existing(Arc::clone(&object_store), server_id, &db_name)
.await .await
.unwrap(); .unwrap();
let iox_object_store = match iox_object_store { let iox_object_store = match iox_object_store {
Some(ios) => ios, Some(ios) => ios,
None => IoxObjectStore::new(Arc::clone(&object_store), server_id, &db_name) None => IoxObjectStore::new(Arc::clone(&object_store), server_id, &db_name)
.await .await
.unwrap(), .unwrap(),
}; };
let iox_object_store = Arc::new(iox_object_store); let iox_object_store = Arc::new(iox_object_store);
// deterministic thread and concurrency count // deterministic thread and concurrency count
@ -105,27 +114,9 @@ impl TestDbBuilder {
.unwrap(); .unwrap();
let mut rules = DatabaseRules::new(db_name); let mut rules = DatabaseRules::new(db_name);
rules.worker_cleanup_avg_sleep = self.worker_cleanup_avg_sleep;
// make background loop spin a bit faster for tests rules.lifecycle_rules = self.lifecycle_rules.clone();
rules.worker_cleanup_avg_sleep = self rules.partition_template = self.partition_template.clone();
.worker_cleanup_avg_sleep
.unwrap_or_else(|| Duration::from_secs(1));
// default to quick lifecycle rules for faster tests
rules.lifecycle_rules = self.lifecycle_rules.unwrap_or_else(|| LifecycleRules {
late_arrive_window_seconds: NonZeroU32::try_from(1).unwrap(),
..Default::default()
});
// set partion template
if let Some(partition_template) = self.partition_template {
rules.partition_template = partition_template;
} else {
// default to hourly
rules.partition_template = PartitionTemplate {
parts: vec![TemplatePart::TimeFormat("%Y-%m-%dT%H".to_string())],
};
}
let jobs = Arc::new(JobRegistry::new( let jobs = Arc::new(JobRegistry::new(
Default::default(), Default::default(),
@ -138,7 +129,7 @@ impl TestDbBuilder {
iox_object_store, iox_object_store,
preserved_catalog, preserved_catalog,
catalog, catalog,
write_buffer_producer: self.write_buffer_producer, write_buffer_producer: self.write_buffer_producer.clone(),
exec, exec,
metric_registry: Arc::clone(&metric_registry), metric_registry: Arc::clone(&metric_registry),
time_provider, time_provider,
@ -152,22 +143,22 @@ impl TestDbBuilder {
} }
pub fn server_id(mut self, server_id: ServerId) -> Self { pub fn server_id(mut self, server_id: ServerId) -> Self {
self.server_id = Some(server_id); self.server_id = server_id;
self self
} }
pub fn object_store(mut self, object_store: Arc<ObjectStore>) -> Self { pub fn object_store(mut self, object_store: Arc<ObjectStore>) -> Self {
self.object_store = Some(object_store); self.object_store = object_store;
self self
} }
pub fn db_name<T: Into<Cow<'static, str>>>(mut self, db_name: T) -> Self { pub fn db_name<T: Into<Cow<'static, str>>>(mut self, db_name: T) -> Self {
self.db_name = Some(DatabaseName::new(db_name).unwrap()); self.db_name = DatabaseName::new(db_name).unwrap();
self self
} }
pub fn worker_cleanup_avg_sleep(mut self, d: Duration) -> Self { pub fn worker_cleanup_avg_sleep(mut self, d: Duration) -> Self {
self.worker_cleanup_avg_sleep = Some(d); self.worker_cleanup_avg_sleep = d;
self self
} }
@ -180,17 +171,17 @@ impl TestDbBuilder {
} }
pub fn lifecycle_rules(mut self, lifecycle_rules: LifecycleRules) -> Self { pub fn lifecycle_rules(mut self, lifecycle_rules: LifecycleRules) -> Self {
self.lifecycle_rules = Some(lifecycle_rules); self.lifecycle_rules = lifecycle_rules;
self self
} }
pub fn partition_template(mut self, template: PartitionTemplate) -> Self { pub fn partition_template(mut self, template: PartitionTemplate) -> Self {
self.partition_template = Some(template); self.partition_template = template;
self self
} }
pub fn time_provider(mut self, time_provider: Arc<dyn TimeProvider>) -> Self { pub fn time_provider(mut self, time_provider: Arc<dyn TimeProvider>) -> Self {
self.time_provider = Some(time_provider); self.time_provider = time_provider;
self self
} }
} }