diff --git a/data_types/src/database_rules.rs b/data_types/src/database_rules.rs index 98d4a2d26c..9e9301592c 100644 --- a/data_types/src/database_rules.rs +++ b/data_types/src/database_rules.rs @@ -177,6 +177,8 @@ pub struct LifecycleRules { /// Once the amount of data in memory reaches this size start /// rejecting writes + /// + /// TODO: Implement this limit pub buffer_size_hard: Option, /// Configure order to transition data @@ -364,7 +366,7 @@ pub enum Order { impl Default for Order { fn default() -> Self { - Self::Desc + Self::Asc } } diff --git a/server/Cargo.toml b/server/Cargo.toml index 5673fe0baa..aaeeceb967 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -7,7 +7,7 @@ edition = "2018" [dependencies] # In alphabetical order arrow_deps = { path = "../arrow_deps" } async-trait = "0.1" -bytes = { version = "1.0", features = ["serde"] } +bytes = { version = "1.0" } chrono = "0.4" crc32fast = "1.2.0" data_types = { path = "../data_types" } diff --git a/server/src/db.rs b/server/src/db.rs index e9903d90ba..843498e62f 100644 --- a/server/src/db.rs +++ b/server/src/db.rs @@ -23,10 +23,14 @@ use read_buffer::Database as ReadBufferDb; pub(crate) use chunk::DBChunk; +use crate::db::lifecycle::LifecycleManager; +use crate::tracker::{TrackedFutureExt, Tracker}; use crate::{buffer::Buffer, JobRegistry}; +use data_types::job::Job; pub mod catalog; mod chunk; +mod lifecycle; pub mod pred; mod streams; @@ -369,6 +373,40 @@ impl Db { Ok(DBChunk::snapshot(&chunk)) } + /// Spawns a task to perform load_chunk_to_read_buffer + pub fn load_chunk_to_read_buffer_in_background( + self: &Arc, + partition_key: String, + chunk_id: u32, + ) -> Tracker { + let name = self.rules.read().name.clone(); + let (tracker, registration) = self.jobs.register(Job::CloseChunk { + db_name: name.clone(), + partition_key: partition_key.clone(), + chunk_id, + }); + + let captured = Arc::clone(&self); + let task = async move { + debug!(%name, %partition_key, %chunk_id, "background task loading chunk to read buffer"); + let result = captured + .load_chunk_to_read_buffer(&partition_key, chunk_id) + .await; + if let Err(e) = result { + info!(?e, %name, %partition_key, %chunk_id, "background task error loading read buffer chunk"); + return Err(e); + } + + debug!(%name, %partition_key, %chunk_id, "background task completed closing chunk"); + + Ok(()) + }; + + tokio::spawn(task.track(registration)); + + tracker + } + /// Returns the next write sequence number pub fn next_sequence(&self) -> u64 { self.sequence.fetch_add(1, Ordering::SeqCst) @@ -420,14 +458,20 @@ impl Db { } /// Background worker function - pub async fn background_worker(&self, shutdown: tokio_util::sync::CancellationToken) { + pub async fn background_worker( + self: &Arc, + shutdown: tokio_util::sync::CancellationToken, + ) { info!("started background worker"); let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(1)); + let mut lifecycle_manager = LifecycleManager::new(Arc::clone(&self)); while !shutdown.is_cancelled() { self.worker_iterations.fetch_add(1, Ordering::Relaxed); + lifecycle_manager.check_for_work(); + tokio::select! { _ = interval.tick() => {}, _ = shutdown.cancelled() => break diff --git a/server/src/db/catalog/chunk.rs b/server/src/db/catalog/chunk.rs index 3a29cd5bda..275f3753a4 100644 --- a/server/src/db/catalog/chunk.rs +++ b/server/src/db/catalog/chunk.rs @@ -25,9 +25,6 @@ pub enum ChunkState { /// Chunk can still accept new writes, but will likely be closed soon Closing(MBChunk), - /// Chunk is closed for new writes and has become read only - Closed(Arc), - /// Chunk is closed for new writes, and is actively moving to the read /// buffer Moving(Arc), @@ -42,7 +39,6 @@ impl ChunkState { Self::Invalid => "Invalid", Self::Open(_) => "Open", Self::Closing(_) => "Closing", - Self::Closed(_) => "Closed", Self::Moving(_) => "Moving", Self::Moved(_) => "Moved", } @@ -91,7 +87,7 @@ macro_rules! unexpected_state { } impl Chunk { - /// Create a new chunk in the Open state + /// Create a new chunk in the provided state pub(crate) fn new(partition_key: impl Into, id: u32, state: ChunkState) -> Self { Self { partition_key: Arc::new(partition_key.into()), @@ -103,6 +99,23 @@ impl Chunk { } } + /// Creates a new open chunk + pub(crate) fn new_open(partition_key: impl Into, id: u32) -> Self { + let state = ChunkState::Open(mutable_buffer::chunk::Chunk::new(id)); + Self::new(partition_key, id, state) + } + + /// Used for testing + #[cfg(test)] + pub(crate) fn set_timestamps( + &mut self, + time_of_first_write: Option>, + time_of_last_write: Option>, + ) { + self.time_of_first_write = time_of_first_write; + self.time_of_last_write = time_of_last_write; + } + pub fn id(&self) -> u32 { self.id } @@ -156,7 +169,7 @@ impl Chunk { match &self.state { ChunkState::Invalid => false, ChunkState::Open(chunk) | ChunkState::Closing(chunk) => chunk.has_table(table_name), - ChunkState::Moving(chunk) | ChunkState::Closed(chunk) => chunk.has_table(table_name), + ChunkState::Moving(chunk) => chunk.has_table(table_name), ChunkState::Moved(db) => { db.has_table(self.partition_key.as_str(), table_name, &[self.id]) } @@ -168,13 +181,26 @@ impl Chunk { match &self.state { ChunkState::Invalid => {} ChunkState::Open(chunk) | ChunkState::Closing(chunk) => chunk.all_table_names(names), - ChunkState::Moving(chunk) | ChunkState::Closed(chunk) => chunk.all_table_names(names), + ChunkState::Moving(chunk) => chunk.all_table_names(names), ChunkState::Moved(db) => { db.all_table_names(self.partition_key.as_str(), &[self.id], names) } } } + /// Returns an approximation of the amount of process memory consumed by the + /// chunk + pub fn size(&self) -> usize { + match &self.state { + ChunkState::Invalid => 0, + ChunkState::Open(chunk) | ChunkState::Closing(chunk) => chunk.size(), + ChunkState::Moving(chunk) => chunk.size(), + ChunkState::Moved(db) => db + .chunks_size(self.partition_key.as_str(), &[self.id]) + .unwrap_or(0) as usize, + } + } + /// Returns a mutable reference to the mutable buffer storage for /// chunks in the Open or Closing state /// @@ -218,18 +244,9 @@ impl Chunk { self.state = ChunkState::Moving(Arc::clone(&chunk)); Ok(chunk) } - ChunkState::Closed(chunk) => { - self.state = ChunkState::Moving(Arc::clone(&chunk)); - Ok(chunk) - } state => { self.state = state; - unexpected_state!( - self, - "setting moving", - "Open, Closing or Closed", - &self.state - ) + unexpected_state!(self, "setting moving", "Open or Closing", &self.state) } } } diff --git a/server/src/db/catalog/partition.rs b/server/src/db/catalog/partition.rs index d3d18d8f56..0b9d786066 100644 --- a/server/src/db/catalog/partition.rs +++ b/server/src/db/catalog/partition.rs @@ -78,8 +78,7 @@ impl Partition { let chunk_id = self.next_chunk_id; self.next_chunk_id += 1; - let state = ChunkState::Open(mutable_buffer::chunk::Chunk::new(chunk_id)); - let chunk = Arc::new(RwLock::new(Chunk::new(&self.key, chunk_id, state))); + let chunk = Arc::new(RwLock::new(Chunk::new_open(&self.key, chunk_id))); if self.chunks.insert(chunk_id, Arc::clone(&chunk)).is_some() { // A fundamental invariant has been violated - abort diff --git a/server/src/db/chunk.rs b/server/src/db/chunk.rs index c9c29cad08..61cc9fa8ce 100644 --- a/server/src/db/chunk.rs +++ b/server/src/db/chunk.rs @@ -77,11 +77,13 @@ impl DBChunk { let partition_key = Arc::new(chunk.key().to_string()); let chunk_id = chunk.id(); + use super::catalog::chunk::ChunkState; + let db_chunk = match chunk.state() { - super::catalog::chunk::ChunkState::Invalid => { + ChunkState::Invalid => { panic!("Invalid internal state"); } - super::catalog::chunk::ChunkState::Open(chunk) => { + ChunkState::Open(chunk) => { // TODO the performance if cloning the chunk is terrible // Proper performance is tracked in // https://github.com/influxdata/influxdb_iox/issues/635 @@ -92,7 +94,7 @@ impl DBChunk { open: true, } } - super::catalog::chunk::ChunkState::Closing(chunk) => { + ChunkState::Closing(chunk) => { // TODO the performance if cloning the chunk is terrible // Proper performance is tracked in // https://github.com/influxdata/influxdb_iox/issues/635 @@ -103,8 +105,7 @@ impl DBChunk { open: false, } } - super::catalog::chunk::ChunkState::Closed(chunk) - | super::catalog::chunk::ChunkState::Moving(chunk) => { + ChunkState::Moving(chunk) => { let chunk = Arc::clone(chunk); Self::MutableBuffer { chunk, @@ -112,7 +113,7 @@ impl DBChunk { open: false, } } - super::catalog::chunk::ChunkState::Moved(db) => { + ChunkState::Moved(db) => { let db = Arc::clone(db); Self::ReadBuffer { db, diff --git a/server/src/db/lifecycle.rs b/server/src/db/lifecycle.rs new file mode 100644 index 0000000000..445dbeced2 --- /dev/null +++ b/server/src/db/lifecycle.rs @@ -0,0 +1,495 @@ +use std::convert::TryInto; +use std::sync::Arc; + +use chrono::{DateTime, Utc}; +use parking_lot::{RwLock, RwLockUpgradableReadGuard}; +use tracing::{error, info}; + +use data_types::{database_rules::LifecycleRules, error::ErrorLogger, job::Job}; + +use crate::tracker::Tracker; + +use super::{ + catalog::chunk::{Chunk, ChunkState}, + Db, +}; +use data_types::database_rules::SortOrder; + +/// Handles the lifecycle of chunks within a Db +pub struct LifecycleManager { + db: Arc, + move_task: Option>, +} + +impl LifecycleManager { + pub fn new(db: Arc) -> Self { + Self { + db, + move_task: None, + } + } + + /// Polls the lifecycle manager to find and spawn work that needs + /// to be done as determined by the database's lifecycle policy + /// + /// Should be called periodically and should spawn any long-running + /// work onto the tokio threadpool and return + pub fn check_for_work(&mut self) { + ChunkMover::check_for_work(self, Utc::now()) + } +} + +/// A trait that encapsulates the core chunk lifecycle logic +/// +/// This is to enable independent testing of the policy logic +trait ChunkMover { + /// Returns the size of a chunk - overridden for testing + fn chunk_size(chunk: &Chunk) -> usize { + chunk.size() + } + + /// Returns the lifecycle policy + fn rules(&self) -> LifecycleRules; + + /// Returns a list of chunks sorted in the order + /// they should prioritised + fn chunks(&self, order: &SortOrder) -> Vec>>; + + /// Returns a boolean indicating if a move is in progress + fn is_move_active(&self) -> bool; + + /// Starts an operation to move a chunk to the read buffer + fn move_to_read_buffer(&mut self, partition_key: String, chunk_id: u32); + + /// Drops a chunk from the database + fn drop_chunk(&mut self, partition_key: String, chunk_id: u32); + + /// The core policy logic + fn check_for_work(&mut self, now: DateTime) { + let rules = self.rules(); + let chunks = self.chunks(&rules.sort_order); + + let mut buffer_size = 0; + + // Only want to start a new move task if there isn't one already in-flight + // + // Note: This does not take into account manually triggered tasks + let mut move_active = self.is_move_active(); + + // Iterate through the chunks to determine + // - total memory consumption + // - any chunks to move + + // TODO: Track size globally to avoid iterating through all chunks (#1100) + for chunk in &chunks { + let chunk_guard = chunk.upgradable_read(); + buffer_size += Self::chunk_size(&*chunk_guard); + + if !move_active && can_move(&rules, &*chunk_guard, now) { + match chunk_guard.state() { + ChunkState::Open(_) => { + let mut chunk_guard = RwLockUpgradableReadGuard::upgrade(chunk_guard); + chunk_guard.set_closing().expect("cannot close open chunk"); + + let partition_key = chunk_guard.key().to_string(); + let chunk_id = chunk_guard.id(); + + std::mem::drop(chunk_guard); + + move_active = true; + self.move_to_read_buffer(partition_key, chunk_id); + } + ChunkState::Closing(_) => { + let partition_key = chunk_guard.key().to_string(); + let chunk_id = chunk_guard.id(); + + std::mem::drop(chunk_guard); + + move_active = true; + self.move_to_read_buffer(partition_key, chunk_id); + } + _ => {} + } + } + + // TODO: Find and recover cancelled move jobs (#1099) + } + + if let Some(soft_limit) = rules.buffer_size_soft { + let mut chunks = chunks.iter(); + + while buffer_size > soft_limit.get() { + match chunks.next() { + Some(chunk) => { + let chunk_guard = chunk.read(); + if rules.drop_non_persisted + || matches!(chunk_guard.state(), ChunkState::Moved(_)) + { + let partition_key = chunk_guard.key().to_string(); + let chunk_id = chunk_guard.id(); + buffer_size = + buffer_size.saturating_sub(Self::chunk_size(&*chunk_guard)); + + std::mem::drop(chunk_guard); + + self.drop_chunk(partition_key, chunk_id) + } + } + None => { + error!("failed to find chunk to evict"); + break; + } + } + } + } + } +} + +impl ChunkMover for LifecycleManager { + fn rules(&self) -> LifecycleRules { + self.db.rules.read().lifecycle_rules.clone() + } + + fn chunks(&self, sort_order: &SortOrder) -> Vec>> { + self.db.catalog.chunks_sorted_by(sort_order) + } + + fn is_move_active(&self) -> bool { + self.move_task + .as_ref() + .map(|x| !x.is_complete()) + .unwrap_or(false) + } + + fn move_to_read_buffer(&mut self, partition_key: String, chunk_id: u32) { + info!(%partition_key, %chunk_id, "moving chunk to read buffer"); + self.move_task = Some( + self.db + .load_chunk_to_read_buffer_in_background(partition_key, chunk_id), + ) + } + + fn drop_chunk(&mut self, partition_key: String, chunk_id: u32) { + info!(%partition_key, %chunk_id, "dropping chunk"); + let _ = self + .db + .drop_chunk(&partition_key, chunk_id) + .log_if_error("dropping chunk to free up memory"); + } +} + +/// Returns the number of seconds between two times +/// +/// Computes a - b +fn elapsed_seconds(a: DateTime, b: DateTime) -> u32 { + let seconds = (a - b).num_seconds(); + if seconds < 0 { + 0 // This can occur as DateTime is not monotonic + } else { + seconds.try_into().unwrap_or(u32::max_value()) + } +} + +/// Returns if the chunk is sufficiently cold and old to move +/// +/// Note: Does not check the chunk is the correct state +fn can_move(rules: &LifecycleRules, chunk: &Chunk, now: DateTime) -> bool { + match (rules.mutable_linger_seconds, chunk.time_of_last_write()) { + (Some(linger), Some(last_write)) if elapsed_seconds(now, last_write) >= linger.get() => { + match ( + rules.mutable_minimum_age_seconds, + chunk.time_of_first_write(), + ) { + (Some(min_age), Some(first_write)) => { + // Chunk can be moved if it is old enough + elapsed_seconds(now, first_write) >= min_age.get() + } + // If no minimum age set - permit chunk movement + (None, _) => true, + (_, None) => unreachable!("chunk with last write and no first write"), + } + } + + // Disable movement if no mutable_linger set, + // or the chunk is empty, or the linger hasn't expired + _ => false, + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::num::{NonZeroU32, NonZeroUsize}; + + fn from_secs(secs: i64) -> DateTime { + DateTime::from_utc(chrono::NaiveDateTime::from_timestamp(secs, 0), Utc) + } + + fn new_chunk( + id: u32, + time_of_first_write: Option, + time_of_last_write: Option, + ) -> Chunk { + let mut chunk = Chunk::new_open("", id); + chunk.set_timestamps( + time_of_first_write.map(from_secs), + time_of_last_write.map(from_secs), + ); + chunk + } + + #[derive(Debug, Eq, PartialEq)] + enum MoverEvents { + Move(u32), + Drop(u32), + } + + /// A dummy mover that is used to test the policy + /// logic within ChunkMover::poll + struct DummyMover { + rules: LifecycleRules, + move_active: bool, + chunks: Vec>>, + events: Vec, + } + + impl DummyMover { + fn new(rules: LifecycleRules, chunks: Vec) -> Self { + Self { + rules, + chunks: chunks + .into_iter() + .map(|x| Arc::new(RwLock::new(x))) + .collect(), + move_active: false, + events: vec![], + } + } + } + + impl ChunkMover for DummyMover { + fn chunk_size(_: &Chunk) -> usize { + // All chunks are 20 bytes + 20 + } + + fn rules(&self) -> LifecycleRules { + self.rules.clone() + } + + fn chunks(&self, _: &SortOrder) -> Vec>> { + self.chunks.clone() + } + + fn is_move_active(&self) -> bool { + self.move_active + } + + fn move_to_read_buffer(&mut self, _: String, chunk_id: u32) { + let chunk = self + .chunks + .iter() + .find(|x| x.read().id() == chunk_id) + .unwrap(); + chunk.write().set_moving().unwrap(); + self.events.push(MoverEvents::Move(chunk_id)) + } + + fn drop_chunk(&mut self, _: String, chunk_id: u32) { + self.events.push(MoverEvents::Drop(chunk_id)) + } + } + + #[test] + fn test_elapsed_seconds() { + assert_eq!(elapsed_seconds(from_secs(10), from_secs(5)), 5); + assert_eq!(elapsed_seconds(from_secs(10), from_secs(10)), 0); + assert_eq!(elapsed_seconds(from_secs(10), from_secs(15)), 0); + } + + #[test] + fn test_can_move() { + // Cannot move by default + let rules = LifecycleRules::default(); + let chunk = new_chunk(0, Some(0), Some(0)); + assert!(!can_move(&rules, &chunk, from_secs(20))); + + // If only mutable_linger set can move a chunk once passed + let rules = LifecycleRules { + mutable_linger_seconds: Some(NonZeroU32::new(10).unwrap()), + ..Default::default() + }; + let chunk = new_chunk(0, Some(0), Some(0)); + assert!(!can_move(&rules, &chunk, from_secs(9))); + assert!(can_move(&rules, &chunk, from_secs(11))); + + // If mutable_minimum_age_seconds set must also take this into account + let rules = LifecycleRules { + mutable_linger_seconds: Some(NonZeroU32::new(10).unwrap()), + mutable_minimum_age_seconds: Some(NonZeroU32::new(60).unwrap()), + ..Default::default() + }; + let chunk = new_chunk(0, Some(0), Some(0)); + assert!(!can_move(&rules, &chunk, from_secs(9))); + assert!(!can_move(&rules, &chunk, from_secs(11))); + assert!(can_move(&rules, &chunk, from_secs(61))); + + let chunk = new_chunk(0, Some(0), Some(70)); + assert!(!can_move(&rules, &chunk, from_secs(71))); + assert!(can_move(&rules, &chunk, from_secs(81))); + } + + #[test] + fn test_default_rules() { + // The default rules shouldn't do anything + let rules = LifecycleRules::default(); + let chunks = vec![ + new_chunk(0, Some(1), Some(1)), + new_chunk(1, Some(20), Some(1)), + new_chunk(2, Some(30), Some(1)), + ]; + + let mut mover = DummyMover::new(rules, chunks); + mover.check_for_work(from_secs(40)); + assert_eq!(mover.events, vec![]); + } + + #[test] + fn test_mutable_linger() { + let rules = LifecycleRules { + mutable_linger_seconds: Some(NonZeroU32::new(10).unwrap()), + ..Default::default() + }; + let chunks = vec![ + new_chunk(0, Some(0), Some(8)), + new_chunk(1, Some(0), Some(5)), + new_chunk(2, Some(0), Some(0)), + ]; + + let mut mover = DummyMover::new(rules, chunks); + mover.check_for_work(from_secs(9)); + + assert_eq!(mover.events, vec![]); + + mover.check_for_work(from_secs(11)); + assert_eq!(mover.events, vec![MoverEvents::Move(2)]); + + mover.check_for_work(from_secs(12)); + assert_eq!(mover.events, vec![MoverEvents::Move(2)]); + + // Should move in the order of chunks in the case of multiple candidates + mover.check_for_work(from_secs(20)); + assert_eq!( + mover.events, + vec![MoverEvents::Move(2), MoverEvents::Move(0)] + ); + + mover.check_for_work(from_secs(20)); + + assert_eq!( + mover.events, + vec![ + MoverEvents::Move(2), + MoverEvents::Move(0), + MoverEvents::Move(1) + ] + ); + } + + #[test] + fn test_in_progress() { + let rules = LifecycleRules { + mutable_linger_seconds: Some(NonZeroU32::new(10).unwrap()), + ..Default::default() + }; + let chunks = vec![new_chunk(0, Some(0), Some(0))]; + + let mut mover = DummyMover::new(rules, chunks); + mover.move_active = true; + + mover.check_for_work(from_secs(80)); + + assert_eq!(mover.events, vec![]); + + mover.move_active = false; + + mover.check_for_work(from_secs(80)); + + assert_eq!(mover.events, vec![MoverEvents::Move(0)]); + } + + #[test] + fn test_minimum_age() { + let rules = LifecycleRules { + mutable_linger_seconds: Some(NonZeroU32::new(10).unwrap()), + mutable_minimum_age_seconds: Some(NonZeroU32::new(60).unwrap()), + ..Default::default() + }; + let chunks = vec![ + new_chunk(0, Some(40), Some(40)), + new_chunk(1, Some(0), Some(0)), + ]; + + let mut mover = DummyMover::new(rules, chunks); + + // Expect to move chunk_id=1 first, despite it coming second in + // the order, because chunk_id=0 will only become old enough at t=100 + + mover.check_for_work(from_secs(80)); + assert_eq!(mover.events, vec![MoverEvents::Move(1)]); + + mover.check_for_work(from_secs(90)); + assert_eq!(mover.events, vec![MoverEvents::Move(1)]); + + mover.check_for_work(from_secs(110)); + + assert_eq!( + mover.events, + vec![MoverEvents::Move(1), MoverEvents::Move(0)] + ); + } + + #[test] + fn test_buffer_size_soft() { + let rules = LifecycleRules { + buffer_size_soft: Some(NonZeroUsize::new(5).unwrap()), + ..Default::default() + }; + + let rb = Arc::new(read_buffer::Database::new()); + + let chunks = vec![new_chunk(0, Some(0), Some(0))]; + + let mut mover = DummyMover::new(rules.clone(), chunks); + + mover.check_for_work(from_secs(10)); + assert_eq!(mover.events, vec![]); + + let mut chunks = vec![ + new_chunk(0, Some(0), Some(0)), + new_chunk(1, Some(0), Some(0)), + new_chunk(2, Some(0), Some(0)), + ]; + + chunks[2].set_closing().unwrap(); + chunks[2].set_moving().unwrap(); + chunks[2].set_moved(Arc::clone(&rb)).unwrap(); + + let mut mover = DummyMover::new(rules, chunks); + + mover.check_for_work(from_secs(10)); + assert_eq!(mover.events, vec![MoverEvents::Drop(2)]); + + let rules = LifecycleRules { + buffer_size_soft: Some(NonZeroUsize::new(40).unwrap()), + ..Default::default() + }; + + let chunks = vec![new_chunk(0, Some(0), Some(0))]; + + let mut mover = DummyMover::new(rules, chunks); + + mover.check_for_work(from_secs(10)); + assert_eq!(mover.events, vec![]); + } +} diff --git a/server/src/lib.rs b/server/src/lib.rs index 6b5d5424ff..5eda9570d0 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -77,7 +77,7 @@ use bytes::BytesMut; use futures::stream::TryStreamExt; use parking_lot::Mutex; use snafu::{OptionExt, ResultExt, Snafu}; -use tracing::{debug, error, info, warn}; +use tracing::{error, info, warn}; use data_types::{ database_rules::{DatabaseRules, WriterId}, @@ -463,28 +463,7 @@ impl Server { .db(&name) .context(DatabaseNotFound { db_name: &db_name })?; - let (tracker, registration) = self.jobs.register(Job::CloseChunk { - db_name: db_name.clone(), - partition_key: partition_key.clone(), - chunk_id, - }); - - let task = async move { - debug!(%db_name, %partition_key, %chunk_id, "background task loading chunk to read buffer"); - let result = db.load_chunk_to_read_buffer(&partition_key, chunk_id).await; - if let Err(e) = result { - info!(?e, %db_name, %partition_key, %chunk_id, "background task error loading read buffer chunk"); - return Err(e); - } - - debug!(%db_name, %partition_key, %chunk_id, "background task completed closing chunk"); - - Ok(()) - }; - - tokio::spawn(task.track(registration)); - - Ok(tracker) + Ok(db.load_chunk_to_read_buffer_in_background(partition_key, chunk_id)) } /// Returns a list of all jobs tracked by this server diff --git a/tests/end_to_end_cases/management_api.rs b/tests/end_to_end_cases/management_api.rs index 884dd48d57..28e1a6ed12 100644 --- a/tests/end_to_end_cases/management_api.rs +++ b/tests/end_to_end_cases/management_api.rs @@ -621,6 +621,53 @@ async fn test_close_partition_chunk_error() { assert_contains!(err.to_string(), "Database not found"); } +#[tokio::test] +async fn test_chunk_lifecycle() { + use influxdb_iox_client::management::generated_types::ChunkStorage; + + let fixture = ServerFixture::create_shared().await; + let mut management_client = fixture.management_client(); + let mut write_client = fixture.write_client(); + + let db_name = rand_name(); + management_client + .create_database(DatabaseRules { + name: db_name.clone(), + lifecycle_rules: Some(LifecycleRules { + mutable_linger_seconds: 1, + ..Default::default() + }), + ..Default::default() + }) + .await + .unwrap(); + + let lp_lines = vec!["cpu,region=west user=23.2 100"]; + + write_client + .write(&db_name, lp_lines.join("\n")) + .await + .expect("write succeded"); + + let chunks = management_client + .list_chunks(&db_name) + .await + .expect("listing chunks"); + + assert_eq!(chunks.len(), 1); + assert_eq!(chunks[0].storage, ChunkStorage::OpenMutableBuffer as i32); + + tokio::time::sleep(tokio::time::Duration::from_secs(2)).await; + + let chunks = management_client + .list_chunks(&db_name) + .await + .expect("listing chunks"); + + assert_eq!(chunks.len(), 1); + assert_eq!(chunks[0].storage, ChunkStorage::ReadBuffer as i32); +} + /// Normalizes a set of Chunks for comparison by removing timestamps fn normalize_chunks(chunks: Vec) -> Vec { chunks