parent
078d5f7cfb
commit
e41fd2a821
|
@ -130,10 +130,6 @@ pub struct LifecycleRules {
|
||||||
/// Represents the chunk transition open -> moving and closed -> moving
|
/// Represents the chunk transition open -> moving and closed -> moving
|
||||||
pub mutable_linger_seconds: NonZeroU32,
|
pub mutable_linger_seconds: NonZeroU32,
|
||||||
|
|
||||||
/// A chunk of data within a partition is guaranteed to remain mutable
|
|
||||||
/// for at least this number of seconds unless it exceeds the mutable_size_threshold
|
|
||||||
pub mutable_minimum_age_seconds: Option<NonZeroU32>,
|
|
||||||
|
|
||||||
/// Once the total amount of buffered data in memory reaches this size start
|
/// Once the total amount of buffered data in memory reaches this size start
|
||||||
/// dropping data from memory
|
/// dropping data from memory
|
||||||
pub buffer_size_soft: Option<NonZeroUsize>,
|
pub buffer_size_soft: Option<NonZeroUsize>,
|
||||||
|
@ -180,7 +176,6 @@ impl Default for LifecycleRules {
|
||||||
fn default() -> Self {
|
fn default() -> Self {
|
||||||
Self {
|
Self {
|
||||||
mutable_linger_seconds: NonZeroU32::new(DEFAULT_MUTABLE_LINGER_SECONDS).unwrap(),
|
mutable_linger_seconds: NonZeroU32::new(DEFAULT_MUTABLE_LINGER_SECONDS).unwrap(),
|
||||||
mutable_minimum_age_seconds: None,
|
|
||||||
buffer_size_soft: None,
|
buffer_size_soft: None,
|
||||||
buffer_size_hard: None,
|
buffer_size_hard: None,
|
||||||
drop_non_persisted: false,
|
drop_non_persisted: false,
|
||||||
|
|
|
@ -39,10 +39,6 @@ message LifecycleRules {
|
||||||
// Represents the chunk transition open -> moving and closed -> moving
|
// Represents the chunk transition open -> moving and closed -> moving
|
||||||
uint32 mutable_linger_seconds = 1;
|
uint32 mutable_linger_seconds = 1;
|
||||||
|
|
||||||
// A chunk of data within a partition is guaranteed to remain mutable
|
|
||||||
// for at least this number of seconds unless it exceeds the mutable_size_threshold
|
|
||||||
uint32 mutable_minimum_age_seconds = 2;
|
|
||||||
|
|
||||||
// Once the total amount of buffered data in memory reaches this size start
|
// Once the total amount of buffered data in memory reaches this size start
|
||||||
// dropping data from memory
|
// dropping data from memory
|
||||||
uint64 buffer_size_soft = 4;
|
uint64 buffer_size_soft = 4;
|
||||||
|
|
|
@ -15,10 +15,6 @@ impl From<LifecycleRules> for management::LifecycleRules {
|
||||||
fn from(config: LifecycleRules) -> Self {
|
fn from(config: LifecycleRules) -> Self {
|
||||||
Self {
|
Self {
|
||||||
mutable_linger_seconds: config.mutable_linger_seconds.get(),
|
mutable_linger_seconds: config.mutable_linger_seconds.get(),
|
||||||
mutable_minimum_age_seconds: config
|
|
||||||
.mutable_minimum_age_seconds
|
|
||||||
.map(Into::into)
|
|
||||||
.unwrap_or_default(),
|
|
||||||
buffer_size_soft: config
|
buffer_size_soft: config
|
||||||
.buffer_size_soft
|
.buffer_size_soft
|
||||||
.map(|x| x.get() as u64)
|
.map(|x| x.get() as u64)
|
||||||
|
@ -52,7 +48,6 @@ impl TryFrom<management::LifecycleRules> for LifecycleRules {
|
||||||
proto.mutable_linger_seconds
|
proto.mutable_linger_seconds
|
||||||
})
|
})
|
||||||
.unwrap(),
|
.unwrap(),
|
||||||
mutable_minimum_age_seconds: proto.mutable_minimum_age_seconds.try_into().ok(),
|
|
||||||
buffer_size_soft: (proto.buffer_size_soft as usize).try_into().ok(),
|
buffer_size_soft: (proto.buffer_size_soft as usize).try_into().ok(),
|
||||||
buffer_size_hard: (proto.buffer_size_hard as usize).try_into().ok(),
|
buffer_size_hard: (proto.buffer_size_hard as usize).try_into().ok(),
|
||||||
drop_non_persisted: proto.drop_non_persisted,
|
drop_non_persisted: proto.drop_non_persisted,
|
||||||
|
@ -86,7 +81,6 @@ mod tests {
|
||||||
fn lifecycle_rules() {
|
fn lifecycle_rules() {
|
||||||
let protobuf = management::LifecycleRules {
|
let protobuf = management::LifecycleRules {
|
||||||
mutable_linger_seconds: 123,
|
mutable_linger_seconds: 123,
|
||||||
mutable_minimum_age_seconds: 5345,
|
|
||||||
buffer_size_soft: 353,
|
buffer_size_soft: 353,
|
||||||
buffer_size_hard: 232,
|
buffer_size_hard: 232,
|
||||||
drop_non_persisted: true,
|
drop_non_persisted: true,
|
||||||
|
@ -106,10 +100,6 @@ mod tests {
|
||||||
config.mutable_linger_seconds.get(),
|
config.mutable_linger_seconds.get(),
|
||||||
protobuf.mutable_linger_seconds
|
protobuf.mutable_linger_seconds
|
||||||
);
|
);
|
||||||
assert_eq!(
|
|
||||||
config.mutable_minimum_age_seconds.unwrap().get(),
|
|
||||||
protobuf.mutable_minimum_age_seconds
|
|
||||||
);
|
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
config.buffer_size_soft.unwrap().get(),
|
config.buffer_size_soft.unwrap().get(),
|
||||||
protobuf.buffer_size_soft as usize
|
protobuf.buffer_size_soft as usize
|
||||||
|
@ -122,10 +112,6 @@ mod tests {
|
||||||
assert_eq!(config.immutable, protobuf.immutable);
|
assert_eq!(config.immutable, protobuf.immutable);
|
||||||
|
|
||||||
assert_eq!(back.mutable_linger_seconds, protobuf.mutable_linger_seconds);
|
assert_eq!(back.mutable_linger_seconds, protobuf.mutable_linger_seconds);
|
||||||
assert_eq!(
|
|
||||||
back.mutable_minimum_age_seconds,
|
|
||||||
protobuf.mutable_minimum_age_seconds
|
|
||||||
);
|
|
||||||
assert_eq!(back.buffer_size_soft, protobuf.buffer_size_soft);
|
assert_eq!(back.buffer_size_soft, protobuf.buffer_size_soft);
|
||||||
assert_eq!(back.buffer_size_hard, protobuf.buffer_size_hard);
|
assert_eq!(back.buffer_size_hard, protobuf.buffer_size_hard);
|
||||||
assert_eq!(back.drop_non_persisted, protobuf.drop_non_persisted);
|
assert_eq!(back.drop_non_persisted, protobuf.drop_non_persisted);
|
||||||
|
|
|
@ -484,18 +484,7 @@ fn can_move<C: LifecycleChunk>(rules: &LifecycleRules, chunk: &C, now: DateTime<
|
||||||
Some(last_write)
|
Some(last_write)
|
||||||
if elapsed_seconds(now, last_write) >= rules.mutable_linger_seconds.get() =>
|
if elapsed_seconds(now, last_write) >= rules.mutable_linger_seconds.get() =>
|
||||||
{
|
{
|
||||||
match (
|
true
|
||||||
rules.mutable_minimum_age_seconds,
|
|
||||||
chunk.time_of_first_write(),
|
|
||||||
) {
|
|
||||||
(Some(min_age), Some(first_write)) => {
|
|
||||||
// Chunk can be moved if it is old enough
|
|
||||||
elapsed_seconds(now, first_write) >= min_age.get()
|
|
||||||
}
|
|
||||||
// If no minimum age set - permit chunk movement
|
|
||||||
(None, _) => true,
|
|
||||||
(_, None) => unreachable!("chunk with last write and no first write"),
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Disable movement the chunk is empty, or the linger hasn't expired
|
// Disable movement the chunk is empty, or the linger hasn't expired
|
||||||
|
@ -901,33 +890,6 @@ mod tests {
|
||||||
.with_row_count(DEFAULT_MUB_ROW_THRESHOLD - 1);
|
.with_row_count(DEFAULT_MUB_ROW_THRESHOLD - 1);
|
||||||
assert!(can_move(&rules, &chunk, from_secs(11)));
|
assert!(can_move(&rules, &chunk, from_secs(11)));
|
||||||
|
|
||||||
// If mutable_minimum_age_seconds set must also take this into account
|
|
||||||
let rules = LifecycleRules {
|
|
||||||
mutable_linger_seconds: NonZeroU32::new(10).unwrap(),
|
|
||||||
mutable_minimum_age_seconds: Some(NonZeroU32::new(60).unwrap()),
|
|
||||||
..Default::default()
|
|
||||||
};
|
|
||||||
let chunk = TestChunk::new(0, Some(0), Some(0), ChunkStorage::OpenMutableBuffer);
|
|
||||||
assert!(!can_move(&rules, &chunk, from_secs(9)));
|
|
||||||
assert!(!can_move(&rules, &chunk, from_secs(11)));
|
|
||||||
assert!(can_move(&rules, &chunk, from_secs(61)));
|
|
||||||
|
|
||||||
let chunk = TestChunk::new(0, Some(0), Some(0), ChunkStorage::OpenMutableBuffer)
|
|
||||||
.with_row_count(DEFAULT_MUB_ROW_THRESHOLD - 1);
|
|
||||||
assert!(!can_move(&rules, &chunk, from_secs(9)));
|
|
||||||
assert!(!can_move(&rules, &chunk, from_secs(11)));
|
|
||||||
assert!(can_move(&rules, &chunk, from_secs(61)));
|
|
||||||
|
|
||||||
let chunk = TestChunk::new(0, Some(0), Some(0), ChunkStorage::OpenMutableBuffer)
|
|
||||||
.with_row_count(DEFAULT_MUB_ROW_THRESHOLD + 1);
|
|
||||||
assert!(can_move(&rules, &chunk, from_secs(9)));
|
|
||||||
assert!(can_move(&rules, &chunk, from_secs(11)));
|
|
||||||
assert!(can_move(&rules, &chunk, from_secs(61)));
|
|
||||||
|
|
||||||
let chunk = TestChunk::new(0, Some(0), Some(70), ChunkStorage::OpenMutableBuffer);
|
|
||||||
assert!(!can_move(&rules, &chunk, from_secs(71)));
|
|
||||||
assert!(can_move(&rules, &chunk, from_secs(81)));
|
|
||||||
|
|
||||||
// If over the default row count threshold, we should be able to move
|
// If over the default row count threshold, we should be able to move
|
||||||
let chunk = TestChunk::new(0, None, None, ChunkStorage::OpenMutableBuffer)
|
let chunk = TestChunk::new(0, None, None, ChunkStorage::OpenMutableBuffer)
|
||||||
.with_row_count(DEFAULT_MUB_ROW_THRESHOLD);
|
.with_row_count(DEFAULT_MUB_ROW_THRESHOLD);
|
||||||
|
@ -1083,47 +1045,6 @@ mod tests {
|
||||||
.expect("expect early return due to task completion");
|
.expect("expect early return due to task completion");
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_minimum_age() {
|
|
||||||
let rules = LifecycleRules {
|
|
||||||
mutable_linger_seconds: NonZeroU32::new(10).unwrap(),
|
|
||||||
mutable_minimum_age_seconds: Some(NonZeroU32::new(60).unwrap()),
|
|
||||||
..Default::default()
|
|
||||||
};
|
|
||||||
let chunks = vec![
|
|
||||||
TestChunk::new(0, Some(40), Some(40), ChunkStorage::OpenMutableBuffer),
|
|
||||||
TestChunk::new(1, Some(0), Some(0), ChunkStorage::OpenMutableBuffer),
|
|
||||||
];
|
|
||||||
|
|
||||||
let db = TestDb::new(rules, chunks);
|
|
||||||
let mut lifecycle = LifecyclePolicy::new(&db);
|
|
||||||
let partition = Arc::clone(&db.partitions.read()[0]);
|
|
||||||
|
|
||||||
// Expect to move chunk_id=1 first, despite it coming second in
|
|
||||||
// the order, because chunk_id=0 will only become old enough at t=100
|
|
||||||
|
|
||||||
lifecycle.check_for_work(from_secs(80), Instant::now());
|
|
||||||
let chunks = partition.read().chunks.keys().cloned().collect::<Vec<_>>();
|
|
||||||
// Expect chunk 1 to have been compacted into a new chunk 2
|
|
||||||
assert_eq!(*db.events.read(), vec![MoverEvents::Compact(vec![1])]);
|
|
||||||
assert_eq!(chunks, vec![0, 2]);
|
|
||||||
|
|
||||||
lifecycle.check_for_work(from_secs(90), Instant::now());
|
|
||||||
assert_eq!(*db.events.read(), vec![MoverEvents::Compact(vec![1])]);
|
|
||||||
|
|
||||||
lifecycle.check_for_work(from_secs(110), Instant::now());
|
|
||||||
assert_eq!(
|
|
||||||
*db.events.read(),
|
|
||||||
vec![
|
|
||||||
MoverEvents::Compact(vec![1]),
|
|
||||||
MoverEvents::Compact(vec![0, 2])
|
|
||||||
]
|
|
||||||
);
|
|
||||||
|
|
||||||
assert_eq!(partition.read().chunks.len(), 1);
|
|
||||||
assert_eq!(partition.read().chunks[&3].read().row_count, 20);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn test_buffer_size_soft_drop_non_persisted() {
|
async fn test_buffer_size_soft_drop_non_persisted() {
|
||||||
// test that chunk mover can drop non persisted chunks
|
// test that chunk mover can drop non persisted chunks
|
||||||
|
@ -1397,10 +1318,9 @@ mod tests {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_moves_closed() {
|
fn test_moves_open() {
|
||||||
let rules = LifecycleRules {
|
let rules = LifecycleRules {
|
||||||
mutable_linger_seconds: NonZeroU32::new(10).unwrap(),
|
mutable_linger_seconds: NonZeroU32::new(10).unwrap(),
|
||||||
mutable_minimum_age_seconds: Some(NonZeroU32::new(60).unwrap()),
|
|
||||||
..Default::default()
|
..Default::default()
|
||||||
};
|
};
|
||||||
let chunks = vec![TestChunk::new(
|
let chunks = vec![TestChunk::new(
|
||||||
|
@ -1413,14 +1333,26 @@ mod tests {
|
||||||
let db = TestDb::new(rules, chunks);
|
let db = TestDb::new(rules, chunks);
|
||||||
let mut lifecycle = LifecyclePolicy::new(&db);
|
let mut lifecycle = LifecyclePolicy::new(&db);
|
||||||
|
|
||||||
// Initially can't move
|
|
||||||
lifecycle.check_for_work(from_secs(80), Instant::now());
|
lifecycle.check_for_work(from_secs(80), Instant::now());
|
||||||
assert_eq!(*db.events.read(), vec![]);
|
assert_eq!(*db.events.read(), vec![MoverEvents::Compact(vec![0])]);
|
||||||
|
}
|
||||||
|
|
||||||
db.partitions.read()[0].read().chunks[&0].write().storage =
|
#[test]
|
||||||
ChunkStorage::ClosedMutableBuffer;
|
fn test_moves_closed() {
|
||||||
|
let rules = LifecycleRules {
|
||||||
|
mutable_linger_seconds: NonZeroU32::new(10).unwrap(),
|
||||||
|
..Default::default()
|
||||||
|
};
|
||||||
|
let chunks = vec![TestChunk::new(
|
||||||
|
0,
|
||||||
|
Some(40),
|
||||||
|
Some(40),
|
||||||
|
ChunkStorage::ClosedMutableBuffer,
|
||||||
|
)];
|
||||||
|
|
||||||
|
let db = TestDb::new(rules, chunks);
|
||||||
|
let mut lifecycle = LifecyclePolicy::new(&db);
|
||||||
|
|
||||||
// As soon as closed can move
|
|
||||||
lifecycle.check_for_work(from_secs(80), Instant::now());
|
lifecycle.check_for_work(from_secs(80), Instant::now());
|
||||||
assert_eq!(*db.events.read(), vec![MoverEvents::Compact(vec![0])]);
|
assert_eq!(*db.events.read(), vec![MoverEvents::Compact(vec![0])]);
|
||||||
}
|
}
|
||||||
|
|
|
@ -83,11 +83,6 @@ struct Create {
|
||||||
#[structopt(long, default_value = "300")] // 5 minutes
|
#[structopt(long, default_value = "300")] // 5 minutes
|
||||||
mutable_linger_seconds: u32,
|
mutable_linger_seconds: u32,
|
||||||
|
|
||||||
/// A chunk of data within a partition is guaranteed to remain mutable
|
|
||||||
/// for at least this number of seconds
|
|
||||||
#[structopt(long, default_value = "0")] // 0 minutes
|
|
||||||
mutable_minimum_age_seconds: u32,
|
|
||||||
|
|
||||||
/// Once the total amount of buffered data in memory reaches this size start
|
/// Once the total amount of buffered data in memory reaches this size start
|
||||||
/// dropping data from memory based on the drop_order
|
/// dropping data from memory based on the drop_order
|
||||||
#[structopt(long, default_value = "52428800")] // 52428800 = 50*1024*1024
|
#[structopt(long, default_value = "52428800")] // 52428800 = 50*1024*1024
|
||||||
|
@ -186,7 +181,6 @@ pub async fn command(url: String, config: Config) -> Result<()> {
|
||||||
name: command.name,
|
name: command.name,
|
||||||
lifecycle_rules: Some(LifecycleRules {
|
lifecycle_rules: Some(LifecycleRules {
|
||||||
mutable_linger_seconds: command.mutable_linger_seconds,
|
mutable_linger_seconds: command.mutable_linger_seconds,
|
||||||
mutable_minimum_age_seconds: command.mutable_minimum_age_seconds,
|
|
||||||
buffer_size_soft: command.buffer_size_soft as _,
|
buffer_size_soft: command.buffer_size_soft as _,
|
||||||
buffer_size_hard: command.buffer_size_hard as _,
|
buffer_size_hard: command.buffer_size_hard as _,
|
||||||
drop_non_persisted: command.drop_non_persisted,
|
drop_non_persisted: command.drop_non_persisted,
|
||||||
|
|
Loading…
Reference in New Issue