From 2a19606456bc0fec1746fdf1a96c3b84fd855692 Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Wed, 31 Aug 2022 15:48:18 +0200 Subject: [PATCH] feat(ingester): restrict partition row count This limit restricts a single partition to containing at most N rows before it is marked for persistence (note: being marked for persistence does not currently prevent further ingest for that partition.) --- clap_blocks/src/ingester.rs | 9 ++ influxdb_iox/src/commands/run/all_in_one.rs | 1 + ingester/src/data.rs | 138 +++++++++++++++++- ingester/src/handler.rs | 2 + ingester/src/lifecycle.rs | 151 ++++++++++++++++---- ingester/src/stream_handler/handler.rs | 20 ++- ioxd_ingester/src/lib.rs | 1 + query_tests/src/scenarios/util.rs | 1 + 8 files changed, 287 insertions(+), 36 deletions(-) diff --git a/clap_blocks/src/ingester.rs b/clap_blocks/src/ingester.rs index 052f287170..c574409f92 100644 --- a/clap_blocks/src/ingester.rs +++ b/clap_blocks/src/ingester.rs @@ -75,6 +75,15 @@ pub struct IngesterConfig { )] pub persist_partition_cold_threshold_seconds: u64, + /// Trigger persistence of a partition if it contains more than this many rows. + #[clap( + long = "--persist-partition-max-rows", + env = "INFLUXDB_IOX_PERSIST_PARTITION_MAX_ROWS", + default_value = "500000", + action + )] + pub persist_partition_rows_max: usize, + /// If the catalog's max sequence number for the partition is no longer available in the write /// buffer due to the retention policy, by default the ingester will panic. If this flag is /// specified, the ingester will skip any sequence numbers that have not been retained in the diff --git a/influxdb_iox/src/commands/run/all_in_one.rs b/influxdb_iox/src/commands/run/all_in_one.rs index 00b0f2c2e7..9ae8a204ca 100644 --- a/influxdb_iox/src/commands/run/all_in_one.rs +++ b/influxdb_iox/src/commands/run/all_in_one.rs @@ -407,6 +407,7 @@ impl Config { skip_to_oldest_available, test_flight_do_get_panic: 0, concurrent_request_limit: 10, + persist_partition_rows_max: 500_000, }; // create a CompactorConfig for the all in one server based on diff --git a/ingester/src/data.rs b/ingester/src/data.rs index a163cfeae2..51b3c16734 100644 --- a/ingester/src/data.rs +++ b/ingester/src/data.rs @@ -937,8 +937,13 @@ impl TableData { } } - let should_pause = - lifecycle_handle.log_write(partition_data.id, shard_id, sequence_number, batch.size()); + let should_pause = lifecycle_handle.log_write( + partition_data.id, + shard_id, + sequence_number, + batch.size(), + batch.rows(), + ); partition_data.buffer_write(sequence_number, batch)?; Ok(should_pause) @@ -1846,6 +1851,7 @@ mod tests { 0, Duration::from_secs(1), Duration::from_secs(1), + 1000000, ), metrics, Arc::new(SystemProvider::new()), @@ -1866,6 +1872,106 @@ mod tests { assert!(should_pause); } + #[tokio::test] + async fn persist_row_count_trigger() { + let metrics = Arc::new(metric::Registry::new()); + let catalog: Arc = Arc::new(MemCatalog::new(Arc::clone(&metrics))); + let mut repos = catalog.repositories().await; + let topic = repos.topics().create_or_get("whatevs").await.unwrap(); + let query_pool = repos.query_pools().create_or_get("whatevs").await.unwrap(); + let shard_index = ShardIndex::new(0); + let namespace = repos + .namespaces() + .create("foo", "inf", topic.id, query_pool.id) + .await + .unwrap(); + let shard1 = repos + .shards() + .create_or_get(&topic, shard_index) + .await + .unwrap(); + let mut shards = BTreeMap::new(); + shards.insert( + shard1.id, + ShardData::new(shard1.shard_index, Arc::clone(&metrics)), + ); + + let object_store: 
Arc = Arc::new(InMemory::new()); + + let data = Arc::new(IngesterData::new( + Arc::clone(&object_store), + Arc::clone(&catalog), + shards, + Arc::new(Executor::new(1)), + BackoffConfig::default(), + Arc::clone(&metrics), + )); + + let schema = NamespaceSchema::new(namespace.id, topic.id, query_pool.id); + + let w1 = DmlWrite::new( + "foo", + lines_to_batches("mem foo=1 10\nmem foo=1 11", 0).unwrap(), + Some("1970-01-01".into()), + DmlMeta::sequenced( + Sequence::new(ShardIndex::new(1), SequenceNumber::new(1)), + Time::from_timestamp_millis(42), + None, + 50, + ), + ); + let _schema = validate_or_insert_schema(w1.tables(), &schema, repos.deref_mut()) + .await + .unwrap() + .unwrap(); + + // drop repos so the mem catalog won't deadlock. + std::mem::drop(repos); + + let manager = LifecycleManager::new( + LifecycleConfig::new( + 1000000000, + 0, + 0, + Duration::from_secs(1), + Duration::from_secs(1), + 1, // This row count will be hit + ), + Arc::clone(&metrics), + Arc::new(SystemProvider::new()), + ); + + let should_pause = data + .buffer_operation(shard1.id, DmlOperation::Write(w1), &manager.handle()) + .await + .unwrap(); + // Exceeding the row count doesn't pause ingest (like other partition + // limits) + assert!(!should_pause); + + let partition_id = { + let sd = data.shards.get(&shard1.id).unwrap(); + let n = sd.namespace("foo").unwrap(); + let mem_table = n.table_data("mem").unwrap(); + assert!(n.table_data("mem").is_some()); + let mem_table = mem_table.write().await; + let p = mem_table.partition_data.get(&"1970-01-01".into()).unwrap(); + p.id + }; + + data.persist(partition_id).await; + + // verify that a file got put into object store + let file_paths: Vec<_> = object_store + .list(None) + .await + .unwrap() + .try_collect() + .await + .unwrap(); + assert_eq!(file_paths.len(), 1); + } + #[tokio::test] async fn persist() { let metrics = Arc::new(metric::Registry::new()); @@ -1961,7 +2067,14 @@ mod tests { ); let manager = LifecycleManager::new( - LifecycleConfig::new(1, 0, 0, Duration::from_secs(1), Duration::from_secs(1)), + LifecycleConfig::new( + 1, + 0, + 0, + Duration::from_secs(1), + Duration::from_secs(1), + 1000000, + ), Arc::clone(&metrics), Arc::new(SystemProvider::new()), ); @@ -2186,7 +2299,14 @@ mod tests { drop(repos); // release catalog transaction let manager = LifecycleManager::new( - LifecycleConfig::new(1, 0, 0, Duration::from_secs(1), Duration::from_secs(1)), + LifecycleConfig::new( + 1, + 0, + 0, + Duration::from_secs(1), + Duration::from_secs(1), + 1000000, + ), metrics, Arc::new(SystemProvider::new()), ); @@ -2596,7 +2716,14 @@ mod tests { std::mem::drop(repos); let manager = LifecycleManager::new( - LifecycleConfig::new(1, 0, 0, Duration::from_secs(1), Duration::from_secs(1)), + LifecycleConfig::new( + 1, + 0, + 0, + Duration::from_secs(1), + Duration::from_secs(1), + 1000000, + ), Arc::clone(&metrics), Arc::new(SystemProvider::new()), ); @@ -2715,6 +2842,7 @@ mod tests { 0, Duration::from_secs(1), Duration::from_secs(1), + 1000000, ), metrics, Arc::new(SystemProvider::new()), diff --git a/ingester/src/handler.rs b/ingester/src/handler.rs index 52823371d5..e3acc9333f 100644 --- a/ingester/src/handler.rs +++ b/ingester/src/handler.rs @@ -672,6 +672,7 @@ mod tests { 1000, Duration::from_secs(10), Duration::from_secs(10), + 100000000, ); let ingester = IngestHandlerImpl::new( lifecycle_config, @@ -944,6 +945,7 @@ mod tests { 1000, Duration::from_secs(10), Duration::from_secs(10), + 10000000, ); let ingester = IngestHandlerImpl::new( lifecycle_config, diff --git 
a/ingester/src/lifecycle.rs b/ingester/src/lifecycle.rs index fe22c10c2c..f559768735 100644 --- a/ingester/src/lifecycle.rs +++ b/ingester/src/lifecycle.rs @@ -30,6 +30,7 @@ pub trait LifecycleHandle: Send + Sync + 'static { shard_id: ShardId, sequence_number: SequenceNumber, bytes_written: usize, + rows_written: usize, ) -> bool; /// Returns true if the `total_bytes` tracked by the manager is less than the pause amount. @@ -59,6 +60,7 @@ impl LifecycleHandle for LifecycleHandleImpl { shard_id: ShardId, sequence_number: SequenceNumber, bytes_written: usize, + rows_written: usize, ) -> bool { let mut s = self.state.lock(); let now = self.time_provider.now(); @@ -72,14 +74,18 @@ impl LifecycleHandle for LifecycleHandleImpl { first_write: now, last_write: now, bytes_written: 0, + rows_written: 0, first_sequence_number: sequence_number, }); stats.bytes_written += bytes_written; stats.last_write = now; + stats.rows_written += rows_written; s.total_bytes += bytes_written; - s.total_bytes > self.config.pause_ingest_size + + // Pause if the server has exceeded the configured memory limit. + s.total_bytes >= self.config.pause_ingest_size } fn can_resume_ingest(&self) -> bool { @@ -124,6 +130,9 @@ pub struct LifecycleManager { persist_age_counter: U64Counter, /// Counter for a partition going cold for writes triggering a persist. persist_cold_counter: U64Counter, + /// Counter tracking the number of times a partition has been evicted for + /// containing too many rows. + persist_rows_counter: U64Counter, } /// The configuration options for the lifecycle on the ingester. @@ -153,6 +162,13 @@ pub struct LifecycleConfig { /// manager will persist it. This is to ensure that cold partitions get cleared out to make /// room for partitions that are actively receiving writes. partition_cold_threshold: Duration, + + /// The maximum number of rows allowed within a single partition before it + /// MUST be persisted. + /// + /// Reaching this limit pauses ingest while the partition is flushed to + /// object storage. + partition_row_max: usize, } impl LifecycleConfig { @@ -164,6 +180,7 @@ impl LifecycleConfig { partition_size_threshold: usize, partition_age_threshold: Duration, partition_cold_threshold: Duration, + partition_row_max: usize, ) -> Self { // this must be true to ensure that persistence will get triggered, freeing up memory assert!(pause_ingest_size > persist_memory_threshold); @@ -174,6 +191,7 @@ impl LifecycleConfig { partition_size_threshold, partition_age_threshold, partition_cold_threshold, + partition_row_max, } } } @@ -215,6 +233,9 @@ struct PartitionLifecycleStats { last_write: Time, /// The number of bytes in the partition as estimated by the mutable batch sizes. bytes_written: usize, + /// The number of rows in the partition as estimated by the mutable batch + /// sizes + snapshots. + rows_written: usize, /// The sequence number the partition received on its first write. This is reset anytime /// the partition is persisted. 
first_sequence_number: SequenceNumber, @@ -237,6 +258,7 @@ impl LifecycleManager { let persist_size_counter = persist_counter.recorder(&[("trigger", "size")]); let persist_age_counter = persist_counter.recorder(&[("trigger", "age")]); let persist_cold_counter = persist_counter.recorder(&[("trigger", "cold")]); + let persist_rows_counter = persist_counter.recorder(&[("trigger", "rows")]); let job_registry = Arc::new(JobRegistry::new( metric_registry, @@ -251,6 +273,7 @@ impl LifecycleManager { persist_size_counter, persist_age_counter, persist_cold_counter, + persist_rows_counter, } } @@ -296,6 +319,11 @@ impl LifecycleManager { self.persist_cold_counter.inc(1); } + let exceeded_max_rows = s.rows_written >= self.config.partition_row_max; + if exceeded_max_rows { + self.persist_rows_counter.inc(1); + } + let sized_out = s.bytes_written > self.config.partition_size_threshold; if sized_out { self.persist_size_counter.inc(1); @@ -306,7 +334,7 @@ impl LifecycleManager { "Partition is over size threshold, persisting"); } - aged_out || sized_out || is_cold + aged_out || sized_out || is_cold || exceeded_max_rows }); // keep track of what we'll be evicting to see what else to drop @@ -606,6 +634,7 @@ mod tests { partition_size_threshold: 5, partition_age_threshold: Duration::from_nanos(0), partition_cold_threshold: Duration::from_secs(500), + partition_row_max: 100, }; let TestLifecycleManger { m, time_provider, .. @@ -614,25 +643,27 @@ mod tests { let h = m.handle(); // log first two writes at different times - assert!(!h.log_write(PartitionId::new(1), shard_id, SequenceNumber::new(1), 1)); + assert!(!h.log_write(PartitionId::new(1), shard_id, SequenceNumber::new(1), 1, 1)); time_provider.inc(Duration::from_nanos(10)); - assert!(!h.log_write(PartitionId::new(1), shard_id, SequenceNumber::new(2), 1)); + assert!(!h.log_write(PartitionId::new(1), shard_id, SequenceNumber::new(2), 1, 1)); // log another write for different partition using a different handle assert!(!m .handle() - .log_write(PartitionId::new(2), shard_id, SequenceNumber::new(3), 3)); + .log_write(PartitionId::new(2), shard_id, SequenceNumber::new(3), 3, 3)); let stats = m.stats(); assert_eq!(stats.total_bytes, 5); let p1 = stats.partition_stats.get(0).unwrap(); assert_eq!(p1.bytes_written, 2); + assert_eq!(p1.rows_written, 2); assert_eq!(p1.partition_id, PartitionId::new(1)); assert_eq!(p1.first_write, Time::from_timestamp_nanos(0)); let p2 = stats.partition_stats.get(1).unwrap(); assert_eq!(p2.bytes_written, 3); + assert_eq!(p2.rows_written, 3); assert_eq!(p2.partition_id, PartitionId::new(2)); assert_eq!(p2.first_write, Time::from_timestamp_nanos(10)); } @@ -640,11 +671,12 @@ mod tests { #[tokio::test] async fn pausing_and_resuming_ingest() { let config = LifecycleConfig { - pause_ingest_size: 20, + pause_ingest_size: 10, persist_memory_threshold: 10, partition_size_threshold: 5, partition_age_threshold: Duration::from_nanos(0), partition_cold_threshold: Duration::from_secs(500), + partition_row_max: 100, }; let partition_id = PartitionId::new(1); let TestLifecycleManger { mut m, .. 
} = TestLifecycleManger::new(config); @@ -652,10 +684,11 @@ mod tests { let h = m.handle(); // write more than the limit (10) - assert!(!h.log_write(partition_id, shard_id, SequenceNumber::new(1), 15)); + assert!(h.log_write(partition_id, shard_id, SequenceNumber::new(1), 15, 1)); + assert!(!h.can_resume_ingest()); // all subsequent writes should also indicate a pause - assert!(h.log_write(partition_id, shard_id, SequenceNumber::new(2), 10)); + assert!(h.log_write(partition_id, shard_id, SequenceNumber::new(2), 1, 1)); assert!(!h.can_resume_ingest()); // persist the partition @@ -664,7 +697,62 @@ mod tests { // ingest can resume assert!(h.can_resume_ingest()); - assert!(!h.log_write(partition_id, shard_id, SequenceNumber::new(3), 3)); + assert!(!h.log_write(partition_id, shard_id, SequenceNumber::new(3), 3, 1)); + } + + #[tokio::test] + async fn pausing_and_resuming_ingest_row_count_allowed() { + let config = LifecycleConfig { + pause_ingest_size: 20, + persist_memory_threshold: 10, + partition_size_threshold: 5, + partition_age_threshold: Duration::from_nanos(0), + partition_cold_threshold: Duration::from_secs(500), + partition_row_max: 10, + }; + let partition_id = PartitionId::new(1); + let TestLifecycleManger { mut m, .. } = TestLifecycleManger::new(config); + let shard_id = ShardId::new(1); + let h = m.handle(); + + // No stats exist before writing. + { + let s = m.state.lock(); + assert!(s.partition_stats.get(&partition_id).is_none()); + } + + // write more than the limit (10) and don't get stopped, because the + // per-partition limit does not pause the server from ingesting. + assert!(!h.log_write(partition_id, shard_id, SequenceNumber::new(1), 1, 50)); + assert!(h.can_resume_ingest()); + + // Rows were counted + { + let s = m.state.lock(); + let stats = s + .partition_stats + .get(&partition_id) + .expect("should have partition stats"); + assert_eq!(stats.rows_written, 50); + } + + // all subsequent writes should also be allowed + assert!(!h.log_write(partition_id, shard_id, SequenceNumber::new(2), 1, 1)); + assert!(h.can_resume_ingest()); + + // persist the partition + let persister = Arc::new(TestPersister::default()); + m.maybe_persist(&persister).await; + + // Stats were reset + { + let s = m.state.lock(); + assert!(s.partition_stats.get(&partition_id).is_none()); + } + + // ingest can continue + assert!(h.can_resume_ingest()); + assert!(!h.log_write(partition_id, shard_id, SequenceNumber::new(3), 1, 1)); } #[tokio::test] @@ -675,6 +763,7 @@ mod tests { partition_size_threshold: 5, partition_age_threshold: Duration::from_nanos(0), partition_cold_threshold: Duration::from_secs(500), + partition_row_max: 100, }; let partition_id = PartitionId::new(1); let TestLifecycleManger { mut m, .. 
} = TestLifecycleManger::new(config); @@ -682,7 +771,7 @@ mod tests { let h = m.handle(); // write more than the limit (20) - h.log_write(partition_id, shard_id, SequenceNumber::new(1), 25); + h.log_write(partition_id, shard_id, SequenceNumber::new(1), 25, 1); // can not resume ingest as we are overall the pause ingest limit assert!(!h.can_resume_ingest()); @@ -710,7 +799,7 @@ mod tests { // ingest can resume assert!(h.can_resume_ingest()); - assert!(!h.log_write(partition_id, shard_id, SequenceNumber::new(2), 3)); + assert!(!h.log_write(partition_id, shard_id, SequenceNumber::new(2), 3, 1)); } #[tokio::test] @@ -721,6 +810,7 @@ mod tests { partition_size_threshold: 10, partition_age_threshold: Duration::from_nanos(5), partition_cold_threshold: Duration::from_secs(500), + partition_row_max: 100, }; let TestLifecycleManger { mut m, @@ -732,7 +822,7 @@ mod tests { let shard_id = ShardId::new(1); let h = m.handle(); - h.log_write(partition_id, shard_id, SequenceNumber::new(1), 10); + h.log_write(partition_id, shard_id, SequenceNumber::new(1), 10, 1); m.maybe_persist(&persister).await; let stats = m.stats(); @@ -747,7 +837,7 @@ mod tests { // write in data for a new partition so we can be sure it isn't persisted, but the older // one is - h.log_write(PartitionId::new(2), shard_id, SequenceNumber::new(2), 6); + h.log_write(PartitionId::new(2), shard_id, SequenceNumber::new(2), 6, 1); m.maybe_persist(&persister).await; @@ -775,6 +865,7 @@ mod tests { partition_size_threshold: 10, partition_age_threshold: Duration::from_nanos(5), partition_cold_threshold: Duration::from_secs(500), + partition_row_max: 100, }; let TestLifecycleManger { mut m, @@ -786,7 +877,7 @@ mod tests { let shard_id = ShardId::new(1); let h = m.handle(); - h.log_write(partition_id, shard_id, SequenceNumber::new(1), 10); + h.log_write(partition_id, shard_id, SequenceNumber::new(1), 10, 1); m.maybe_persist(&persister).await; let stats = m.stats(); @@ -801,8 +892,8 @@ mod tests { // write in data for a new partition so we can be sure it isn't persisted, but the older // one is - h.log_write(PartitionId::new(2), shard_id, SequenceNumber::new(2), 6); - h.log_write(PartitionId::new(3), shard_id, SequenceNumber::new(3), 7); + h.log_write(PartitionId::new(2), shard_id, SequenceNumber::new(2), 6, 1); + h.log_write(PartitionId::new(3), shard_id, SequenceNumber::new(3), 7, 1); m.maybe_persist(&persister).await; @@ -831,6 +922,7 @@ mod tests { partition_size_threshold: 5, partition_age_threshold: Duration::from_millis(100), partition_cold_threshold: Duration::from_secs(500), + partition_row_max: 100, }; let TestLifecycleManger { mut m, @@ -842,7 +934,7 @@ mod tests { let partition_id = PartitionId::new(1); let persister = Arc::new(TestPersister::default()); - h.log_write(partition_id, shard_id, SequenceNumber::new(1), 4); + h.log_write(partition_id, shard_id, SequenceNumber::new(1), 4, 1); m.maybe_persist(&persister).await; @@ -852,8 +944,8 @@ mod tests { assert!(!persister.persist_called_for(partition_id)); // introduce a new partition under the limit to verify it doesn't get taken with the other - h.log_write(PartitionId::new(2), shard_id, SequenceNumber::new(2), 3); - h.log_write(partition_id, shard_id, SequenceNumber::new(3), 5); + h.log_write(PartitionId::new(2), shard_id, SequenceNumber::new(2), 3, 1); + h.log_write(partition_id, shard_id, SequenceNumber::new(3), 5, 1); m.maybe_persist(&persister).await; @@ -881,6 +973,7 @@ mod tests { partition_size_threshold: 20, partition_age_threshold: Duration::from_millis(1000), 
partition_cold_threshold: Duration::from_secs(500), + partition_row_max: 100, }; let shard_id = ShardId::new(1); let TestLifecycleManger { @@ -891,8 +984,8 @@ mod tests { let h = m.handle(); let partition_id = PartitionId::new(1); let persister = Arc::new(TestPersister::default()); - h.log_write(partition_id, shard_id, SequenceNumber::new(1), 8); - h.log_write(PartitionId::new(2), shard_id, SequenceNumber::new(2), 13); + h.log_write(partition_id, shard_id, SequenceNumber::new(1), 8, 1); + h.log_write(PartitionId::new(2), shard_id, SequenceNumber::new(2), 13, 1); m.maybe_persist(&persister).await; @@ -908,8 +1001,8 @@ mod tests { ); // add that partition back in over size - h.log_write(partition_id, shard_id, SequenceNumber::new(3), 20); - h.log_write(PartitionId::new(2), shard_id, SequenceNumber::new(4), 21); + h.log_write(partition_id, shard_id, SequenceNumber::new(3), 20, 1); + h.log_write(PartitionId::new(2), shard_id, SequenceNumber::new(4), 21, 1); // both partitions should now need to be persisted to bring us below the mem threshold of // 20. @@ -944,6 +1037,7 @@ mod tests { partition_size_threshold: 5, partition_age_threshold: Duration::from_millis(1000), partition_cold_threshold: Duration::from_secs(500), + partition_row_max: 100, }; let shard_id = ShardId::new(1); let TestLifecycleManger { @@ -953,11 +1047,11 @@ mod tests { } = TestLifecycleManger::new(config); let h = m.handle(); let persister = Arc::new(TestPersister::default()); - h.log_write(PartitionId::new(1), shard_id, SequenceNumber::new(1), 4); + h.log_write(PartitionId::new(1), shard_id, SequenceNumber::new(1), 4, 1); time_provider.inc(Duration::from_nanos(1)); - h.log_write(PartitionId::new(2), shard_id, SequenceNumber::new(2), 6); + h.log_write(PartitionId::new(2), shard_id, SequenceNumber::new(2), 6, 1); time_provider.inc(Duration::from_nanos(1)); - h.log_write(PartitionId::new(3), shard_id, SequenceNumber::new(3), 3); + h.log_write(PartitionId::new(3), shard_id, SequenceNumber::new(3), 3, 1); m.maybe_persist(&persister).await; @@ -985,6 +1079,7 @@ mod tests { partition_size_threshold: 500, partition_age_threshold: Duration::from_secs(1000), partition_cold_threshold: Duration::from_secs(5), + partition_row_max: 100, }; let TestLifecycleManger { mut m, @@ -996,7 +1091,7 @@ mod tests { let persister = Arc::new(TestPersister::default()); let shard_id = ShardId::new(1); - h.log_write(partition_id, shard_id, SequenceNumber::new(1), 10); + h.log_write(partition_id, shard_id, SequenceNumber::new(1), 10, 1); m.maybe_persist(&persister).await; let stats = m.stats(); @@ -1010,7 +1105,7 @@ mod tests { assert!(!persister.persist_called_for(partition_id)); // write in data for a new partition so we can be sure it isn't persisted, but the older one is - h.log_write(PartitionId::new(2), shard_id, SequenceNumber::new(2), 6); + h.log_write(PartitionId::new(2), shard_id, SequenceNumber::new(2), 6, 1); m.maybe_persist(&persister).await; diff --git a/ingester/src/stream_handler/handler.rs b/ingester/src/stream_handler/handler.rs index bdd8cba909..40e34992e7 100644 --- a/ingester/src/stream_handler/handler.rs +++ b/ingester/src/stream_handler/handler.rs @@ -610,7 +610,7 @@ mod tests { let time_provider: Arc = Arc::new(SystemProvider::default()); let lifecycle = LifecycleManager::new( LifecycleConfig::new( - 100, 2, 3, Duration::from_secs(4), Duration::from_secs(5) + 100, 2, 3, Duration::from_secs(4), Duration::from_secs(5), 10000000, ), Arc::clone(&metrics), time_provider, @@ -974,7 +974,14 @@ mod tests { let metrics = 
Arc::new(metric::Registry::default()); let time_provider = Arc::new(SystemProvider::default()); let lifecycle = LifecycleManager::new( - LifecycleConfig::new(100, 2, 3, Duration::from_secs(4), Duration::from_secs(5)), + LifecycleConfig::new( + 100, + 2, + 3, + Duration::from_secs(4), + Duration::from_secs(5), + 1000000, + ), Arc::clone(&metrics), time_provider, ); @@ -1007,7 +1014,14 @@ mod tests { let metrics = Arc::new(metric::Registry::default()); let time_provider = Arc::new(SystemProvider::default()); let lifecycle = LifecycleManager::new( - LifecycleConfig::new(100, 2, 3, Duration::from_secs(4), Duration::from_secs(5)), + LifecycleConfig::new( + 100, + 2, + 3, + Duration::from_secs(4), + Duration::from_secs(5), + 1000000, + ), Arc::clone(&metrics), time_provider, ); diff --git a/ioxd_ingester/src/lib.rs b/ioxd_ingester/src/lib.rs index b5293eb606..a85f09bdf3 100644 --- a/ioxd_ingester/src/lib.rs +++ b/ioxd_ingester/src/lib.rs @@ -180,6 +180,7 @@ pub async fn create_ingester_server_type( ingester_config.persist_partition_size_threshold_bytes, Duration::from_secs(ingester_config.persist_partition_age_threshold_seconds), Duration::from_secs(ingester_config.persist_partition_cold_threshold_seconds), + ingester_config.persist_partition_rows_max, ); let ingest_handler = Arc::new( IngestHandlerImpl::new( diff --git a/query_tests/src/scenarios/util.rs b/query_tests/src/scenarios/util.rs index 7c83d16bcd..9792f47726 100644 --- a/query_tests/src/scenarios/util.rs +++ b/query_tests/src/scenarios/util.rs @@ -968,6 +968,7 @@ impl LifecycleHandle for NoopLifecycleHandle { _shard_id: ShardId, _sequence_number: SequenceNumber, _bytes_written: usize, + _rows_written: usize, ) -> bool { // do NOT pause ingest false
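
As a rough illustration of the behaviour this commit adds, the following
self-contained sketch models the persist decision (it is not part of the
patch). Config, PartitionStats, and should_persist are simplified stand-ins
for the real LifecycleConfig, PartitionLifecycleStats, and
LifecycleManager::maybe_persist(), and the threshold values are illustrative;
the only condition that is new here is the rows_written >= partition_row_max
check, which marks the partition for persistence without pausing ingest
(unlike pause_ingest_size).

    use std::time::Duration;

    // Simplified stand-ins for LifecycleConfig and PartitionLifecycleStats.
    struct Config {
        partition_age_threshold: Duration,
        partition_cold_threshold: Duration,
        partition_size_threshold: usize,
        partition_row_max: usize,
    }

    struct PartitionStats {
        age: Duration,                   // time since the partition's first write
        time_since_last_write: Duration, // time since the partition's last write
        bytes_written: usize,
        rows_written: usize,
    }

    // Mirrors the shape of the updated check in maybe_persist(): a partition is
    // marked for persistence when it is too old, too cold, too large, or (new in
    // this commit) contains at least partition_row_max rows.
    fn should_persist(s: &PartitionStats, c: &Config) -> bool {
        let aged_out = s.age >= c.partition_age_threshold;
        let is_cold = s.time_since_last_write >= c.partition_cold_threshold;
        let sized_out = s.bytes_written > c.partition_size_threshold;
        let exceeded_max_rows = s.rows_written >= c.partition_row_max;
        aged_out || sized_out || is_cold || exceeded_max_rows
    }

    fn main() {
        let config = Config {
            partition_age_threshold: Duration::from_secs(1800),
            partition_cold_threshold: Duration::from_secs(900),
            partition_size_threshold: 300_000_000,
            partition_row_max: 500_000, // matches the --persist-partition-max-rows default
        };
        let stats = PartitionStats {
            age: Duration::from_secs(60),
            time_since_last_write: Duration::from_secs(1),
            bytes_written: 1_024,
            rows_written: 600_000, // over the row limit: persist, but ingest is not paused
        };
        assert!(should_persist(&stats, &config));
    }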