feat: automated chunk lifecycle (#1091)
* feat: automated chunk lifecycle
* chore: use >= for lifecycle timestamp comparisons
* chore: review fixes
parent 8f69a12ec6
commit b0e21e5f9e
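For orientation, a rough, hypothetical sketch (not part of this diff) of how a database might opt into the automated lifecycle via the `LifecycleRules` fields touched below; the field types (`Option<NonZeroU32>`, `Option<NonZeroUsize>`) and the `Default` impl are taken from the hunks and tests further down:

use std::num::{NonZeroU32, NonZeroUsize};

use data_types::database_rules::LifecycleRules;

// Hypothetical configuration: consider a chunk for closing once it has seen
// no writes for 10 seconds, only move chunks that are at least 60 seconds
// old, and start dropping chunks already moved to the read buffer once the
// total in-memory size exceeds the 10 MiB soft limit.
let rules = LifecycleRules {
    mutable_linger_seconds: NonZeroU32::new(10),
    mutable_minimum_age_seconds: NonZeroU32::new(60),
    buffer_size_soft: NonZeroUsize::new(10 * 1024 * 1024),
    ..Default::default()
};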
@ -177,6 +177,8 @@ pub struct LifecycleRules {
    /// Once the amount of data in memory reaches this size start
    /// rejecting writes
    ///
    /// TODO: Implement this limit
    pub buffer_size_hard: Option<NonZeroUsize>,

    /// Configure order to transition data
@ -364,7 +366,7 @@ pub enum Order {
impl Default for Order {
    fn default() -> Self {
-        Self::Desc
+        Self::Asc
    }
}
@ -7,7 +7,7 @@ edition = "2018"
[dependencies] # In alphabetical order
arrow_deps = { path = "../arrow_deps" }
async-trait = "0.1"
-bytes = { version = "1.0", features = ["serde"] }
+bytes = { version = "1.0" }
chrono = "0.4"
crc32fast = "1.2.0"
data_types = { path = "../data_types" }
@ -23,10 +23,14 @@ use read_buffer::Database as ReadBufferDb;
pub(crate) use chunk::DBChunk;

use crate::db::lifecycle::LifecycleManager;
use crate::tracker::{TrackedFutureExt, Tracker};
use crate::{buffer::Buffer, JobRegistry};
use data_types::job::Job;

pub mod catalog;
mod chunk;
mod lifecycle;
pub mod pred;
mod streams;
@ -369,6 +373,40 @@ impl Db {
        Ok(DBChunk::snapshot(&chunk))
    }

    /// Spawns a task to perform load_chunk_to_read_buffer
    pub fn load_chunk_to_read_buffer_in_background(
        self: &Arc<Self>,
        partition_key: String,
        chunk_id: u32,
    ) -> Tracker<Job> {
        let name = self.rules.read().name.clone();
        let (tracker, registration) = self.jobs.register(Job::CloseChunk {
            db_name: name.clone(),
            partition_key: partition_key.clone(),
            chunk_id,
        });

        let captured = Arc::clone(&self);
        let task = async move {
            debug!(%name, %partition_key, %chunk_id, "background task loading chunk to read buffer");
            let result = captured
                .load_chunk_to_read_buffer(&partition_key, chunk_id)
                .await;
            if let Err(e) = result {
                info!(?e, %name, %partition_key, %chunk_id, "background task error loading read buffer chunk");
                return Err(e);
            }

            debug!(%name, %partition_key, %chunk_id, "background task completed closing chunk");

            Ok(())
        };

        tokio::spawn(task.track(registration));

        tracker
    }

    /// Returns the next write sequence number
    pub fn next_sequence(&self) -> u64 {
        self.sequence.fetch_add(1, Ordering::SeqCst)
@ -420,14 +458,20 @@ impl Db {
    }

    /// Background worker function
-    pub async fn background_worker(&self, shutdown: tokio_util::sync::CancellationToken) {
+    pub async fn background_worker(
+        self: &Arc<Self>,
+        shutdown: tokio_util::sync::CancellationToken,
+    ) {
        info!("started background worker");

        let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(1));
+        let mut lifecycle_manager = LifecycleManager::new(Arc::clone(&self));

        while !shutdown.is_cancelled() {
            self.worker_iterations.fetch_add(1, Ordering::Relaxed);

+            lifecycle_manager.check_for_work();

            tokio::select! {
                _ = interval.tick() => {},
                _ = shutdown.cancelled() => break
@ -25,9 +25,6 @@ pub enum ChunkState {
    /// Chunk can still accept new writes, but will likely be closed soon
    Closing(MBChunk),

-    /// Chunk is closed for new writes and has become read only
-    Closed(Arc<MBChunk>),
-
    /// Chunk is closed for new writes, and is actively moving to the read
    /// buffer
    Moving(Arc<MBChunk>),
@ -42,7 +39,6 @@ impl ChunkState {
            Self::Invalid => "Invalid",
            Self::Open(_) => "Open",
            Self::Closing(_) => "Closing",
-            Self::Closed(_) => "Closed",
            Self::Moving(_) => "Moving",
            Self::Moved(_) => "Moved",
        }
@ -91,7 +87,7 @@ macro_rules! unexpected_state {
}

impl Chunk {
-    /// Create a new chunk in the Open state
+    /// Create a new chunk in the provided state
    pub(crate) fn new(partition_key: impl Into<String>, id: u32, state: ChunkState) -> Self {
        Self {
            partition_key: Arc::new(partition_key.into()),
@ -103,6 +99,23 @@ impl Chunk {
        }
    }

    /// Creates a new open chunk
    pub(crate) fn new_open(partition_key: impl Into<String>, id: u32) -> Self {
        let state = ChunkState::Open(mutable_buffer::chunk::Chunk::new(id));
        Self::new(partition_key, id, state)
    }

    /// Used for testing
    #[cfg(test)]
    pub(crate) fn set_timestamps(
        &mut self,
        time_of_first_write: Option<DateTime<Utc>>,
        time_of_last_write: Option<DateTime<Utc>>,
    ) {
        self.time_of_first_write = time_of_first_write;
        self.time_of_last_write = time_of_last_write;
    }

    pub fn id(&self) -> u32 {
        self.id
    }
@ -156,7 +169,7 @@ impl Chunk {
        match &self.state {
            ChunkState::Invalid => false,
            ChunkState::Open(chunk) | ChunkState::Closing(chunk) => chunk.has_table(table_name),
-            ChunkState::Moving(chunk) | ChunkState::Closed(chunk) => chunk.has_table(table_name),
+            ChunkState::Moving(chunk) => chunk.has_table(table_name),
            ChunkState::Moved(db) => {
                db.has_table(self.partition_key.as_str(), table_name, &[self.id])
            }
@ -168,13 +181,26 @@ impl Chunk {
        match &self.state {
            ChunkState::Invalid => {}
            ChunkState::Open(chunk) | ChunkState::Closing(chunk) => chunk.all_table_names(names),
-            ChunkState::Moving(chunk) | ChunkState::Closed(chunk) => chunk.all_table_names(names),
+            ChunkState::Moving(chunk) => chunk.all_table_names(names),
            ChunkState::Moved(db) => {
                db.all_table_names(self.partition_key.as_str(), &[self.id], names)
            }
        }
    }

    /// Returns an approximation of the amount of process memory consumed by the
    /// chunk
    pub fn size(&self) -> usize {
        match &self.state {
            ChunkState::Invalid => 0,
            ChunkState::Open(chunk) | ChunkState::Closing(chunk) => chunk.size(),
            ChunkState::Moving(chunk) => chunk.size(),
            ChunkState::Moved(db) => db
                .chunks_size(self.partition_key.as_str(), &[self.id])
                .unwrap_or(0) as usize,
        }
    }

    /// Returns a mutable reference to the mutable buffer storage for
    /// chunks in the Open or Closing state
    ///
@ -218,18 +244,9 @@ impl Chunk {
                self.state = ChunkState::Moving(Arc::clone(&chunk));
                Ok(chunk)
            }
-            ChunkState::Closed(chunk) => {
-                self.state = ChunkState::Moving(Arc::clone(&chunk));
-                Ok(chunk)
-            }
            state => {
                self.state = state;
-                unexpected_state!(
-                    self,
-                    "setting moving",
-                    "Open, Closing or Closed",
-                    &self.state
-                )
+                unexpected_state!(self, "setting moving", "Open or Closing", &self.state)
            }
        }
    }
@ -78,8 +78,7 @@ impl Partition {
        let chunk_id = self.next_chunk_id;
        self.next_chunk_id += 1;

-        let state = ChunkState::Open(mutable_buffer::chunk::Chunk::new(chunk_id));
-        let chunk = Arc::new(RwLock::new(Chunk::new(&self.key, chunk_id, state)));
+        let chunk = Arc::new(RwLock::new(Chunk::new_open(&self.key, chunk_id)));

        if self.chunks.insert(chunk_id, Arc::clone(&chunk)).is_some() {
            // A fundamental invariant has been violated - abort
@ -77,11 +77,13 @@ impl DBChunk {
        let partition_key = Arc::new(chunk.key().to_string());
        let chunk_id = chunk.id();

+        use super::catalog::chunk::ChunkState;
+
        let db_chunk = match chunk.state() {
-            super::catalog::chunk::ChunkState::Invalid => {
+            ChunkState::Invalid => {
                panic!("Invalid internal state");
            }
-            super::catalog::chunk::ChunkState::Open(chunk) => {
+            ChunkState::Open(chunk) => {
                // TODO the performance of cloning the chunk is terrible
                // Proper performance is tracked in
                // https://github.com/influxdata/influxdb_iox/issues/635
@ -92,7 +94,7 @@ impl DBChunk {
                    open: true,
                }
            }
-            super::catalog::chunk::ChunkState::Closing(chunk) => {
+            ChunkState::Closing(chunk) => {
                // TODO the performance of cloning the chunk is terrible
                // Proper performance is tracked in
                // https://github.com/influxdata/influxdb_iox/issues/635
@ -103,8 +105,7 @@ impl DBChunk {
                    open: false,
                }
            }
-            super::catalog::chunk::ChunkState::Closed(chunk)
-            | super::catalog::chunk::ChunkState::Moving(chunk) => {
+            ChunkState::Moving(chunk) => {
                let chunk = Arc::clone(chunk);
                Self::MutableBuffer {
                    chunk,
@ -112,7 +113,7 @@ impl DBChunk {
                    open: false,
                }
            }
-            super::catalog::chunk::ChunkState::Moved(db) => {
+            ChunkState::Moved(db) => {
                let db = Arc::clone(db);
                Self::ReadBuffer {
                    db,
@ -0,0 +1,495 @@
use std::convert::TryInto;
use std::sync::Arc;

use chrono::{DateTime, Utc};
use parking_lot::{RwLock, RwLockUpgradableReadGuard};
use tracing::{error, info};

use data_types::{database_rules::LifecycleRules, error::ErrorLogger, job::Job};

use crate::tracker::Tracker;

use super::{
    catalog::chunk::{Chunk, ChunkState},
    Db,
};
use data_types::database_rules::SortOrder;

/// Handles the lifecycle of chunks within a Db
pub struct LifecycleManager {
    db: Arc<Db>,
    move_task: Option<Tracker<Job>>,
}

impl LifecycleManager {
    pub fn new(db: Arc<Db>) -> Self {
        Self {
            db,
            move_task: None,
        }
    }

    /// Polls the lifecycle manager to find and spawn work that needs
    /// to be done as determined by the database's lifecycle policy
    ///
    /// Should be called periodically and should spawn any long-running
    /// work onto the tokio threadpool and return
    pub fn check_for_work(&mut self) {
        ChunkMover::check_for_work(self, Utc::now())
    }
}

/// A trait that encapsulates the core chunk lifecycle logic
///
/// This is to enable independent testing of the policy logic
trait ChunkMover {
    /// Returns the size of a chunk - overridden for testing
    fn chunk_size(chunk: &Chunk) -> usize {
        chunk.size()
    }

    /// Returns the lifecycle policy
    fn rules(&self) -> LifecycleRules;

    /// Returns a list of chunks sorted in the order
    /// they should be prioritised
    fn chunks(&self, order: &SortOrder) -> Vec<Arc<RwLock<Chunk>>>;

    /// Returns a boolean indicating if a move is in progress
    fn is_move_active(&self) -> bool;

    /// Starts an operation to move a chunk to the read buffer
    fn move_to_read_buffer(&mut self, partition_key: String, chunk_id: u32);

    /// Drops a chunk from the database
    fn drop_chunk(&mut self, partition_key: String, chunk_id: u32);

    /// The core policy logic
    fn check_for_work(&mut self, now: DateTime<Utc>) {
        let rules = self.rules();
        let chunks = self.chunks(&rules.sort_order);

        let mut buffer_size = 0;

        // Only want to start a new move task if there isn't one already in-flight
        //
        // Note: This does not take into account manually triggered tasks
        let mut move_active = self.is_move_active();

        // Iterate through the chunks to determine
        // - total memory consumption
        // - any chunks to move

        // TODO: Track size globally to avoid iterating through all chunks (#1100)
        for chunk in &chunks {
            let chunk_guard = chunk.upgradable_read();
            buffer_size += Self::chunk_size(&*chunk_guard);

            if !move_active && can_move(&rules, &*chunk_guard, now) {
                match chunk_guard.state() {
                    ChunkState::Open(_) => {
                        let mut chunk_guard = RwLockUpgradableReadGuard::upgrade(chunk_guard);
                        chunk_guard.set_closing().expect("cannot close open chunk");

                        let partition_key = chunk_guard.key().to_string();
                        let chunk_id = chunk_guard.id();

                        std::mem::drop(chunk_guard);

                        move_active = true;
                        self.move_to_read_buffer(partition_key, chunk_id);
                    }
                    ChunkState::Closing(_) => {
                        let partition_key = chunk_guard.key().to_string();
                        let chunk_id = chunk_guard.id();

                        std::mem::drop(chunk_guard);

                        move_active = true;
                        self.move_to_read_buffer(partition_key, chunk_id);
                    }
                    _ => {}
                }
            }

            // TODO: Find and recover cancelled move jobs (#1099)
        }

        if let Some(soft_limit) = rules.buffer_size_soft {
            let mut chunks = chunks.iter();

            while buffer_size > soft_limit.get() {
                match chunks.next() {
                    Some(chunk) => {
                        let chunk_guard = chunk.read();
                        if rules.drop_non_persisted
                            || matches!(chunk_guard.state(), ChunkState::Moved(_))
                        {
                            let partition_key = chunk_guard.key().to_string();
                            let chunk_id = chunk_guard.id();
                            buffer_size =
                                buffer_size.saturating_sub(Self::chunk_size(&*chunk_guard));

                            std::mem::drop(chunk_guard);

                            self.drop_chunk(partition_key, chunk_id)
                        }
                    }
                    None => {
                        error!("failed to find chunk to evict");
                        break;
                    }
                }
            }
        }
    }
}

impl ChunkMover for LifecycleManager {
    fn rules(&self) -> LifecycleRules {
        self.db.rules.read().lifecycle_rules.clone()
    }

    fn chunks(&self, sort_order: &SortOrder) -> Vec<Arc<RwLock<Chunk>>> {
        self.db.catalog.chunks_sorted_by(sort_order)
    }

    fn is_move_active(&self) -> bool {
        self.move_task
            .as_ref()
            .map(|x| !x.is_complete())
            .unwrap_or(false)
    }

    fn move_to_read_buffer(&mut self, partition_key: String, chunk_id: u32) {
        info!(%partition_key, %chunk_id, "moving chunk to read buffer");
        self.move_task = Some(
            self.db
                .load_chunk_to_read_buffer_in_background(partition_key, chunk_id),
        )
    }

    fn drop_chunk(&mut self, partition_key: String, chunk_id: u32) {
        info!(%partition_key, %chunk_id, "dropping chunk");
        let _ = self
            .db
            .drop_chunk(&partition_key, chunk_id)
            .log_if_error("dropping chunk to free up memory");
    }
}

/// Returns the number of seconds between two times
///
/// Computes a - b
fn elapsed_seconds(a: DateTime<Utc>, b: DateTime<Utc>) -> u32 {
    let seconds = (a - b).num_seconds();
    if seconds < 0 {
        0 // This can occur as DateTime is not monotonic
    } else {
        seconds.try_into().unwrap_or(u32::max_value())
    }
}

/// Returns if the chunk is sufficiently cold and old to move
///
/// Note: Does not check the chunk is in the correct state
fn can_move(rules: &LifecycleRules, chunk: &Chunk, now: DateTime<Utc>) -> bool {
    match (rules.mutable_linger_seconds, chunk.time_of_last_write()) {
        (Some(linger), Some(last_write)) if elapsed_seconds(now, last_write) >= linger.get() => {
            match (
                rules.mutable_minimum_age_seconds,
                chunk.time_of_first_write(),
            ) {
                (Some(min_age), Some(first_write)) => {
                    // Chunk can be moved if it is old enough
                    elapsed_seconds(now, first_write) >= min_age.get()
                }
                // If no minimum age set - permit chunk movement
                (None, _) => true,
                (_, None) => unreachable!("chunk with last write and no first write"),
            }
        }

        // Disable movement if no mutable_linger set,
        // or the chunk is empty, or the linger hasn't expired
        _ => false,
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::num::{NonZeroU32, NonZeroUsize};

    fn from_secs(secs: i64) -> DateTime<Utc> {
        DateTime::from_utc(chrono::NaiveDateTime::from_timestamp(secs, 0), Utc)
    }

    fn new_chunk(
        id: u32,
        time_of_first_write: Option<i64>,
        time_of_last_write: Option<i64>,
    ) -> Chunk {
        let mut chunk = Chunk::new_open("", id);
        chunk.set_timestamps(
            time_of_first_write.map(from_secs),
            time_of_last_write.map(from_secs),
        );
        chunk
    }

    #[derive(Debug, Eq, PartialEq)]
    enum MoverEvents {
        Move(u32),
        Drop(u32),
    }

    /// A dummy mover that is used to test the policy
    /// logic within ChunkMover::check_for_work
    struct DummyMover {
        rules: LifecycleRules,
        move_active: bool,
        chunks: Vec<Arc<RwLock<Chunk>>>,
        events: Vec<MoverEvents>,
    }

    impl DummyMover {
        fn new(rules: LifecycleRules, chunks: Vec<Chunk>) -> Self {
            Self {
                rules,
                chunks: chunks
                    .into_iter()
                    .map(|x| Arc::new(RwLock::new(x)))
                    .collect(),
                move_active: false,
                events: vec![],
            }
        }
    }

    impl ChunkMover for DummyMover {
        fn chunk_size(_: &Chunk) -> usize {
            // All chunks are 20 bytes
            20
        }

        fn rules(&self) -> LifecycleRules {
            self.rules.clone()
        }

        fn chunks(&self, _: &SortOrder) -> Vec<Arc<RwLock<Chunk>>> {
            self.chunks.clone()
        }

        fn is_move_active(&self) -> bool {
            self.move_active
        }

        fn move_to_read_buffer(&mut self, _: String, chunk_id: u32) {
            let chunk = self
                .chunks
                .iter()
                .find(|x| x.read().id() == chunk_id)
                .unwrap();
            chunk.write().set_moving().unwrap();
            self.events.push(MoverEvents::Move(chunk_id))
        }

        fn drop_chunk(&mut self, _: String, chunk_id: u32) {
            self.events.push(MoverEvents::Drop(chunk_id))
        }
    }

    #[test]
    fn test_elapsed_seconds() {
        assert_eq!(elapsed_seconds(from_secs(10), from_secs(5)), 5);
        assert_eq!(elapsed_seconds(from_secs(10), from_secs(10)), 0);
        assert_eq!(elapsed_seconds(from_secs(10), from_secs(15)), 0);
    }

    #[test]
    fn test_can_move() {
        // Cannot move by default
        let rules = LifecycleRules::default();
        let chunk = new_chunk(0, Some(0), Some(0));
        assert!(!can_move(&rules, &chunk, from_secs(20)));

        // If only mutable_linger set can move a chunk once passed
        let rules = LifecycleRules {
            mutable_linger_seconds: Some(NonZeroU32::new(10).unwrap()),
            ..Default::default()
        };
        let chunk = new_chunk(0, Some(0), Some(0));
        assert!(!can_move(&rules, &chunk, from_secs(9)));
        assert!(can_move(&rules, &chunk, from_secs(11)));

        // If mutable_minimum_age_seconds set must also take this into account
        let rules = LifecycleRules {
            mutable_linger_seconds: Some(NonZeroU32::new(10).unwrap()),
            mutable_minimum_age_seconds: Some(NonZeroU32::new(60).unwrap()),
            ..Default::default()
        };
        let chunk = new_chunk(0, Some(0), Some(0));
        assert!(!can_move(&rules, &chunk, from_secs(9)));
        assert!(!can_move(&rules, &chunk, from_secs(11)));
        assert!(can_move(&rules, &chunk, from_secs(61)));

        let chunk = new_chunk(0, Some(0), Some(70));
        assert!(!can_move(&rules, &chunk, from_secs(71)));
        assert!(can_move(&rules, &chunk, from_secs(81)));
    }

    #[test]
    fn test_default_rules() {
        // The default rules shouldn't do anything
        let rules = LifecycleRules::default();
        let chunks = vec![
            new_chunk(0, Some(1), Some(1)),
            new_chunk(1, Some(20), Some(1)),
            new_chunk(2, Some(30), Some(1)),
        ];

        let mut mover = DummyMover::new(rules, chunks);
        mover.check_for_work(from_secs(40));
        assert_eq!(mover.events, vec![]);
    }

    #[test]
    fn test_mutable_linger() {
        let rules = LifecycleRules {
            mutable_linger_seconds: Some(NonZeroU32::new(10).unwrap()),
            ..Default::default()
        };
        let chunks = vec![
            new_chunk(0, Some(0), Some(8)),
            new_chunk(1, Some(0), Some(5)),
            new_chunk(2, Some(0), Some(0)),
        ];

        let mut mover = DummyMover::new(rules, chunks);
        mover.check_for_work(from_secs(9));

        assert_eq!(mover.events, vec![]);

        mover.check_for_work(from_secs(11));
        assert_eq!(mover.events, vec![MoverEvents::Move(2)]);

        mover.check_for_work(from_secs(12));
        assert_eq!(mover.events, vec![MoverEvents::Move(2)]);

        // Should move in the order of chunks in the case of multiple candidates
        mover.check_for_work(from_secs(20));
        assert_eq!(
            mover.events,
            vec![MoverEvents::Move(2), MoverEvents::Move(0)]
        );

        mover.check_for_work(from_secs(20));

        assert_eq!(
            mover.events,
            vec![
                MoverEvents::Move(2),
                MoverEvents::Move(0),
                MoverEvents::Move(1)
            ]
        );
    }

    #[test]
    fn test_in_progress() {
        let rules = LifecycleRules {
            mutable_linger_seconds: Some(NonZeroU32::new(10).unwrap()),
            ..Default::default()
        };
        let chunks = vec![new_chunk(0, Some(0), Some(0))];

        let mut mover = DummyMover::new(rules, chunks);
        mover.move_active = true;

        mover.check_for_work(from_secs(80));

        assert_eq!(mover.events, vec![]);

        mover.move_active = false;

        mover.check_for_work(from_secs(80));

        assert_eq!(mover.events, vec![MoverEvents::Move(0)]);
    }

    #[test]
    fn test_minimum_age() {
        let rules = LifecycleRules {
            mutable_linger_seconds: Some(NonZeroU32::new(10).unwrap()),
            mutable_minimum_age_seconds: Some(NonZeroU32::new(60).unwrap()),
            ..Default::default()
        };
        let chunks = vec![
            new_chunk(0, Some(40), Some(40)),
            new_chunk(1, Some(0), Some(0)),
        ];

        let mut mover = DummyMover::new(rules, chunks);

        // Expect to move chunk_id=1 first, despite it coming second in
        // the order, because chunk_id=0 will only become old enough at t=100

        mover.check_for_work(from_secs(80));
        assert_eq!(mover.events, vec![MoverEvents::Move(1)]);

        mover.check_for_work(from_secs(90));
        assert_eq!(mover.events, vec![MoverEvents::Move(1)]);

        mover.check_for_work(from_secs(110));

        assert_eq!(
            mover.events,
            vec![MoverEvents::Move(1), MoverEvents::Move(0)]
        );
    }

    #[test]
    fn test_buffer_size_soft() {
        let rules = LifecycleRules {
            buffer_size_soft: Some(NonZeroUsize::new(5).unwrap()),
            ..Default::default()
        };

        let rb = Arc::new(read_buffer::Database::new());

        let chunks = vec![new_chunk(0, Some(0), Some(0))];

        let mut mover = DummyMover::new(rules.clone(), chunks);

        mover.check_for_work(from_secs(10));
        assert_eq!(mover.events, vec![]);

        let mut chunks = vec![
            new_chunk(0, Some(0), Some(0)),
            new_chunk(1, Some(0), Some(0)),
            new_chunk(2, Some(0), Some(0)),
        ];

        chunks[2].set_closing().unwrap();
        chunks[2].set_moving().unwrap();
        chunks[2].set_moved(Arc::clone(&rb)).unwrap();

        let mut mover = DummyMover::new(rules, chunks);

        mover.check_for_work(from_secs(10));
        assert_eq!(mover.events, vec![MoverEvents::Drop(2)]);

        let rules = LifecycleRules {
            buffer_size_soft: Some(NonZeroUsize::new(40).unwrap()),
            ..Default::default()
        };

        let chunks = vec![new_chunk(0, Some(0), Some(0))];

        let mut mover = DummyMover::new(rules, chunks);

        mover.check_for_work(from_secs(10));
        assert_eq!(mover.events, vec![]);
    }
}
@ -77,7 +77,7 @@ use bytes::BytesMut;
use futures::stream::TryStreamExt;
use parking_lot::Mutex;
use snafu::{OptionExt, ResultExt, Snafu};
-use tracing::{debug, error, info, warn};
+use tracing::{error, info, warn};

use data_types::{
    database_rules::{DatabaseRules, WriterId},
@ -463,28 +463,7 @@ impl<M: ConnectionManager> Server<M> {
            .db(&name)
            .context(DatabaseNotFound { db_name: &db_name })?;

-        let (tracker, registration) = self.jobs.register(Job::CloseChunk {
-            db_name: db_name.clone(),
-            partition_key: partition_key.clone(),
-            chunk_id,
-        });
-
-        let task = async move {
-            debug!(%db_name, %partition_key, %chunk_id, "background task loading chunk to read buffer");
-            let result = db.load_chunk_to_read_buffer(&partition_key, chunk_id).await;
-            if let Err(e) = result {
-                info!(?e, %db_name, %partition_key, %chunk_id, "background task error loading read buffer chunk");
-                return Err(e);
-            }
-
-            debug!(%db_name, %partition_key, %chunk_id, "background task completed closing chunk");
-
-            Ok(())
-        };
-
-        tokio::spawn(task.track(registration));
-
-        Ok(tracker)
+        Ok(db.load_chunk_to_read_buffer_in_background(partition_key, chunk_id))
    }

    /// Returns a list of all jobs tracked by this server
@ -621,6 +621,53 @@ async fn test_close_partition_chunk_error() {
    assert_contains!(err.to_string(), "Database not found");
}

#[tokio::test]
async fn test_chunk_lifecycle() {
    use influxdb_iox_client::management::generated_types::ChunkStorage;

    let fixture = ServerFixture::create_shared().await;
    let mut management_client = fixture.management_client();
    let mut write_client = fixture.write_client();

    let db_name = rand_name();
    management_client
        .create_database(DatabaseRules {
            name: db_name.clone(),
            lifecycle_rules: Some(LifecycleRules {
                mutable_linger_seconds: 1,
                ..Default::default()
            }),
            ..Default::default()
        })
        .await
        .unwrap();

    let lp_lines = vec!["cpu,region=west user=23.2 100"];

    write_client
        .write(&db_name, lp_lines.join("\n"))
        .await
.expect("write succeded");
|
||||
|
||||
    let chunks = management_client
        .list_chunks(&db_name)
        .await
        .expect("listing chunks");

    assert_eq!(chunks.len(), 1);
    assert_eq!(chunks[0].storage, ChunkStorage::OpenMutableBuffer as i32);

    tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;

    let chunks = management_client
        .list_chunks(&db_name)
        .await
        .expect("listing chunks");

    assert_eq!(chunks.len(), 1);
    assert_eq!(chunks[0].storage, ChunkStorage::ReadBuffer as i32);
}

/// Normalizes a set of Chunks for comparison by removing timestamps
fn normalize_chunks(chunks: Vec<Chunk>) -> Vec<Chunk> {
    chunks