feat: drop based on LRU (#2075) (#2092)

* feat: drop based on LRU (#2075)

* chore: review feedback

Co-authored-by: Andrew Lamb <alamb@influxdata.com>

Co-authored-by: Andrew Lamb <alamb@influxdata.com>
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Raphael Taylor-Davies 2021-07-23 09:31:28 +01:00 committed by GitHub
parent 50b436a8a3
commit 844a025c7c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 162 additions and 199 deletions

1
Cargo.lock generated
View File

@ -1889,6 +1889,7 @@ dependencies = [
"data_types", "data_types",
"futures", "futures",
"hashbrown", "hashbrown",
"internal_types",
"observability_deps", "observability_deps",
"tokio", "tokio",
"tracker", "tracker",

View File

@ -10,6 +10,7 @@ chrono = "0.4"
data_types = { path = "../data_types" } data_types = { path = "../data_types" }
futures = "0.3" futures = "0.3"
hashbrown = "0.11" hashbrown = "0.11"
internal_types = { path = "../internal_types" }
observability_deps = { path = "../observability_deps" } observability_deps = { path = "../observability_deps" }
tokio = { version = "1.0", features = ["macros", "time"] } tokio = { version = "1.0", features = ["macros", "time"] }
tracker = { path = "../tracker" } tracker = { path = "../tracker" }

View File

@ -14,6 +14,7 @@ use data_types::chunk_metadata::{ChunkAddr, ChunkLifecycleAction, ChunkStorage};
use data_types::database_rules::LifecycleRules; use data_types::database_rules::LifecycleRules;
use data_types::DatabaseName; use data_types::DatabaseName;
pub use guard::*; pub use guard::*;
use internal_types::access::AccessMetrics;
pub use policy::*; pub use policy::*;
use std::time::Instant; use std::time::Instant;
use tracker::TaskTracker; use tracker::TaskTracker;
@ -173,7 +174,8 @@ pub trait LifecycleChunk {
/// Returns the min timestamp contained within this chunk /// Returns the min timestamp contained within this chunk
fn min_timestamp(&self) -> DateTime<Utc>; fn min_timestamp(&self) -> DateTime<Utc>;
fn time_of_first_write(&self) -> Option<DateTime<Utc>>; /// Returns the access metrics for this chunk
fn access_metrics(&self) -> AccessMetrics;
fn time_of_last_write(&self) -> Option<DateTime<Utc>>; fn time_of_last_write(&self) -> Option<DateTime<Utc>>;

View File

@ -15,6 +15,7 @@ use crate::{
LifecycleChunk, LifecycleDb, LifecyclePartition, LockableChunk, LockablePartition, LifecycleChunk, LifecycleDb, LifecyclePartition, LockableChunk, LockablePartition,
PersistHandle, PersistHandle,
}; };
use internal_types::access::AccessMetrics;
/// Number of seconds to wait before retrying a failed lifecycle action /// Number of seconds to wait before retrying a failed lifecycle action
pub const LIFECYCLE_ACTION_BACKOFF: Duration = Duration::from_secs(10); pub const LIFECYCLE_ACTION_BACKOFF: Duration = Duration::from_secs(10);
@ -96,7 +97,7 @@ where
partition, partition,
action, action,
chunk_id: chunk.addr().chunk_id, chunk_id: chunk.addr().chunk_id,
first_write: chunk.time_of_first_write(), access_metrics: chunk.access_metrics(),
}) })
} }
} }
@ -637,18 +638,16 @@ struct FreeCandidate<'a, P> {
partition: &'a P, partition: &'a P,
chunk_id: u32, chunk_id: u32,
action: FreeAction, action: FreeAction,
first_write: Option<DateTime<Utc>>, access_metrics: AccessMetrics,
} }
fn sort_free_candidates<P>(candidates: &mut Vec<FreeCandidate<'_, P>>) { fn sort_free_candidates<P>(candidates: &mut Vec<FreeCandidate<'_, P>>) {
candidates.sort_unstable_by(|a, b| match a.action.cmp(&b.action) { candidates.sort_unstable_by(|a, b| match a.action.cmp(&b.action) {
// Order candidates with the same FreeAction by first write time, with nulls last // Order candidates with the same FreeAction by last access time
std::cmp::Ordering::Equal => match (a.first_write.as_ref(), b.first_write.as_ref()) { std::cmp::Ordering::Equal => a
(Some(a), Some(b)) => a.cmp(b), .access_metrics
(None, Some(_)) => std::cmp::Ordering::Greater, .last_instant
(Some(_), None) => std::cmp::Ordering::Less, .cmp(&b.access_metrics.last_instant),
(None, None) => std::cmp::Ordering::Equal,
},
o => o, o => o,
}) })
} }
@ -710,19 +709,14 @@ mod tests {
addr: ChunkAddr, addr: ChunkAddr,
row_count: usize, row_count: usize,
min_timestamp: Option<DateTime<Utc>>, min_timestamp: Option<DateTime<Utc>>,
time_of_first_write: Option<DateTime<Utc>>, access_metrics: AccessMetrics,
time_of_last_write: Option<DateTime<Utc>>, time_of_last_write: Option<DateTime<Utc>>,
lifecycle_action: Option<TaskTracker<ChunkLifecycleAction>>, lifecycle_action: Option<TaskTracker<ChunkLifecycleAction>>,
storage: ChunkStorage, storage: ChunkStorage,
} }
impl TestChunk { impl TestChunk {
fn new( fn new(id: u32, time_of_last_write: Option<i64>, storage: ChunkStorage) -> Self {
id: u32,
time_of_first_write: Option<i64>,
time_of_last_write: Option<i64>,
storage: ChunkStorage,
) -> Self {
let addr = ChunkAddr { let addr = ChunkAddr {
db_name: Arc::from(""), db_name: Arc::from(""),
table_name: Arc::from(""), table_name: Arc::from(""),
@ -734,47 +728,34 @@ mod tests {
addr, addr,
row_count: 10, row_count: 10,
min_timestamp: None, min_timestamp: None,
time_of_first_write: time_of_first_write.map(from_secs), access_metrics: AccessMetrics {
count: 0,
last_instant: Instant::now(),
},
time_of_last_write: time_of_last_write.map(from_secs), time_of_last_write: time_of_last_write.map(from_secs),
lifecycle_action: None, lifecycle_action: None,
storage, storage,
} }
} }
fn with_row_count(self, row_count: usize) -> Self { fn with_row_count(mut self, row_count: usize) -> Self {
Self { self.row_count = row_count;
addr: self.addr, self
row_count,
min_timestamp: self.min_timestamp,
time_of_first_write: self.time_of_first_write,
time_of_last_write: self.time_of_last_write,
lifecycle_action: self.lifecycle_action,
storage: self.storage,
}
} }
fn with_action(self, action: ChunkLifecycleAction) -> Self { fn with_action(mut self, action: ChunkLifecycleAction) -> Self {
Self { self.lifecycle_action = Some(TaskTracker::complete(action));
addr: self.addr, self
row_count: self.row_count,
min_timestamp: self.min_timestamp,
time_of_first_write: self.time_of_first_write,
time_of_last_write: self.time_of_last_write,
lifecycle_action: Some(TaskTracker::complete(action)),
storage: self.storage,
}
} }
fn with_min_timestamp(self, min_timestamp: DateTime<Utc>) -> Self { fn with_min_timestamp(mut self, min_timestamp: DateTime<Utc>) -> Self {
Self { self.min_timestamp = Some(min_timestamp);
addr: self.addr, self
row_count: self.row_count, }
min_timestamp: Some(min_timestamp),
time_of_first_write: self.time_of_first_write, fn with_access_metrics(mut self, metrics: AccessMetrics) -> Self {
time_of_last_write: self.time_of_last_write, self.access_metrics = metrics;
lifecycle_action: self.lifecycle_action, self
storage: self.storage,
}
} }
} }
@ -855,7 +836,7 @@ mod tests {
let id = partition.next_id; let id = partition.next_id;
partition.next_id += 1; partition.next_id += 1;
let mut new_chunk = TestChunk::new(id, None, None, ChunkStorage::ReadBuffer); let mut new_chunk = TestChunk::new(id, None, ChunkStorage::ReadBuffer);
new_chunk.row_count = 0; new_chunk.row_count = 0;
for chunk in &chunks { for chunk in &chunks {
@ -895,7 +876,7 @@ mod tests {
partition.next_id += 1; partition.next_id += 1;
// The remainder left behind after the split // The remainder left behind after the split
let new_chunk = TestChunk::new(id, None, None, ChunkStorage::ReadBuffer) let new_chunk = TestChunk::new(id, None, ChunkStorage::ReadBuffer)
.with_min_timestamp(handle.timestamp + chrono::Duration::nanoseconds(1)); .with_min_timestamp(handle.timestamp + chrono::Duration::nanoseconds(1));
partition partition
@ -988,8 +969,8 @@ mod tests {
self.min_timestamp.unwrap() self.min_timestamp.unwrap()
} }
fn time_of_first_write(&self) -> Option<DateTime<Utc>> { fn access_metrics(&self) -> AccessMetrics {
self.time_of_first_write self.access_metrics.clone()
} }
fn time_of_last_write(&self) -> Option<DateTime<Utc>> { fn time_of_last_write(&self) -> Option<DateTime<Utc>> {
@ -1113,70 +1094,61 @@ mod tests {
mub_row_threshold: NonZeroUsize::new(74).unwrap(), mub_row_threshold: NonZeroUsize::new(74).unwrap(),
..Default::default() ..Default::default()
}; };
let chunk = TestChunk::new(0, Some(0), Some(0), ChunkStorage::OpenMutableBuffer); let chunk = TestChunk::new(0, Some(0), ChunkStorage::OpenMutableBuffer);
assert!(!can_move(&rules, &chunk, from_secs(9))); assert!(!can_move(&rules, &chunk, from_secs(9)));
assert!(can_move(&rules, &chunk, from_secs(11))); assert!(can_move(&rules, &chunk, from_secs(11)));
// can move even if the chunk is small // can move even if the chunk is small
let chunk = let chunk = TestChunk::new(0, Some(0), ChunkStorage::OpenMutableBuffer).with_row_count(73);
TestChunk::new(0, Some(0), Some(0), ChunkStorage::OpenMutableBuffer).with_row_count(73);
assert!(can_move(&rules, &chunk, from_secs(11))); assert!(can_move(&rules, &chunk, from_secs(11)));
// If over the row count threshold, we should be able to move // If over the row count threshold, we should be able to move
let chunk = let chunk = TestChunk::new(0, None, ChunkStorage::OpenMutableBuffer).with_row_count(74);
TestChunk::new(0, None, None, ChunkStorage::OpenMutableBuffer).with_row_count(74);
assert!(can_move(&rules, &chunk, from_secs(0))); assert!(can_move(&rules, &chunk, from_secs(0)));
// If below the default row count threshold, it shouldn't move // If below the default row count threshold, it shouldn't move
let chunk = let chunk = TestChunk::new(0, None, ChunkStorage::OpenMutableBuffer).with_row_count(73);
TestChunk::new(0, None, None, ChunkStorage::OpenMutableBuffer).with_row_count(73);
assert!(!can_move(&rules, &chunk, from_secs(0))); assert!(!can_move(&rules, &chunk, from_secs(0)));
} }
#[test] #[test]
fn test_sort_free_candidates() { fn test_sort_free_candidates() {
let instant = Instant::now();
let access_metrics = |secs: u64| AccessMetrics {
count: 1,
last_instant: instant + Duration::from_secs(secs),
};
let mut candidates = vec![ let mut candidates = vec![
FreeCandidate {
partition: &(),
chunk_id: 0,
action: FreeAction::Drop,
first_write: None,
},
FreeCandidate {
partition: &(),
chunk_id: 2,
action: FreeAction::Unload,
first_write: None,
},
FreeCandidate { FreeCandidate {
partition: &(), partition: &(),
chunk_id: 1, chunk_id: 1,
action: FreeAction::Unload, action: FreeAction::Unload,
first_write: Some(from_secs(40)), access_metrics: access_metrics(40),
}, },
FreeCandidate { FreeCandidate {
partition: &(), partition: &(),
chunk_id: 3, chunk_id: 3,
action: FreeAction::Unload, action: FreeAction::Unload,
first_write: Some(from_secs(20)), access_metrics: access_metrics(20),
}, },
FreeCandidate { FreeCandidate {
partition: &(), partition: &(),
chunk_id: 4, chunk_id: 4,
action: FreeAction::Unload, action: FreeAction::Unload,
first_write: Some(from_secs(10)), access_metrics: access_metrics(10),
}, },
FreeCandidate { FreeCandidate {
partition: &(), partition: &(),
chunk_id: 5, chunk_id: 5,
action: FreeAction::Drop, action: FreeAction::Drop,
first_write: Some(from_secs(10)), access_metrics: access_metrics(10),
}, },
FreeCandidate { FreeCandidate {
partition: &(), partition: &(),
chunk_id: 6, chunk_id: 6,
action: FreeAction::Drop, action: FreeAction::Drop,
first_write: Some(from_secs(5)), access_metrics: access_metrics(5),
}, },
]; ];
@ -1186,8 +1158,8 @@ mod tests {
// Should first unload, then drop // Should first unload, then drop
// //
// Should order the same actions by write time, with nulls last // Should order the same actions by access time
assert_eq!(ids, vec![4, 3, 1, 2, 6, 5, 0]) assert_eq!(ids, vec![4, 3, 1, 6, 5])
} }
#[test] #[test]
@ -1195,9 +1167,9 @@ mod tests {
// The default rules shouldn't do anything // The default rules shouldn't do anything
let rules = LifecycleRules::default(); let rules = LifecycleRules::default();
let chunks = vec![ let chunks = vec![
TestChunk::new(0, Some(1), Some(1), ChunkStorage::OpenMutableBuffer), TestChunk::new(0, Some(1), ChunkStorage::OpenMutableBuffer),
TestChunk::new(1, Some(20), Some(1), ChunkStorage::OpenMutableBuffer), TestChunk::new(1, Some(1), ChunkStorage::OpenMutableBuffer),
TestChunk::new(2, Some(30), Some(1), ChunkStorage::OpenMutableBuffer), TestChunk::new(2, Some(1), ChunkStorage::OpenMutableBuffer),
]; ];
let db = TestDb::new(rules, chunks); let db = TestDb::new(rules, chunks);
@ -1213,9 +1185,9 @@ mod tests {
..Default::default() ..Default::default()
}; };
let chunks = vec![ let chunks = vec![
TestChunk::new(0, Some(0), Some(8), ChunkStorage::OpenMutableBuffer), TestChunk::new(0, Some(8), ChunkStorage::OpenMutableBuffer),
TestChunk::new(1, Some(0), Some(5), ChunkStorage::OpenMutableBuffer), TestChunk::new(1, Some(5), ChunkStorage::OpenMutableBuffer),
TestChunk::new(2, Some(0), Some(0), ChunkStorage::OpenMutableBuffer), TestChunk::new(2, Some(0), ChunkStorage::OpenMutableBuffer),
]; ];
let db = TestDb::new(rules, chunks); let db = TestDb::new(rules, chunks);
@ -1290,12 +1262,7 @@ mod tests {
..Default::default() ..Default::default()
}; };
let chunks = vec![TestChunk::new( let chunks = vec![TestChunk::new(0, Some(0), ChunkStorage::OpenMutableBuffer)];
0,
Some(0),
Some(0),
ChunkStorage::OpenMutableBuffer,
)];
let db = TestDb::new(rules.clone(), chunks); let db = TestDb::new(rules.clone(), chunks);
let mut lifecycle = LifecyclePolicy::new(&db); let mut lifecycle = LifecyclePolicy::new(&db);
@ -1303,26 +1270,44 @@ mod tests {
lifecycle.check_for_work(from_secs(10), Instant::now()); lifecycle.check_for_work(from_secs(10), Instant::now());
assert_eq!(*db.events.read(), vec![]); assert_eq!(*db.events.read(), vec![]);
let chunks = vec![ let instant = Instant::now();
// two "open" chunks => they must not be dropped (yet)
TestChunk::new(0, Some(0), Some(0), ChunkStorage::OpenMutableBuffer), let chunks =
TestChunk::new(1, Some(0), Some(0), ChunkStorage::OpenMutableBuffer), vec![
// "moved" chunk => can be dropped because `drop_non_persisted=true` // two "open" chunks => they must not be dropped (yet)
TestChunk::new(2, Some(0), Some(0), ChunkStorage::ReadBuffer), TestChunk::new(0, Some(0), ChunkStorage::OpenMutableBuffer),
// "writing" chunk => cannot be unloaded while write is in-progress TestChunk::new(1, Some(0), ChunkStorage::OpenMutableBuffer),
TestChunk::new(3, Some(0), Some(0), ChunkStorage::ReadBuffer) // "moved" chunk => can be dropped because `drop_non_persisted=true`
.with_action(ChunkLifecycleAction::Persisting), TestChunk::new(2, Some(0), ChunkStorage::ReadBuffer),
// "written" chunk => can be unloaded // "writing" chunk => cannot be unloaded while write is in-progress
TestChunk::new(4, Some(0), Some(0), ChunkStorage::ReadBufferAndObjectStore), TestChunk::new(3, Some(0), ChunkStorage::ReadBuffer)
]; .with_action(ChunkLifecycleAction::Persisting),
// "written" chunk => can be unloaded
TestChunk::new(4, Some(0), ChunkStorage::ReadBufferAndObjectStore)
.with_access_metrics(AccessMetrics {
count: 1,
last_instant: instant,
}),
// "written" chunk => can be unloaded
TestChunk::new(5, Some(0), ChunkStorage::ReadBufferAndObjectStore)
.with_access_metrics(AccessMetrics {
count: 12,
last_instant: instant - Duration::from_secs(1),
}),
];
let db = TestDb::new(rules, chunks); let db = TestDb::new(rules, chunks);
let mut lifecycle = LifecyclePolicy::new(&db); let mut lifecycle = LifecyclePolicy::new(&db);
// Should unload chunk 5 first as access time is smaller
lifecycle.check_for_work(from_secs(10), Instant::now()); lifecycle.check_for_work(from_secs(10), Instant::now());
assert_eq!( assert_eq!(
*db.events.read(), *db.events.read(),
vec![MoverEvents::Unload(4), MoverEvents::Drop(2)] vec![
MoverEvents::Unload(5),
MoverEvents::Unload(4),
MoverEvents::Drop(2)
]
); );
} }
@ -1339,12 +1324,7 @@ mod tests {
}; };
assert!(!rules.drop_non_persisted); assert!(!rules.drop_non_persisted);
let chunks = vec![TestChunk::new( let chunks = vec![TestChunk::new(0, Some(0), ChunkStorage::OpenMutableBuffer)];
0,
Some(0),
Some(0),
ChunkStorage::OpenMutableBuffer,
)];
let db = TestDb::new(rules.clone(), chunks); let db = TestDb::new(rules.clone(), chunks);
let mut lifecycle = LifecyclePolicy::new(&db); let mut lifecycle = LifecyclePolicy::new(&db);
@ -1354,15 +1334,15 @@ mod tests {
let chunks = vec![ let chunks = vec![
// two "open" chunks => they must not be dropped (yet) // two "open" chunks => they must not be dropped (yet)
TestChunk::new(0, Some(0), Some(0), ChunkStorage::OpenMutableBuffer), TestChunk::new(0, Some(0), ChunkStorage::OpenMutableBuffer),
TestChunk::new(1, Some(0), Some(0), ChunkStorage::OpenMutableBuffer), TestChunk::new(1, Some(0), ChunkStorage::OpenMutableBuffer),
// "moved" chunk => cannot be dropped because `drop_non_persisted=false` // "moved" chunk => cannot be dropped because `drop_non_persisted=false`
TestChunk::new(2, Some(0), Some(0), ChunkStorage::ReadBuffer), TestChunk::new(2, Some(0), ChunkStorage::ReadBuffer),
// "writing" chunk => cannot be dropped while write is in-progress // "writing" chunk => cannot be dropped while write is in-progress
TestChunk::new(3, Some(0), Some(0), ChunkStorage::ReadBuffer) TestChunk::new(3, Some(0), ChunkStorage::ReadBuffer)
.with_action(ChunkLifecycleAction::Persisting), .with_action(ChunkLifecycleAction::Persisting),
// "written" chunk => can be unloaded // "written" chunk => can be unloaded
TestChunk::new(4, Some(0), Some(0), ChunkStorage::ReadBufferAndObjectStore), TestChunk::new(4, Some(0), ChunkStorage::ReadBufferAndObjectStore),
]; ];
let db = TestDb::new(rules, chunks); let db = TestDb::new(rules, chunks);
@ -1380,12 +1360,7 @@ mod tests {
..Default::default() ..Default::default()
}; };
let chunks = vec![TestChunk::new( let chunks = vec![TestChunk::new(0, Some(0), ChunkStorage::OpenMutableBuffer)];
0,
Some(0),
Some(0),
ChunkStorage::OpenMutableBuffer,
)];
let db = TestDb::new(rules, chunks); let db = TestDb::new(rules, chunks);
let mut lifecycle = LifecyclePolicy::new(&db); let mut lifecycle = LifecyclePolicy::new(&db);
@ -1408,61 +1383,55 @@ mod tests {
let partitions = vec![ let partitions = vec![
TestPartition::new(vec![ TestPartition::new(vec![
// still receiving writes => cannot compact // still receiving writes => cannot compact
TestChunk::new(0, Some(0), Some(20), ChunkStorage::OpenMutableBuffer), TestChunk::new(0, Some(20), ChunkStorage::OpenMutableBuffer),
]), ]),
TestPartition::new(vec![ TestPartition::new(vec![
// still receiving writes => cannot compact // still receiving writes => cannot compact
TestChunk::new(1, Some(0), Some(20), ChunkStorage::OpenMutableBuffer), TestChunk::new(1, Some(20), ChunkStorage::OpenMutableBuffer),
// closed => can compact // closed => can compact
TestChunk::new(2, Some(0), Some(20), ChunkStorage::ClosedMutableBuffer), TestChunk::new(2, Some(20), ChunkStorage::ClosedMutableBuffer),
]), ]),
TestPartition::new(vec![ TestPartition::new(vec![
// open but cold => can compact // open but cold => can compact
TestChunk::new(3, Some(0), Some(5), ChunkStorage::OpenMutableBuffer), TestChunk::new(3, Some(5), ChunkStorage::OpenMutableBuffer),
// closed => can compact // closed => can compact
TestChunk::new(4, Some(0), Some(20), ChunkStorage::ClosedMutableBuffer), TestChunk::new(4, Some(20), ChunkStorage::ClosedMutableBuffer),
// closed => can compact // closed => can compact
TestChunk::new(5, Some(0), Some(20), ChunkStorage::ReadBuffer), TestChunk::new(5, Some(20), ChunkStorage::ReadBuffer),
// persisted => cannot compact // persisted => cannot compact
TestChunk::new(6, Some(0), Some(20), ChunkStorage::ReadBufferAndObjectStore), TestChunk::new(6, Some(20), ChunkStorage::ReadBufferAndObjectStore),
// persisted => cannot compact // persisted => cannot compact
TestChunk::new(7, Some(0), Some(20), ChunkStorage::ObjectStoreOnly), TestChunk::new(7, Some(20), ChunkStorage::ObjectStoreOnly),
]), ]),
TestPartition::new(vec![ TestPartition::new(vec![
// closed => can compact // closed => can compact
TestChunk::new(8, Some(0), Some(20), ChunkStorage::ReadBuffer), TestChunk::new(8, Some(20), ChunkStorage::ReadBuffer),
// closed => can compact // closed => can compact
TestChunk::new(9, Some(0), Some(20), ChunkStorage::ReadBuffer), TestChunk::new(9, Some(20), ChunkStorage::ReadBuffer),
// persisted => cannot compact // persisted => cannot compact
TestChunk::new( TestChunk::new(10, Some(20), ChunkStorage::ReadBufferAndObjectStore),
10,
Some(0),
Some(20),
ChunkStorage::ReadBufferAndObjectStore,
),
// persisted => cannot compact // persisted => cannot compact
TestChunk::new(11, Some(0), Some(20), ChunkStorage::ObjectStoreOnly), TestChunk::new(11, Some(20), ChunkStorage::ObjectStoreOnly),
]), ]),
TestPartition::new(vec![ TestPartition::new(vec![
// open but cold => can compact // open but cold => can compact
TestChunk::new(12, Some(0), Some(5), ChunkStorage::OpenMutableBuffer), TestChunk::new(12, Some(5), ChunkStorage::OpenMutableBuffer),
]), ]),
TestPartition::new(vec![ TestPartition::new(vec![
// already compacted => should not compact // already compacted => should not compact
TestChunk::new(13, Some(0), Some(5), ChunkStorage::ReadBuffer), TestChunk::new(13, Some(5), ChunkStorage::ReadBuffer),
]), ]),
TestPartition::new(vec![ TestPartition::new(vec![
// closed => can compact // closed => can compact
TestChunk::new(14, Some(0), Some(20), ChunkStorage::ReadBuffer).with_row_count(400), TestChunk::new(14, Some(20), ChunkStorage::ReadBuffer).with_row_count(400),
// too many individual rows => ignore // too many individual rows => ignore
TestChunk::new(15, Some(0), Some(20), ChunkStorage::ReadBuffer) TestChunk::new(15, Some(20), ChunkStorage::ReadBuffer).with_row_count(1_000),
.with_row_count(1_000),
// closed => can compact // closed => can compact
TestChunk::new(16, Some(0), Some(20), ChunkStorage::ReadBuffer).with_row_count(400), TestChunk::new(16, Some(20), ChunkStorage::ReadBuffer).with_row_count(400),
// too many total rows => next compaction job // too many total rows => next compaction job
TestChunk::new(17, Some(0), Some(20), ChunkStorage::ReadBuffer).with_row_count(400), TestChunk::new(17, Some(20), ChunkStorage::ReadBuffer).with_row_count(400),
// too many total rows => next compaction job // too many total rows => next compaction job
TestChunk::new(18, Some(0), Some(20), ChunkStorage::ReadBuffer).with_row_count(400), TestChunk::new(18, Some(20), ChunkStorage::ReadBuffer).with_row_count(400),
]), ]),
]; ];
@ -1497,19 +1466,19 @@ mod tests {
let partitions = vec![ let partitions = vec![
TestPartition::new(vec![ TestPartition::new(vec![
// closed => can compact // closed => can compact
TestChunk::new(0, Some(0), Some(20), ChunkStorage::ClosedMutableBuffer), TestChunk::new(0, Some(20), ChunkStorage::ClosedMutableBuffer),
// closed => can compact // closed => can compact
TestChunk::new(10, Some(0), Some(30), ChunkStorage::ClosedMutableBuffer), TestChunk::new(10, Some(30), ChunkStorage::ClosedMutableBuffer),
// closed => can compact // closed => can compact
TestChunk::new(12, Some(0), Some(40), ChunkStorage::ClosedMutableBuffer), TestChunk::new(12, Some(40), ChunkStorage::ClosedMutableBuffer),
]), ]),
TestPartition::new(vec![ TestPartition::new(vec![
// closed => can compact // closed => can compact
TestChunk::new(1, Some(0), Some(20), ChunkStorage::ClosedMutableBuffer), TestChunk::new(1, Some(20), ChunkStorage::ClosedMutableBuffer),
]), ]),
TestPartition::new(vec![ TestPartition::new(vec![
// closed => can compact // closed => can compact
TestChunk::new(200, Some(0), Some(10), ChunkStorage::ClosedMutableBuffer), TestChunk::new(200, Some(10), ChunkStorage::ClosedMutableBuffer),
]), ]),
]; ];
@ -1547,64 +1516,64 @@ mod tests {
let partitions = vec![ let partitions = vec![
// Insufficient rows and not old enough => don't persist but can compact // Insufficient rows and not old enough => don't persist but can compact
TestPartition::new(vec![ TestPartition::new(vec![
TestChunk::new(0, Some(0), Some(0), ChunkStorage::ClosedMutableBuffer) TestChunk::new(0, Some(0), ChunkStorage::ClosedMutableBuffer)
.with_min_timestamp(from_secs(10)), .with_min_timestamp(from_secs(10)),
TestChunk::new(1, Some(0), Some(0), ChunkStorage::ReadBuffer) TestChunk::new(1, Some(0), ChunkStorage::ReadBuffer)
.with_min_timestamp(from_secs(5)), .with_min_timestamp(from_secs(5)),
]) ])
.with_persistence(10, now, from_secs(20)), .with_persistence(10, now, from_secs(20)),
// Sufficient rows => persist // Sufficient rows => persist
TestPartition::new(vec![ TestPartition::new(vec![
TestChunk::new(2, Some(0), Some(0), ChunkStorage::ClosedMutableBuffer) TestChunk::new(2, Some(0), ChunkStorage::ClosedMutableBuffer)
.with_min_timestamp(from_secs(10)), .with_min_timestamp(from_secs(10)),
TestChunk::new(3, Some(0), Some(0), ChunkStorage::ReadBuffer) TestChunk::new(3, Some(0), ChunkStorage::ReadBuffer)
.with_min_timestamp(from_secs(5)), .with_min_timestamp(from_secs(5)),
]) ])
.with_persistence(1_000, now, from_secs(20)), .with_persistence(1_000, now, from_secs(20)),
// Writes too old => persist // Writes too old => persist
TestPartition::new(vec![ TestPartition::new(vec![
// Should split open chunks // Should split open chunks
TestChunk::new(4, Some(0), Some(20), ChunkStorage::OpenMutableBuffer) TestChunk::new(4, Some(20), ChunkStorage::OpenMutableBuffer)
.with_min_timestamp(from_secs(10)), .with_min_timestamp(from_secs(10)),
TestChunk::new(5, Some(0), Some(0), ChunkStorage::ReadBuffer) TestChunk::new(5, Some(0), ChunkStorage::ReadBuffer)
.with_min_timestamp(from_secs(5)), .with_min_timestamp(from_secs(5)),
TestChunk::new(6, Some(0), Some(0), ChunkStorage::ObjectStoreOnly) TestChunk::new(6, Some(0), ChunkStorage::ObjectStoreOnly)
.with_min_timestamp(from_secs(5)), .with_min_timestamp(from_secs(5)),
]) ])
.with_persistence(10, now - Duration::from_secs(10), from_secs(20)), .with_persistence(10, now - Duration::from_secs(10), from_secs(20)),
// Sufficient rows but conflicting compaction => prevent compaction // Sufficient rows but conflicting compaction => prevent compaction
TestPartition::new(vec![ TestPartition::new(vec![
TestChunk::new(7, Some(0), Some(0), ChunkStorage::ClosedMutableBuffer) TestChunk::new(7, Some(0), ChunkStorage::ClosedMutableBuffer)
.with_min_timestamp(from_secs(10)) .with_min_timestamp(from_secs(10))
.with_action(ChunkLifecycleAction::Compacting), .with_action(ChunkLifecycleAction::Compacting),
// This chunk would be a compaction candidate, but we want to persist it // This chunk would be a compaction candidate, but we want to persist it
TestChunk::new(8, Some(0), Some(0), ChunkStorage::ClosedMutableBuffer) TestChunk::new(8, Some(0), ChunkStorage::ClosedMutableBuffer)
.with_min_timestamp(from_secs(10)), .with_min_timestamp(from_secs(10)),
TestChunk::new(9, Some(0), Some(0), ChunkStorage::ReadBuffer) TestChunk::new(9, Some(0), ChunkStorage::ReadBuffer)
.with_min_timestamp(from_secs(5)), .with_min_timestamp(from_secs(5)),
]) ])
.with_persistence(1_000, now, from_secs(20)), .with_persistence(1_000, now, from_secs(20)),
// Sufficient rows and non-conflicting compaction => persist // Sufficient rows and non-conflicting compaction => persist
TestPartition::new(vec![ TestPartition::new(vec![
TestChunk::new(10, Some(0), Some(0), ChunkStorage::ClosedMutableBuffer) TestChunk::new(10, Some(0), ChunkStorage::ClosedMutableBuffer)
.with_min_timestamp(from_secs(21)) .with_min_timestamp(from_secs(21))
.with_action(ChunkLifecycleAction::Compacting), .with_action(ChunkLifecycleAction::Compacting),
TestChunk::new(11, Some(0), Some(0), ChunkStorage::ClosedMutableBuffer) TestChunk::new(11, Some(0), ChunkStorage::ClosedMutableBuffer)
.with_min_timestamp(from_secs(10)), .with_min_timestamp(from_secs(10)),
TestChunk::new(12, Some(0), Some(0), ChunkStorage::ReadBuffer) TestChunk::new(12, Some(0), ChunkStorage::ReadBuffer)
.with_min_timestamp(from_secs(5)), .with_min_timestamp(from_secs(5)),
]) ])
.with_persistence(1_000, now, from_secs(20)), .with_persistence(1_000, now, from_secs(20)),
// Sufficient rows, non-conflicting compaction and compact-able chunk => persist + compact // Sufficient rows, non-conflicting compaction and compact-able chunk => persist + compact
TestPartition::new(vec![ TestPartition::new(vec![
TestChunk::new(13, Some(0), Some(0), ChunkStorage::ClosedMutableBuffer) TestChunk::new(13, Some(0), ChunkStorage::ClosedMutableBuffer)
.with_min_timestamp(from_secs(21)) .with_min_timestamp(from_secs(21))
.with_action(ChunkLifecycleAction::Compacting), .with_action(ChunkLifecycleAction::Compacting),
TestChunk::new(14, Some(0), Some(0), ChunkStorage::ClosedMutableBuffer) TestChunk::new(14, Some(0), ChunkStorage::ClosedMutableBuffer)
.with_min_timestamp(from_secs(21)), .with_min_timestamp(from_secs(21)),
TestChunk::new(15, Some(0), Some(0), ChunkStorage::ClosedMutableBuffer) TestChunk::new(15, Some(0), ChunkStorage::ClosedMutableBuffer)
.with_min_timestamp(from_secs(10)), .with_min_timestamp(from_secs(10)),
TestChunk::new(16, Some(0), Some(0), ChunkStorage::ReadBuffer) TestChunk::new(16, Some(0), ChunkStorage::ReadBuffer)
.with_min_timestamp(from_secs(5)), .with_min_timestamp(from_secs(5)),
]) ])
.with_persistence(1_000, now, from_secs(20)), .with_persistence(1_000, now, from_secs(20)),
@ -1635,12 +1604,7 @@ mod tests {
late_arrive_window_seconds: NonZeroU32::new(10).unwrap(), late_arrive_window_seconds: NonZeroU32::new(10).unwrap(),
..Default::default() ..Default::default()
}; };
let chunks = vec![TestChunk::new( let chunks = vec![TestChunk::new(0, Some(40), ChunkStorage::OpenMutableBuffer)];
0,
Some(40),
Some(40),
ChunkStorage::OpenMutableBuffer,
)];
let db = TestDb::new(rules, chunks); let db = TestDb::new(rules, chunks);
let mut lifecycle = LifecyclePolicy::new(&db); let mut lifecycle = LifecyclePolicy::new(&db);
@ -1658,7 +1622,6 @@ mod tests {
let chunks = vec![TestChunk::new( let chunks = vec![TestChunk::new(
0, 0,
Some(40), Some(40),
Some(40),
ChunkStorage::ClosedMutableBuffer, ChunkStorage::ClosedMutableBuffer,
)]; )];
@ -1672,12 +1635,7 @@ mod tests {
#[test] #[test]
fn test_recovers_lifecycle_action() { fn test_recovers_lifecycle_action() {
let rules = LifecycleRules::default(); let rules = LifecycleRules::default();
let chunks = vec![TestChunk::new( let chunks = vec![TestChunk::new(0, None, ChunkStorage::ClosedMutableBuffer)];
0,
None,
None,
ChunkStorage::ClosedMutableBuffer,
)];
let db = TestDb::new(rules, chunks); let db = TestDb::new(rules, chunks);
let mut lifecycle = LifecyclePolicy::new(&db); let mut lifecycle = LifecyclePolicy::new(&db);

View File

@ -12,6 +12,7 @@ use data_types::job::Job;
use data_types::partition_metadata::Statistics; use data_types::partition_metadata::Statistics;
use data_types::DatabaseName; use data_types::DatabaseName;
use datafusion::physical_plan::SendableRecordBatchStream; use datafusion::physical_plan::SendableRecordBatchStream;
use internal_types::access::AccessMetrics;
use internal_types::schema::merge::SchemaMerger; use internal_types::schema::merge::SchemaMerger;
use internal_types::schema::{Schema, TIME_COLUMN_NAME}; use internal_types::schema::{Schema, TIME_COLUMN_NAME};
use lifecycle::{ use lifecycle::{
@ -19,6 +20,7 @@ use lifecycle::{
LockablePartition, LockablePartition,
}; };
use observability_deps::tracing::{info, trace}; use observability_deps::tracing::{info, trace};
use persistence_windows::persistence_windows::FlushHandle;
use query::QueryChunkMeta; use query::QueryChunkMeta;
use tracker::{RwLock, TaskTracker}; use tracker::{RwLock, TaskTracker};
@ -31,7 +33,6 @@ pub(crate) use drop::drop_chunk;
pub(crate) use error::{Error, Result}; pub(crate) use error::{Error, Result};
pub(crate) use move_chunk::move_chunk_to_read_buffer; pub(crate) use move_chunk::move_chunk_to_read_buffer;
pub(crate) use persist::persist_chunks; pub(crate) use persist::persist_chunks;
use persistence_windows::persistence_windows::FlushHandle;
pub(crate) use unload::unload_read_buffer_chunk; pub(crate) use unload::unload_read_buffer_chunk;
use super::DbChunk; use super::DbChunk;
@ -291,8 +292,24 @@ impl LifecycleChunk for CatalogChunk {
.expect("failed to clear lifecycle action") .expect("failed to clear lifecycle action")
} }
fn time_of_first_write(&self) -> Option<DateTime<Utc>> { fn min_timestamp(&self) -> DateTime<Utc> {
self.time_of_first_write() let table_summary = self.table_summary();
let col = table_summary
.columns
.iter()
.find(|x| x.name == TIME_COLUMN_NAME)
.expect("time column expected");
let min = match &col.stats {
Statistics::I64(stats) => stats.min.expect("time column cannot be empty"),
_ => panic!("unexpected time column type"),
};
Utc.timestamp_nanos(min)
}
fn access_metrics(&self) -> AccessMetrics {
self.access_recorder().get_metrics()
} }
fn time_of_last_write(&self) -> Option<DateTime<Utc>> { fn time_of_last_write(&self) -> Option<DateTime<Utc>> {
@ -310,22 +327,6 @@ impl LifecycleChunk for CatalogChunk {
fn row_count(&self) -> usize { fn row_count(&self) -> usize {
self.storage().0 self.storage().0
} }
fn min_timestamp(&self) -> DateTime<Utc> {
let table_summary = self.table_summary();
let col = table_summary
.columns
.iter()
.find(|x| x.name == TIME_COLUMN_NAME)
.expect("time column expected");
let min = match &col.stats {
Statistics::I64(stats) => stats.min.expect("time column cannot be empty"),
_ => panic!("unexpected time column type"),
};
Utc.timestamp_nanos(min)
}
} }
/// Executes a plan and collects the results into a read buffer chunk /// Executes a plan and collects the results into a read buffer chunk