feat(ingester): restrict partition row count
This limit restricts a single partition to at most N rows before it is marked for persistence. Note that being marked for persistence does not currently prevent further ingest for that partition.
parent cb10a7c6d8
commit 2a19606456
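As a rough illustration of the behaviour this commit introduces, the sketch below models the per-partition accounting with simplified stand-in types. The real code lives in the ingester's `LifecycleHandle`/`LifecycleManager`; names such as `PartitionStats` and `Lifecycle` here are hypothetical. The point it shows: each write adds to the partition's byte and row counters, the return value of the write hook reflects only the server-wide memory limit, and the new row limit merely flags the partition for persistence.

```rust
// Hypothetical, self-contained model of the new row-count trigger; not the IOx API.
#[derive(Default)]
struct PartitionStats {
    bytes_written: usize,
    rows_written: usize,
}

struct Lifecycle {
    pause_ingest_size: usize, // server-wide memory limit
    partition_row_max: usize, // new per-partition row limit
    total_bytes: usize,
    stats: PartitionStats,
}

impl Lifecycle {
    /// Record a write; returns true only when the *server-wide* limit says "pause".
    fn log_write(&mut self, bytes: usize, rows: usize) -> bool {
        self.stats.bytes_written += bytes;
        self.stats.rows_written += rows;
        self.total_bytes += bytes;
        self.total_bytes >= self.pause_ingest_size
    }

    /// The row limit marks the partition for persistence but never pauses ingest.
    fn should_persist(&self) -> bool {
        self.stats.rows_written >= self.partition_row_max
    }
}

fn main() {
    let mut l = Lifecycle {
        pause_ingest_size: 1024,
        partition_row_max: 3,
        total_bytes: 0,
        stats: PartitionStats::default(),
    };
    assert!(!l.log_write(10, 2)); // small write: no pause
    assert!(!l.log_write(10, 2)); // still no pause, but...
    assert!(l.should_persist()); // ...the partition is now over the row limit
    println!("bytes buffered: {}", l.stats.bytes_written);
    println!("marked for persistence: {}", l.should_persist());
}
```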
@@ -75,6 +75,15 @@ pub struct IngesterConfig {
     )]
     pub persist_partition_cold_threshold_seconds: u64,
 
+    /// Trigger persistence of a partition if it contains more than this many rows.
+    #[clap(
+        long = "--persist-partition-max-rows",
+        env = "INFLUXDB_IOX_PERSIST_PARTITION_MAX_ROWS",
+        default_value = "500000",
+        action
+    )]
+    pub persist_partition_rows_max: usize,
+
     /// If the catalog's max sequence number for the partition is no longer available in the write
     /// buffer due to the retention policy, by default the ingester will panic. If this flag is
     /// specified, the ingester will skip any sequence numbers that have not been retained in the
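For reference, a minimal standalone sketch of how such a flag is declared with clap's derive API. This is an assumption-laden demo binary, not the IOx config struct; the real flag, env var, and default are the ones shown in the hunk above.

```rust
// Hypothetical demo: parse the row-limit flag the same way the diff does.
// Assumes the `clap` crate (v3+) with the `derive` and `env` features enabled.
use clap::Parser;

#[derive(Debug, Parser)]
struct Demo {
    /// Trigger persistence of a partition if it contains more than this many rows.
    #[clap(
        long = "persist-partition-max-rows",
        env = "INFLUXDB_IOX_PERSIST_PARTITION_MAX_ROWS",
        default_value = "500000"
    )]
    persist_partition_rows_max: usize,
}

fn main() {
    let demo = Demo::parse();
    // e.g. `demo --persist-partition-max-rows 1000000`, or the env var, overrides the default.
    println!("persist_partition_rows_max = {}", demo.persist_partition_rows_max);
}
```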
@@ -407,6 +407,7 @@ impl Config {
             skip_to_oldest_available,
             test_flight_do_get_panic: 0,
             concurrent_request_limit: 10,
+            persist_partition_rows_max: 500_000,
         };
 
         // create a CompactorConfig for the all in one server based on

@@ -937,8 +937,13 @@ impl TableData {
             }
         }
 
-        let should_pause =
-            lifecycle_handle.log_write(partition_data.id, shard_id, sequence_number, batch.size());
+        let should_pause = lifecycle_handle.log_write(
+            partition_data.id,
+            shard_id,
+            sequence_number,
+            batch.size(),
+            batch.rows(),
+        );
         partition_data.buffer_write(sequence_number, batch)?;
 
         Ok(should_pause)
@@ -1846,6 +1851,7 @@ mod tests {
                 0,
                 Duration::from_secs(1),
                 Duration::from_secs(1),
+                1000000,
             ),
             metrics,
             Arc::new(SystemProvider::new()),

@@ -1866,6 +1872,106 @@ mod tests {
         assert!(should_pause);
     }
 
+    #[tokio::test]
+    async fn persist_row_count_trigger() {
+        let metrics = Arc::new(metric::Registry::new());
+        let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(Arc::clone(&metrics)));
+        let mut repos = catalog.repositories().await;
+        let topic = repos.topics().create_or_get("whatevs").await.unwrap();
+        let query_pool = repos.query_pools().create_or_get("whatevs").await.unwrap();
+        let shard_index = ShardIndex::new(0);
+        let namespace = repos
+            .namespaces()
+            .create("foo", "inf", topic.id, query_pool.id)
+            .await
+            .unwrap();
+        let shard1 = repos
+            .shards()
+            .create_or_get(&topic, shard_index)
+            .await
+            .unwrap();
+        let mut shards = BTreeMap::new();
+        shards.insert(
+            shard1.id,
+            ShardData::new(shard1.shard_index, Arc::clone(&metrics)),
+        );
+
+        let object_store: Arc<DynObjectStore> = Arc::new(InMemory::new());
+
+        let data = Arc::new(IngesterData::new(
+            Arc::clone(&object_store),
+            Arc::clone(&catalog),
+            shards,
+            Arc::new(Executor::new(1)),
+            BackoffConfig::default(),
+            Arc::clone(&metrics),
+        ));
+
+        let schema = NamespaceSchema::new(namespace.id, topic.id, query_pool.id);
+
+        let w1 = DmlWrite::new(
+            "foo",
+            lines_to_batches("mem foo=1 10\nmem foo=1 11", 0).unwrap(),
+            Some("1970-01-01".into()),
+            DmlMeta::sequenced(
+                Sequence::new(ShardIndex::new(1), SequenceNumber::new(1)),
+                Time::from_timestamp_millis(42),
+                None,
+                50,
+            ),
+        );
+        let _schema = validate_or_insert_schema(w1.tables(), &schema, repos.deref_mut())
+            .await
+            .unwrap()
+            .unwrap();
+
+        // drop repos so the mem catalog won't deadlock.
+        std::mem::drop(repos);
+
+        let manager = LifecycleManager::new(
+            LifecycleConfig::new(
+                1000000000,
+                0,
+                0,
+                Duration::from_secs(1),
+                Duration::from_secs(1),
+                1, // This row count will be hit
+            ),
+            Arc::clone(&metrics),
+            Arc::new(SystemProvider::new()),
+        );
+
+        let should_pause = data
+            .buffer_operation(shard1.id, DmlOperation::Write(w1), &manager.handle())
+            .await
+            .unwrap();
+        // Exceeding the row count doesn't pause ingest (like other partition
+        // limits)
+        assert!(!should_pause);
+
+        let partition_id = {
+            let sd = data.shards.get(&shard1.id).unwrap();
+            let n = sd.namespace("foo").unwrap();
+            let mem_table = n.table_data("mem").unwrap();
+            assert!(n.table_data("mem").is_some());
+            let mem_table = mem_table.write().await;
+            let p = mem_table.partition_data.get(&"1970-01-01".into()).unwrap();
+            p.id
+        };
+
+        data.persist(partition_id).await;
+
+        // verify that a file got put into object store
+        let file_paths: Vec<_> = object_store
+            .list(None)
+            .await
+            .unwrap()
+            .try_collect()
+            .await
+            .unwrap();
+        assert_eq!(file_paths.len(), 1);
+    }
+
     #[tokio::test]
     async fn persist() {
         let metrics = Arc::new(metric::Registry::new());
@@ -1961,7 +2067,14 @@ mod tests {
         );
 
         let manager = LifecycleManager::new(
-            LifecycleConfig::new(1, 0, 0, Duration::from_secs(1), Duration::from_secs(1)),
+            LifecycleConfig::new(
+                1,
+                0,
+                0,
+                Duration::from_secs(1),
+                Duration::from_secs(1),
+                1000000,
+            ),
             Arc::clone(&metrics),
             Arc::new(SystemProvider::new()),
         );

@@ -2186,7 +2299,14 @@ mod tests {
         drop(repos); // release catalog transaction
 
         let manager = LifecycleManager::new(
-            LifecycleConfig::new(1, 0, 0, Duration::from_secs(1), Duration::from_secs(1)),
+            LifecycleConfig::new(
+                1,
+                0,
+                0,
+                Duration::from_secs(1),
+                Duration::from_secs(1),
+                1000000,
+            ),
             metrics,
             Arc::new(SystemProvider::new()),
         );

@@ -2596,7 +2716,14 @@ mod tests {
         std::mem::drop(repos);
 
         let manager = LifecycleManager::new(
-            LifecycleConfig::new(1, 0, 0, Duration::from_secs(1), Duration::from_secs(1)),
+            LifecycleConfig::new(
+                1,
+                0,
+                0,
+                Duration::from_secs(1),
+                Duration::from_secs(1),
+                1000000,
+            ),
             Arc::clone(&metrics),
             Arc::new(SystemProvider::new()),
         );

@@ -2715,6 +2842,7 @@ mod tests {
                 0,
                 Duration::from_secs(1),
                 Duration::from_secs(1),
+                1000000,
             ),
             metrics,
             Arc::new(SystemProvider::new()),
@@ -672,6 +672,7 @@ mod tests {
             1000,
             Duration::from_secs(10),
             Duration::from_secs(10),
+            100000000,
         );
         let ingester = IngestHandlerImpl::new(
             lifecycle_config,

@@ -944,6 +945,7 @@ mod tests {
             1000,
             Duration::from_secs(10),
             Duration::from_secs(10),
+            10000000,
         );
         let ingester = IngestHandlerImpl::new(
             lifecycle_config,

@@ -30,6 +30,7 @@ pub trait LifecycleHandle: Send + Sync + 'static {
         shard_id: ShardId,
         sequence_number: SequenceNumber,
         bytes_written: usize,
+        rows_written: usize,
     ) -> bool;
 
     /// Returns true if the `total_bytes` tracked by the manager is less than the pause amount.
@@ -59,6 +60,7 @@ impl LifecycleHandle for LifecycleHandleImpl {
         shard_id: ShardId,
         sequence_number: SequenceNumber,
         bytes_written: usize,
+        rows_written: usize,
     ) -> bool {
         let mut s = self.state.lock();
         let now = self.time_provider.now();

@@ -72,14 +74,18 @@ impl LifecycleHandle for LifecycleHandleImpl {
                 first_write: now,
                 last_write: now,
                 bytes_written: 0,
+                rows_written: 0,
                 first_sequence_number: sequence_number,
             });
 
         stats.bytes_written += bytes_written;
         stats.last_write = now;
+        stats.rows_written += rows_written;
 
         s.total_bytes += bytes_written;
-        s.total_bytes > self.config.pause_ingest_size
+
+        // Pause if the server has exceeded the configured memory limit.
+        s.total_bytes >= self.config.pause_ingest_size
     }
 
     fn can_resume_ingest(&self) -> bool {
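Two behavioural details are worth calling out in the hunk above: the pause check now uses `>=` instead of `>`, so exactly reaching the configured limit pauses ingest, and, per the existing `can_resume_ingest` doc comment, ingest resumes once the tracked bytes drop back below the pause amount. A small, self-contained sketch of that boundary behaviour (plain functions with assumed semantics, not the IOx implementation):

```rust
// Hedged model of the pause/resume boundary; `pause_ingest_size` and the comparison
// directions mirror the diff and doc comments above, nothing else is real.
fn should_pause(total_bytes: usize, pause_ingest_size: usize) -> bool {
    // After this change: pausing kicks in when the limit is reached, not only when exceeded.
    total_bytes >= pause_ingest_size
}

fn can_resume(total_bytes: usize, pause_ingest_size: usize) -> bool {
    // "Returns true if the total_bytes tracked by the manager is less than the pause amount."
    total_bytes < pause_ingest_size
}

fn main() {
    let limit = 10;
    assert!(!should_pause(9, limit));
    assert!(should_pause(10, limit)); // with `>` this write would not have paused ingest
    assert!(!can_resume(10, limit));
    assert!(can_resume(9, limit)); // resume once memory is released below the limit
    println!("boundary checks OK");
}
```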
@@ -124,6 +130,9 @@ pub struct LifecycleManager {
     persist_age_counter: U64Counter,
     /// Counter for a partition going cold for writes triggering a persist.
     persist_cold_counter: U64Counter,
+    /// Counter tracking the number of times a partition has been evicted for
+    /// containing too many rows.
+    persist_rows_counter: U64Counter,
 }
 
 /// The configuration options for the lifecycle on the ingester.
@@ -153,6 +162,13 @@ pub struct LifecycleConfig {
     /// manager will persist it. This is to ensure that cold partitions get cleared out to make
     /// room for partitions that are actively receiving writes.
     partition_cold_threshold: Duration,
+
+    /// The maximum number of rows allowed within a single partition before it
+    /// MUST be persisted.
+    ///
+    /// Reaching this limit marks the partition for persistence; it does not
+    /// currently pause ingest for the partition.
+    partition_row_max: usize,
 }
 
 impl LifecycleConfig {
@@ -164,6 +180,7 @@ impl LifecycleConfig {
         partition_size_threshold: usize,
         partition_age_threshold: Duration,
         partition_cold_threshold: Duration,
+        partition_row_max: usize,
     ) -> Self {
         // this must be true to ensure that persistence will get triggered, freeing up memory
         assert!(pause_ingest_size > persist_memory_threshold);

@@ -174,6 +191,7 @@ impl LifecycleConfig {
             partition_size_threshold,
             partition_age_threshold,
             partition_cold_threshold,
+            partition_row_max,
         }
     }
 }
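`LifecycleConfig::new` therefore now takes six arguments, with the new row limit last. The call below mirrors the argument order exercised by the tests updated in this diff; it assumes it is compiled inside the ingester crate within the IOx workspace (the type is not a public external API), so treat it as a usage sketch rather than a standalone program.

```rust
// Argument order as used by the updated tests in this commit (assumed context:
// inside the ingester crate, with `std::time::Duration` in scope).
let config = LifecycleConfig::new(
    1_000_000_000,          // pause_ingest_size (server-wide bytes before ingest pauses)
    0,                      // persist_memory_threshold
    0,                      // partition_size_threshold
    Duration::from_secs(1), // partition_age_threshold
    Duration::from_secs(1), // partition_cold_threshold
    500_000,                // partition_row_max: persist a partition once it holds this many rows
);
```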
@@ -215,6 +233,9 @@ struct PartitionLifecycleStats {
     last_write: Time,
     /// The number of bytes in the partition as estimated by the mutable batch sizes.
     bytes_written: usize,
+    /// The number of rows in the partition as estimated by the mutable batch
+    /// sizes + snapshots.
+    rows_written: usize,
     /// The sequence number the partition received on its first write. This is reset anytime
     /// the partition is persisted.
     first_sequence_number: SequenceNumber,

@@ -237,6 +258,7 @@ impl LifecycleManager {
         let persist_size_counter = persist_counter.recorder(&[("trigger", "size")]);
         let persist_age_counter = persist_counter.recorder(&[("trigger", "age")]);
         let persist_cold_counter = persist_counter.recorder(&[("trigger", "cold")]);
+        let persist_rows_counter = persist_counter.recorder(&[("trigger", "rows")]);
 
         let job_registry = Arc::new(JobRegistry::new(
             metric_registry,

@@ -251,6 +273,7 @@ impl LifecycleManager {
             persist_size_counter,
             persist_age_counter,
             persist_cold_counter,
+            persist_rows_counter,
         }
     }
 

@@ -296,6 +319,11 @@ impl LifecycleManager {
                 self.persist_cold_counter.inc(1);
             }
 
+            let exceeded_max_rows = s.rows_written >= self.config.partition_row_max;
+            if exceeded_max_rows {
+                self.persist_rows_counter.inc(1);
+            }
+
             let sized_out = s.bytes_written > self.config.partition_size_threshold;
             if sized_out {
                 self.persist_size_counter.inc(1);

@@ -306,7 +334,7 @@ impl LifecycleManager {
                     "Partition is over size threshold, persisting");
             }
 
-            aged_out || sized_out || is_cold
+            aged_out || sized_out || is_cold || exceeded_max_rows
         });
 
         // keep track of what we'll be evicting to see what else to drop
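Putting the triggers together, a partition is selected for persistence when any of age, size, coldness, or (new with this commit) the row count fires. A minimal, self-contained sketch of that combined predicate follows; the variable names mirror the diff (`aged_out`, `sized_out`, `is_cold`, `exceeded_max_rows`), but the structs are simplified stand-ins and the real code also increments the per-trigger metrics shown above.

```rust
// Hypothetical flattened view of the per-partition persist decision; not the IOx types.
struct PartitionStats {
    bytes_written: usize,
    rows_written: usize,
    age_nanos: u64,
    nanos_since_last_write: u64,
}

struct Config {
    partition_size_threshold: usize,
    partition_age_threshold_nanos: u64,
    partition_cold_threshold_nanos: u64,
    partition_row_max: usize,
}

fn should_persist(s: &PartitionStats, c: &Config) -> bool {
    let aged_out = s.age_nanos > c.partition_age_threshold_nanos;
    let is_cold = s.nanos_since_last_write > c.partition_cold_threshold_nanos;
    let sized_out = s.bytes_written > c.partition_size_threshold;
    // New trigger: too many rows buffered in this partition.
    let exceeded_max_rows = s.rows_written >= c.partition_row_max;
    aged_out || sized_out || is_cold || exceeded_max_rows
}

fn main() {
    let c = Config {
        partition_size_threshold: 1024 * 1024,
        partition_age_threshold_nanos: u64::MAX,
        partition_cold_threshold_nanos: u64::MAX,
        partition_row_max: 500_000,
    };
    let s = PartitionStats {
        bytes_written: 10,
        rows_written: 500_000,
        age_nanos: 0,
        nanos_since_last_write: 0,
    };
    assert!(should_persist(&s, &c)); // the row limit alone is enough to mark for persistence
    println!("persist: {}", should_persist(&s, &c));
}
```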
@@ -606,6 +634,7 @@ mod tests {
             partition_size_threshold: 5,
             partition_age_threshold: Duration::from_nanos(0),
             partition_cold_threshold: Duration::from_secs(500),
+            partition_row_max: 100,
         };
         let TestLifecycleManger {
             m, time_provider, ..

@@ -614,25 +643,27 @@ mod tests {
         let h = m.handle();
 
         // log first two writes at different times
-        assert!(!h.log_write(PartitionId::new(1), shard_id, SequenceNumber::new(1), 1));
+        assert!(!h.log_write(PartitionId::new(1), shard_id, SequenceNumber::new(1), 1, 1));
         time_provider.inc(Duration::from_nanos(10));
-        assert!(!h.log_write(PartitionId::new(1), shard_id, SequenceNumber::new(2), 1));
+        assert!(!h.log_write(PartitionId::new(1), shard_id, SequenceNumber::new(2), 1, 1));
 
         // log another write for different partition using a different handle
         assert!(!m
             .handle()
-            .log_write(PartitionId::new(2), shard_id, SequenceNumber::new(3), 3));
+            .log_write(PartitionId::new(2), shard_id, SequenceNumber::new(3), 3, 3));
 
         let stats = m.stats();
         assert_eq!(stats.total_bytes, 5);
 
         let p1 = stats.partition_stats.get(0).unwrap();
         assert_eq!(p1.bytes_written, 2);
+        assert_eq!(p1.rows_written, 2);
         assert_eq!(p1.partition_id, PartitionId::new(1));
         assert_eq!(p1.first_write, Time::from_timestamp_nanos(0));
 
         let p2 = stats.partition_stats.get(1).unwrap();
         assert_eq!(p2.bytes_written, 3);
+        assert_eq!(p2.rows_written, 3);
         assert_eq!(p2.partition_id, PartitionId::new(2));
         assert_eq!(p2.first_write, Time::from_timestamp_nanos(10));
     }
@@ -640,11 +671,12 @@ mod tests {
     #[tokio::test]
     async fn pausing_and_resuming_ingest() {
         let config = LifecycleConfig {
-            pause_ingest_size: 20,
+            pause_ingest_size: 10,
             persist_memory_threshold: 10,
             partition_size_threshold: 5,
             partition_age_threshold: Duration::from_nanos(0),
             partition_cold_threshold: Duration::from_secs(500),
+            partition_row_max: 100,
         };
         let partition_id = PartitionId::new(1);
         let TestLifecycleManger { mut m, .. } = TestLifecycleManger::new(config);

@@ -652,10 +684,11 @@ mod tests {
         let h = m.handle();
 
         // write more than the limit (10)
-        assert!(!h.log_write(partition_id, shard_id, SequenceNumber::new(1), 15));
+        assert!(h.log_write(partition_id, shard_id, SequenceNumber::new(1), 15, 1));
         assert!(!h.can_resume_ingest());
 
         // all subsequent writes should also indicate a pause
-        assert!(h.log_write(partition_id, shard_id, SequenceNumber::new(2), 10));
+        assert!(h.log_write(partition_id, shard_id, SequenceNumber::new(2), 1, 1));
         assert!(!h.can_resume_ingest());
 
         // persist the partition

@@ -664,7 +697,62 @@ mod tests {
 
         // ingest can resume
         assert!(h.can_resume_ingest());
-        assert!(!h.log_write(partition_id, shard_id, SequenceNumber::new(3), 3));
+        assert!(!h.log_write(partition_id, shard_id, SequenceNumber::new(3), 3, 1));
     }
 
+    #[tokio::test]
+    async fn pausing_and_resuming_ingest_row_count_allowed() {
+        let config = LifecycleConfig {
+            pause_ingest_size: 20,
+            persist_memory_threshold: 10,
+            partition_size_threshold: 5,
+            partition_age_threshold: Duration::from_nanos(0),
+            partition_cold_threshold: Duration::from_secs(500),
+            partition_row_max: 10,
+        };
+        let partition_id = PartitionId::new(1);
+        let TestLifecycleManger { mut m, .. } = TestLifecycleManger::new(config);
+        let shard_id = ShardId::new(1);
+        let h = m.handle();
+
+        // No stats exist before writing.
+        {
+            let s = m.state.lock();
+            assert!(s.partition_stats.get(&partition_id).is_none());
+        }
+
+        // write more than the limit (10) and don't get stopped, because the
+        // per-partition limit does not pause the server from ingesting.
+        assert!(!h.log_write(partition_id, shard_id, SequenceNumber::new(1), 1, 50));
+        assert!(h.can_resume_ingest());
+
+        // Rows were counted
+        {
+            let s = m.state.lock();
+            let stats = s
+                .partition_stats
+                .get(&partition_id)
+                .expect("should have partition stats");
+            assert_eq!(stats.rows_written, 50);
+        }
+
+        // all subsequent writes should also be allowed
+        assert!(!h.log_write(partition_id, shard_id, SequenceNumber::new(2), 1, 1));
+        assert!(h.can_resume_ingest());
+
+        // persist the partition
+        let persister = Arc::new(TestPersister::default());
+        m.maybe_persist(&persister).await;
+
+        // Stats were reset
+        {
+            let s = m.state.lock();
+            assert!(s.partition_stats.get(&partition_id).is_none());
+        }
+
+        // ingest can continue
+        assert!(h.can_resume_ingest());
+        assert!(!h.log_write(partition_id, shard_id, SequenceNumber::new(3), 1, 1));
+    }
+
     #[tokio::test]
|
@ -675,6 +763,7 @@ mod tests {
|
|||
partition_size_threshold: 5,
|
||||
partition_age_threshold: Duration::from_nanos(0),
|
||||
partition_cold_threshold: Duration::from_secs(500),
|
||||
partition_row_max: 100,
|
||||
};
|
||||
let partition_id = PartitionId::new(1);
|
||||
let TestLifecycleManger { mut m, .. } = TestLifecycleManger::new(config);
|
||||
|
@ -682,7 +771,7 @@ mod tests {
|
|||
let h = m.handle();
|
||||
|
||||
// write more than the limit (20)
|
||||
h.log_write(partition_id, shard_id, SequenceNumber::new(1), 25);
|
||||
h.log_write(partition_id, shard_id, SequenceNumber::new(1), 25, 1);
|
||||
|
||||
// can not resume ingest as we are overall the pause ingest limit
|
||||
assert!(!h.can_resume_ingest());
|
||||
|
@ -710,7 +799,7 @@ mod tests {
|
|||
|
||||
// ingest can resume
|
||||
assert!(h.can_resume_ingest());
|
||||
assert!(!h.log_write(partition_id, shard_id, SequenceNumber::new(2), 3));
|
||||
assert!(!h.log_write(partition_id, shard_id, SequenceNumber::new(2), 3, 1));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
@ -721,6 +810,7 @@ mod tests {
|
|||
partition_size_threshold: 10,
|
||||
partition_age_threshold: Duration::from_nanos(5),
|
||||
partition_cold_threshold: Duration::from_secs(500),
|
||||
partition_row_max: 100,
|
||||
};
|
||||
let TestLifecycleManger {
|
||||
mut m,
|
||||
|
@ -732,7 +822,7 @@ mod tests {
|
|||
let shard_id = ShardId::new(1);
|
||||
let h = m.handle();
|
||||
|
||||
h.log_write(partition_id, shard_id, SequenceNumber::new(1), 10);
|
||||
h.log_write(partition_id, shard_id, SequenceNumber::new(1), 10, 1);
|
||||
|
||||
m.maybe_persist(&persister).await;
|
||||
let stats = m.stats();
|
||||
|
@@ -747,7 +837,7 @@ mod tests {
 
         // write in data for a new partition so we can be sure it isn't persisted, but the older
         // one is
-        h.log_write(PartitionId::new(2), shard_id, SequenceNumber::new(2), 6);
+        h.log_write(PartitionId::new(2), shard_id, SequenceNumber::new(2), 6, 1);
 
         m.maybe_persist(&persister).await;
 

@@ -775,6 +865,7 @@ mod tests {
             partition_size_threshold: 10,
             partition_age_threshold: Duration::from_nanos(5),
             partition_cold_threshold: Duration::from_secs(500),
+            partition_row_max: 100,
         };
         let TestLifecycleManger {
             mut m,

@@ -786,7 +877,7 @@ mod tests {
         let shard_id = ShardId::new(1);
         let h = m.handle();
 
-        h.log_write(partition_id, shard_id, SequenceNumber::new(1), 10);
+        h.log_write(partition_id, shard_id, SequenceNumber::new(1), 10, 1);
 
         m.maybe_persist(&persister).await;
         let stats = m.stats();

@@ -801,8 +892,8 @@ mod tests {
 
         // write in data for a new partition so we can be sure it isn't persisted, but the older
         // one is
-        h.log_write(PartitionId::new(2), shard_id, SequenceNumber::new(2), 6);
-        h.log_write(PartitionId::new(3), shard_id, SequenceNumber::new(3), 7);
+        h.log_write(PartitionId::new(2), shard_id, SequenceNumber::new(2), 6, 1);
+        h.log_write(PartitionId::new(3), shard_id, SequenceNumber::new(3), 7, 1);
 
         m.maybe_persist(&persister).await;
 

@@ -831,6 +922,7 @@ mod tests {
             partition_size_threshold: 5,
             partition_age_threshold: Duration::from_millis(100),
             partition_cold_threshold: Duration::from_secs(500),
+            partition_row_max: 100,
         };
         let TestLifecycleManger {
             mut m,
@@ -842,7 +934,7 @@ mod tests {
 
         let partition_id = PartitionId::new(1);
         let persister = Arc::new(TestPersister::default());
-        h.log_write(partition_id, shard_id, SequenceNumber::new(1), 4);
+        h.log_write(partition_id, shard_id, SequenceNumber::new(1), 4, 1);
 
         m.maybe_persist(&persister).await;
 

@@ -852,8 +944,8 @@ mod tests {
         assert!(!persister.persist_called_for(partition_id));
 
         // introduce a new partition under the limit to verify it doesn't get taken with the other
-        h.log_write(PartitionId::new(2), shard_id, SequenceNumber::new(2), 3);
-        h.log_write(partition_id, shard_id, SequenceNumber::new(3), 5);
+        h.log_write(PartitionId::new(2), shard_id, SequenceNumber::new(2), 3, 1);
+        h.log_write(partition_id, shard_id, SequenceNumber::new(3), 5, 1);
 
         m.maybe_persist(&persister).await;
 

@@ -881,6 +973,7 @@ mod tests {
             partition_size_threshold: 20,
             partition_age_threshold: Duration::from_millis(1000),
             partition_cold_threshold: Duration::from_secs(500),
+            partition_row_max: 100,
         };
         let shard_id = ShardId::new(1);
         let TestLifecycleManger {

@@ -891,8 +984,8 @@ mod tests {
         let h = m.handle();
         let partition_id = PartitionId::new(1);
         let persister = Arc::new(TestPersister::default());
-        h.log_write(partition_id, shard_id, SequenceNumber::new(1), 8);
-        h.log_write(PartitionId::new(2), shard_id, SequenceNumber::new(2), 13);
+        h.log_write(partition_id, shard_id, SequenceNumber::new(1), 8, 1);
+        h.log_write(PartitionId::new(2), shard_id, SequenceNumber::new(2), 13, 1);
 
         m.maybe_persist(&persister).await;
 

@@ -908,8 +1001,8 @@ mod tests {
         );
 
         // add that partition back in over size
-        h.log_write(partition_id, shard_id, SequenceNumber::new(3), 20);
-        h.log_write(PartitionId::new(2), shard_id, SequenceNumber::new(4), 21);
+        h.log_write(partition_id, shard_id, SequenceNumber::new(3), 20, 1);
+        h.log_write(PartitionId::new(2), shard_id, SequenceNumber::new(4), 21, 1);
 
         // both partitions should now need to be persisted to bring us below the mem threshold of
         // 20.
@@ -944,6 +1037,7 @@ mod tests {
             partition_size_threshold: 5,
             partition_age_threshold: Duration::from_millis(1000),
             partition_cold_threshold: Duration::from_secs(500),
+            partition_row_max: 100,
         };
         let shard_id = ShardId::new(1);
         let TestLifecycleManger {

@@ -953,11 +1047,11 @@ mod tests {
         } = TestLifecycleManger::new(config);
         let h = m.handle();
         let persister = Arc::new(TestPersister::default());
-        h.log_write(PartitionId::new(1), shard_id, SequenceNumber::new(1), 4);
+        h.log_write(PartitionId::new(1), shard_id, SequenceNumber::new(1), 4, 1);
         time_provider.inc(Duration::from_nanos(1));
-        h.log_write(PartitionId::new(2), shard_id, SequenceNumber::new(2), 6);
+        h.log_write(PartitionId::new(2), shard_id, SequenceNumber::new(2), 6, 1);
         time_provider.inc(Duration::from_nanos(1));
-        h.log_write(PartitionId::new(3), shard_id, SequenceNumber::new(3), 3);
+        h.log_write(PartitionId::new(3), shard_id, SequenceNumber::new(3), 3, 1);
 
         m.maybe_persist(&persister).await;
 

@@ -985,6 +1079,7 @@ mod tests {
             partition_size_threshold: 500,
             partition_age_threshold: Duration::from_secs(1000),
             partition_cold_threshold: Duration::from_secs(5),
+            partition_row_max: 100,
         };
         let TestLifecycleManger {
             mut m,

@@ -996,7 +1091,7 @@ mod tests {
         let persister = Arc::new(TestPersister::default());
         let shard_id = ShardId::new(1);
 
-        h.log_write(partition_id, shard_id, SequenceNumber::new(1), 10);
+        h.log_write(partition_id, shard_id, SequenceNumber::new(1), 10, 1);
 
         m.maybe_persist(&persister).await;
         let stats = m.stats();

@@ -1010,7 +1105,7 @@ mod tests {
         assert!(!persister.persist_called_for(partition_id));
 
         // write in data for a new partition so we can be sure it isn't persisted, but the older one is
-        h.log_write(PartitionId::new(2), shard_id, SequenceNumber::new(2), 6);
+        h.log_write(PartitionId::new(2), shard_id, SequenceNumber::new(2), 6, 1);
 
         m.maybe_persist(&persister).await;
 
@@ -610,7 +610,7 @@ mod tests {
         let time_provider: Arc<dyn TimeProvider> = Arc::new(SystemProvider::default());
         let lifecycle = LifecycleManager::new(
             LifecycleConfig::new(
-                100, 2, 3, Duration::from_secs(4), Duration::from_secs(5)
+                100, 2, 3, Duration::from_secs(4), Duration::from_secs(5), 10000000,
             ),
             Arc::clone(&metrics),
             time_provider,

@@ -974,7 +974,14 @@ mod tests {
         let metrics = Arc::new(metric::Registry::default());
         let time_provider = Arc::new(SystemProvider::default());
         let lifecycle = LifecycleManager::new(
-            LifecycleConfig::new(100, 2, 3, Duration::from_secs(4), Duration::from_secs(5)),
+            LifecycleConfig::new(
+                100,
+                2,
+                3,
+                Duration::from_secs(4),
+                Duration::from_secs(5),
+                1000000,
+            ),
             Arc::clone(&metrics),
             time_provider,
         );

@@ -1007,7 +1014,14 @@ mod tests {
         let metrics = Arc::new(metric::Registry::default());
         let time_provider = Arc::new(SystemProvider::default());
         let lifecycle = LifecycleManager::new(
-            LifecycleConfig::new(100, 2, 3, Duration::from_secs(4), Duration::from_secs(5)),
+            LifecycleConfig::new(
+                100,
+                2,
+                3,
+                Duration::from_secs(4),
+                Duration::from_secs(5),
+                1000000,
+            ),
             Arc::clone(&metrics),
             time_provider,
         );
@@ -180,6 +180,7 @@ pub async fn create_ingester_server_type(
         ingester_config.persist_partition_size_threshold_bytes,
         Duration::from_secs(ingester_config.persist_partition_age_threshold_seconds),
         Duration::from_secs(ingester_config.persist_partition_cold_threshold_seconds),
+        ingester_config.persist_partition_rows_max,
     );
     let ingest_handler = Arc::new(
         IngestHandlerImpl::new(

@@ -968,6 +968,7 @@ impl LifecycleHandle for NoopLifecycleHandle {
         _shard_id: ShardId,
         _sequence_number: SequenceNumber,
         _bytes_written: usize,
+        _rows_written: usize,
     ) -> bool {
         // do NOT pause ingest
         false