diff --git a/persistence_windows/src/persistence_windows.rs b/persistence_windows/src/persistence_windows.rs
index 1b5ae73d29..5b66a593f8 100644
--- a/persistence_windows/src/persistence_windows.rs
+++ b/persistence_windows/src/persistence_windows.rs
@@ -133,6 +133,12 @@ impl PersistenceWindows {
         }
     }
 
+    /// Updates the late arrival period of this `PersistenceWindows` instance
+    pub fn set_late_arrival_period(&mut self, late_arrival_period: Duration) {
+        self.closed_window_period = late_arrival_period.min(DEFAULT_CLOSED_WINDOW_PERIOD);
+        self.late_arrival_period = late_arrival_period;
+    }
+
     /// Updates the windows with the information from a batch of rows from a single sequencer
     /// to the same partition. The min and max times are the times on the row data. The `received_at`
     /// Instant is when the data was received. Taking it in this function is really just about
diff --git a/server/src/config.rs b/server/src/config.rs
index 4554e8e912..f869319a87 100644
--- a/server/src/config.rs
+++ b/server/src/config.rs
@@ -224,7 +224,7 @@ impl Config {
             .db_initialized(db_name)
             .context(DatabaseNotFound { db_name })?;
 
-        db.update_db_rules(update).map_err(UpdateError::Closure)
+        db.update_rules(update).map_err(UpdateError::Closure)
     }
 
     /// Get all registered remote servers.
diff --git a/server/src/db.rs b/server/src/db.rs
index dc198aa292..f94eb4b28c 100644
--- a/server/src/db.rs
+++ b/server/src/db.rs
@@ -333,13 +333,40 @@ impl Db {
     }
 
     /// Updates the database rules
-    pub fn update_db_rules<F, E>(&self, update: F) -> Result<Arc<DatabaseRules>, E>
+    pub fn update_rules<F, E>(&self, update: F) -> Result<Arc<DatabaseRules>, E>
     where
         F: FnOnce(DatabaseRules) -> Result<DatabaseRules, E>,
     {
-        let mut rules = self.rules.write();
-        let new_rules = Arc::new(update(rules.as_ref().clone())?);
-        *rules = Arc::clone(&new_rules);
+        let (late_arrive_window_updated, new_rules) = {
+            let mut rules = self.rules.write();
+            info!(db_name=%rules.name, "updating rules for database");
+            let new_rules = Arc::new(update(rules.as_ref().clone())?);
+            let late_arrive_window_updated = rules.lifecycle_rules.late_arrive_window_seconds
+                != new_rules.lifecycle_rules.late_arrive_window_seconds;
+
+            *rules = Arc::clone(&new_rules);
+            (late_arrive_window_updated, new_rules)
+        };
+
+        if late_arrive_window_updated {
+            // Hold a read lock to prevent concurrent modification and
+            // use values from re-acquired read guard
+            let current = self.rules.read();
+
+            // Update windows
+            let partitions = self.catalog.partitions();
+            for partition in &partitions {
+                let mut partition = partition.write();
+                let addr = partition.addr().clone();
+                if let Some(windows) = partition.persistence_windows_mut() {
+                    info!(partition=%addr, "updating persistence windows");
+                    windows.set_late_arrival_period(Duration::from_secs(
+                        current.lifecycle_rules.late_arrive_window_seconds.get() as u64,
+                    ))
+                }
+            }
+        }
+
         Ok(new_rules)
     }
 
diff --git a/server/src/lib.rs b/server/src/lib.rs
index 48246918d1..e2dc829e31 100644
--- a/server/src/lib.rs
+++ b/server/src/lib.rs
@@ -1733,7 +1733,7 @@ mod tests {
         let remote_ids = vec![bad_remote_id, good_remote_id_1, good_remote_id_2];
         let db = server.db(&db_name).unwrap();
 
-        db.update_db_rules(|mut rules| {
+        db.update_rules(|mut rules| {
             let shard_config = ShardConfig {
                 hash_ring: Some(HashRing {
                     shards: vec![TEST_SHARD_ID].into(),
@@ -1976,7 +1976,7 @@ mod tests {
         let db_name = DatabaseName::new("foo").unwrap();
         let db = server.db(&db_name).unwrap();
         let rules = db
-            .update_db_rules(|mut rules| {
+            .update_rules(|mut rules| {
                 rules.lifecycle_rules.buffer_size_hard =
                    Some(std::num::NonZeroUsize::new(10).unwrap());
                Ok::<_, Infallible>(rules)
diff --git a/tests/end_to_end_cases/persistence.rs b/tests/end_to_end_cases/persistence.rs
index 634c506856..af4dccfa61 100644
--- a/tests/end_to_end_cases/persistence.rs
+++ b/tests/end_to_end_cases/persistence.rs
@@ -49,16 +49,43 @@ async fn test_chunk_is_persisted_automatically() {
     assert_eq!(chunks[0].row_count, 1_000);
 }
 
+async fn write_data(
+    write_client: &mut influxdb_iox_client::write::Client,
+    db_name: &str,
+    num_payloads: u64,
+    num_duplicates: u64,
+    payload_size: u64,
+) {
+    let payloads: Vec<_> = (0..num_payloads)
+        .map(|x| {
+            (0..payload_size)
+                .map(|i| format!("data,tag{}=val{} x={} {}", x, i, i * 10, i))
+                .join("\n")
+        })
+        .collect();
+
+    for payload in &payloads {
+        // Writing the same data multiple times should be compacted away
+        for _ in 0..=num_duplicates {
+            let num_lines_written = write_client
+                .write(db_name, payload)
+                .await
+                .expect("successful write");
+            assert_eq!(num_lines_written, payload_size as usize);
+        }
+    }
+}
+
 #[tokio::test]
 async fn test_full_lifecycle() {
     let fixture = ServerFixture::create_shared().await;
     let mut write_client = fixture.write_client();
 
     let num_payloads = 10;
-    let num_duplicates = 2;
+    let num_duplicates = 1;
     let payload_size = 1_000;
 
-    let total_rows = num_payloads * num_duplicates * payload_size;
+    let total_rows = num_payloads * (1 + num_duplicates) * payload_size;
 
     let db_name = rand_name();
     DatabaseBuilder::new(db_name.clone())
@@ -73,24 +100,14 @@
         .build(fixture.grpc_channel())
         .await;
 
-    let payloads: Vec<_> = (0..num_payloads)
-        .map(|x| {
-            (0..payload_size)
-                .map(|i| format!("data,tag{}=val{} x={} {}", x, i, i * 10, i))
-                .join("\n")
-        })
-        .collect();
-
-    for payload in &payloads {
-        // Writing the same data multiple times should be compacted away
-        for _ in 0..num_duplicates {
-            let num_lines_written = write_client
-                .write(&db_name, payload)
-                .await
-                .expect("successful write");
-            assert_eq!(num_lines_written, payload_size as usize);
-        }
-    }
+    write_data(
+        &mut write_client,
+        &db_name,
+        num_payloads,
+        num_duplicates,
+        payload_size,
+    )
+    .await;
 
     wait_for_exact_chunk_states(
         &fixture,
@@ -123,6 +140,58 @@
     assert_eq!(chunks[0].row_count, (num_payloads * payload_size) as usize)
 }
 
+#[tokio::test]
+async fn test_update_late_arrival() {
+    let fixture = ServerFixture::create_shared().await;
+    let mut write_client = fixture.write_client();
+
+    let payload_size = 100;
+
+    let db_name = rand_name();
+    DatabaseBuilder::new(db_name.clone())
+        .persist(true)
+        // Don't close MUB automatically
+        .mub_row_threshold(payload_size * 2)
+        .persist_row_threshold(payload_size)
+        .persist_age_threshold_seconds(1000)
+        // Initially set to be a large value
+        .late_arrive_window_seconds(1000)
+        .build(fixture.grpc_channel())
+        .await;
+
+    write_data(&mut write_client, &db_name, 1, 0, payload_size).await;
+
+    let mut management = fixture.management_client();
+
+    let chunks = management.list_chunks(&db_name).await.unwrap();
+    assert_eq!(chunks.len(), 1);
+    assert_eq!(
+        chunks[0].storage,
+        influxdb_iox_client::management::generated_types::ChunkStorage::OpenMutableBuffer as i32
+    );
+
+    let mut rules = management.get_database(&db_name).await.unwrap();
+    rules
+        .lifecycle_rules
+        .as_mut()
+        .unwrap()
+        .late_arrive_window_seconds = 1;
+
+    fixture
+        .management_client()
+        .update_database(rules)
+        .await
+        .unwrap();
+
+    wait_for_exact_chunk_states(
+        &fixture,
+        &db_name,
+        vec![ChunkStorage::ReadBufferAndObjectStore],
+        std::time::Duration::from_secs(5),
+    )
+    .await;
+}
+
 #[tokio::test]
 async fn test_query_chunk_after_restart() {
     // fixtures
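
For reference, a minimal caller-side sketch of what this patch enables: lowering `late_arrive_window_seconds` through the renamed `update_rules` now also pushes the new value into each partition's `PersistenceWindows` via `set_late_arrival_period`. The helper name `shrink_late_arrival`, the `Infallible` error type, and the `NonZeroU32` field type are assumptions for illustration (the `.get()` call in the patch implies a non-zero integer type), and `Db` from server/src/db.rs is assumed to be in scope:

    use std::convert::Infallible;
    use std::num::NonZeroU32;

    // Sketch only: shrink the late arrival window on a running database.
    fn shrink_late_arrival(db: &Db, seconds: u32) {
        db.update_rules(|mut rules| {
            // Assumed field type: the patch calls `.get()` on this field,
            // which suggests a NonZero integer
            rules.lifecycle_rules.late_arrive_window_seconds =
                NonZeroU32::new(seconds).expect("window must be non-zero");
            Ok::<_, Infallible>(rules)
        })
        // The closure cannot fail, so this only discards the Infallible error type
        .expect("closure is infallible");
    }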