fix: update PersistenceWindows on rules update (#2018) (#2060)

* fix: update PersistenceWindows on rules update (#2018)

* chore: review feedback

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Raphael Taylor-Davies 2021-07-20 13:44:47 +01:00 committed by GitHub
parent 7d9e1f9704
commit e4d2c51e8b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 129 additions and 27 deletions

View File

@ -133,6 +133,12 @@ impl PersistenceWindows {
} }
} }
/// Updates the late arrival period of this `PersistenceWindows` instance
pub fn set_late_arrival_period(&mut self, late_arrival_period: Duration) {
self.closed_window_period = late_arrival_period.min(DEFAULT_CLOSED_WINDOW_PERIOD);
self.late_arrival_period = late_arrival_period;
}
/// Updates the windows with the information from a batch of rows from a single sequencer /// Updates the windows with the information from a batch of rows from a single sequencer
/// to the same partition. The min and max times are the times on the row data. The `received_at` /// to the same partition. The min and max times are the times on the row data. The `received_at`
/// Instant is when the data was received. Taking it in this function is really just about /// Instant is when the data was received. Taking it in this function is really just about

View File

@ -224,7 +224,7 @@ impl Config {
.db_initialized(db_name) .db_initialized(db_name)
.context(DatabaseNotFound { db_name })?; .context(DatabaseNotFound { db_name })?;
db.update_db_rules(update).map_err(UpdateError::Closure) db.update_rules(update).map_err(UpdateError::Closure)
} }
/// Get all registered remote servers. /// Get all registered remote servers.

View File

@ -333,13 +333,40 @@ impl Db {
} }
/// Updates the database rules /// Updates the database rules
pub fn update_db_rules<F, E>(&self, update: F) -> Result<Arc<DatabaseRules>, E> pub fn update_rules<F, E>(&self, update: F) -> Result<Arc<DatabaseRules>, E>
where where
F: FnOnce(DatabaseRules) -> Result<DatabaseRules, E>, F: FnOnce(DatabaseRules) -> Result<DatabaseRules, E>,
{ {
let (late_arrive_window_updated, new_rules) = {
let mut rules = self.rules.write(); let mut rules = self.rules.write();
info!(db_name=%rules.name, "updating rules for database");
let new_rules = Arc::new(update(rules.as_ref().clone())?); let new_rules = Arc::new(update(rules.as_ref().clone())?);
let late_arrive_window_updated = rules.lifecycle_rules.late_arrive_window_seconds
!= new_rules.lifecycle_rules.late_arrive_window_seconds;
*rules = Arc::clone(&new_rules); *rules = Arc::clone(&new_rules);
(late_arrive_window_updated, new_rules)
};
if late_arrive_window_updated {
// Hold a read lock to prevent concurrent modification and
// use values from re-acquired read guard
let current = self.rules.read();
// Update windows
let partitions = self.catalog.partitions();
for partition in &partitions {
let mut partition = partition.write();
let addr = partition.addr().clone();
if let Some(windows) = partition.persistence_windows_mut() {
info!(partition=%addr, "updating persistence windows");
windows.set_late_arrival_period(Duration::from_secs(
current.lifecycle_rules.late_arrive_window_seconds.get() as u64,
))
}
}
}
Ok(new_rules) Ok(new_rules)
} }

View File

@ -1733,7 +1733,7 @@ mod tests {
let remote_ids = vec![bad_remote_id, good_remote_id_1, good_remote_id_2]; let remote_ids = vec![bad_remote_id, good_remote_id_1, good_remote_id_2];
let db = server.db(&db_name).unwrap(); let db = server.db(&db_name).unwrap();
db.update_db_rules(|mut rules| { db.update_rules(|mut rules| {
let shard_config = ShardConfig { let shard_config = ShardConfig {
hash_ring: Some(HashRing { hash_ring: Some(HashRing {
shards: vec![TEST_SHARD_ID].into(), shards: vec![TEST_SHARD_ID].into(),
@ -1976,7 +1976,7 @@ mod tests {
let db_name = DatabaseName::new("foo").unwrap(); let db_name = DatabaseName::new("foo").unwrap();
let db = server.db(&db_name).unwrap(); let db = server.db(&db_name).unwrap();
let rules = db let rules = db
.update_db_rules(|mut rules| { .update_rules(|mut rules| {
rules.lifecycle_rules.buffer_size_hard = rules.lifecycle_rules.buffer_size_hard =
Some(std::num::NonZeroUsize::new(10).unwrap()); Some(std::num::NonZeroUsize::new(10).unwrap());
Ok::<_, Infallible>(rules) Ok::<_, Infallible>(rules)

View File

@ -49,16 +49,43 @@ async fn test_chunk_is_persisted_automatically() {
assert_eq!(chunks[0].row_count, 1_000); assert_eq!(chunks[0].row_count, 1_000);
} }
async fn write_data(
write_client: &mut influxdb_iox_client::write::Client,
db_name: &str,
num_payloads: u64,
num_duplicates: u64,
payload_size: u64,
) {
let payloads: Vec<_> = (0..num_payloads)
.map(|x| {
(0..payload_size)
.map(|i| format!("data,tag{}=val{} x={} {}", x, i, i * 10, i))
.join("\n")
})
.collect();
for payload in &payloads {
// Writing the same data multiple times should be compacted away
for _ in 0..=num_duplicates {
let num_lines_written = write_client
.write(db_name, payload)
.await
.expect("successful write");
assert_eq!(num_lines_written, payload_size as usize);
}
}
}
#[tokio::test] #[tokio::test]
async fn test_full_lifecycle() { async fn test_full_lifecycle() {
let fixture = ServerFixture::create_shared().await; let fixture = ServerFixture::create_shared().await;
let mut write_client = fixture.write_client(); let mut write_client = fixture.write_client();
let num_payloads = 10; let num_payloads = 10;
let num_duplicates = 2; let num_duplicates = 1;
let payload_size = 1_000; let payload_size = 1_000;
let total_rows = num_payloads * num_duplicates * payload_size; let total_rows = num_payloads * (1 + num_duplicates) * payload_size;
let db_name = rand_name(); let db_name = rand_name();
DatabaseBuilder::new(db_name.clone()) DatabaseBuilder::new(db_name.clone())
@ -73,24 +100,14 @@ async fn test_full_lifecycle() {
.build(fixture.grpc_channel()) .build(fixture.grpc_channel())
.await; .await;
let payloads: Vec<_> = (0..num_payloads) write_data(
.map(|x| { &mut write_client,
(0..payload_size) &db_name,
.map(|i| format!("data,tag{}=val{} x={} {}", x, i, i * 10, i)) num_payloads,
.join("\n") num_duplicates,
}) payload_size,
.collect(); )
.await;
for payload in &payloads {
// Writing the same data multiple times should be compacted away
for _ in 0..num_duplicates {
let num_lines_written = write_client
.write(&db_name, payload)
.await
.expect("successful write");
assert_eq!(num_lines_written, payload_size as usize);
}
}
wait_for_exact_chunk_states( wait_for_exact_chunk_states(
&fixture, &fixture,
@ -123,6 +140,58 @@ async fn test_full_lifecycle() {
assert_eq!(chunks[0].row_count, (num_payloads * payload_size) as usize) assert_eq!(chunks[0].row_count, (num_payloads * payload_size) as usize)
} }
#[tokio::test]
async fn test_update_late_arrival() {
let fixture = ServerFixture::create_shared().await;
let mut write_client = fixture.write_client();
let payload_size = 100;
let db_name = rand_name();
DatabaseBuilder::new(db_name.clone())
.persist(true)
// Don't close MUB automatically
.mub_row_threshold(payload_size * 2)
.persist_row_threshold(payload_size)
.persist_age_threshold_seconds(1000)
// Initially set to be a large value
.late_arrive_window_seconds(1000)
.build(fixture.grpc_channel())
.await;
write_data(&mut write_client, &db_name, 1, 0, payload_size).await;
let mut management = fixture.management_client();
let chunks = management.list_chunks(&db_name).await.unwrap();
assert_eq!(chunks.len(), 1);
assert_eq!(
chunks[0].storage,
influxdb_iox_client::management::generated_types::ChunkStorage::OpenMutableBuffer as i32
);
let mut rules = management.get_database(&db_name).await.unwrap();
rules
.lifecycle_rules
.as_mut()
.unwrap()
.late_arrive_window_seconds = 1;
fixture
.management_client()
.update_database(rules)
.await
.unwrap();
wait_for_exact_chunk_states(
&fixture,
&db_name,
vec![ChunkStorage::ReadBufferAndObjectStore],
std::time::Duration::from_secs(5),
)
.await;
}
#[tokio::test] #[tokio::test]
async fn test_query_chunk_after_restart() { async fn test_query_chunk_after_restart() {
// fixtures // fixtures