feat: change background worker to use backoff instead of poll (#1339) (#1347)

* feat: change background worker to use backoff instead (#1339)

* chore: fix comment

* chore: fix tests

* chore: review comments

* chore: review feedback
pull/24376/head
Raphael Taylor-Davies 2021-04-29 12:10:51 +01:00 committed by GitHub
parent 74d35ce9a4
commit 262bf446ab
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 215 additions and 130 deletions

View File

@ -216,9 +216,9 @@ pub struct LifecycleRules {
/// Do not allow writing new data to this database
pub immutable: bool,
/// The background worker will evaluate whether there is work to do
/// at every `period` milliseconds.
pub background_worker_period_millis: Option<NonZeroU64>,
/// If the background worker doesn't find anything to do it
/// will sleep for this many milliseconds before looking again
pub worker_backoff_millis: Option<NonZeroU64>,
}
impl From<LifecycleRules> for management::LifecycleRules {
@ -248,9 +248,7 @@ impl From<LifecycleRules> for management::LifecycleRules {
drop_non_persisted: config.drop_non_persisted,
persist: config.persist,
immutable: config.immutable,
background_worker_period_millis: config
.background_worker_period_millis
.map_or(0, NonZeroU64::get),
worker_backoff_millis: config.worker_backoff_millis.map_or(0, NonZeroU64::get),
}
}
}
@ -269,7 +267,7 @@ impl TryFrom<management::LifecycleRules> for LifecycleRules {
drop_non_persisted: proto.drop_non_persisted,
persist: proto.persist,
immutable: proto.immutable,
background_worker_period_millis: NonZeroU64::new(proto.background_worker_period_millis),
worker_backoff_millis: NonZeroU64::new(proto.worker_backoff_millis),
})
}
}
@ -1387,7 +1385,7 @@ mod tests {
drop_non_persisted: true,
persist: true,
immutable: true,
background_worker_period_millis: 1000,
worker_backoff_millis: 1000,
};
let config: LifecycleRules = protobuf.clone().try_into().unwrap();
@ -1427,25 +1425,19 @@ mod tests {
assert_eq!(back.buffer_size_hard, protobuf.buffer_size_hard);
assert_eq!(back.drop_non_persisted, protobuf.drop_non_persisted);
assert_eq!(back.immutable, protobuf.immutable);
assert_eq!(
back.background_worker_period_millis,
protobuf.background_worker_period_millis
);
assert_eq!(back.worker_backoff_millis, protobuf.worker_backoff_millis);
}
#[test]
fn default_background_worker_period_millis() {
fn default_background_worker_backoff_millis() {
let protobuf = management::LifecycleRules {
background_worker_period_millis: 0,
worker_backoff_millis: 0,
..Default::default()
};
let config: LifecycleRules = protobuf.clone().try_into().unwrap();
let back: management::LifecycleRules = config.into();
assert_eq!(
back.background_worker_period_millis,
protobuf.background_worker_period_millis
);
assert_eq!(back.worker_backoff_millis, protobuf.worker_backoff_millis);
}
#[test]

View File

@ -156,12 +156,12 @@ message LifecycleRules {
// Do not allow writing new data to this database
bool immutable = 8;
/// The background worker will evaluate whether there is work to do
/// at every `period` milliseconds.
///
/// If 0, the default period is used
// (See data_types::database_rules::LifecycleRules::DEFAULT_BACKGROUND_WORKER_PERIOD_MILLIS)
uint64 background_worker_period_millis = 10;
// If the background worker doesn't find any work to do it will
// sleep for this many milliseconds before looking again
//
// If 0, the default backoff is used
// See server::db::lifecycle::DEFAULT_LIFECYCLE_BACKOFF
uint64 worker_backoff_millis = 10;
}
message DatabaseRules {

View File

@ -15,7 +15,6 @@ use catalog::{
Catalog,
};
pub(crate) use chunk::DbChunk;
use core::num::NonZeroU64;
use data_types::{
chunk::ChunkSummary,
database_rules::DatabaseRules,
@ -200,7 +199,6 @@ pub enum Error {
pub type Result<T, E = Error> = std::result::Result<T, E>;
const STARTING_SEQUENCE: u64 = 1;
const DEFAULT_BACKGROUND_WORKER_PERIOD_MILLIS: u64 = 1000;
/// This is the main IOx Database object. It is the root object of any
/// specific InfluxDB IOx instance
@ -836,39 +834,12 @@ impl Db {
) {
info!("started background worker");
fn make_interval(millis: Option<NonZeroU64>) -> tokio::time::Interval {
tokio::time::interval(tokio::time::Duration::from_millis(
millis.map_or(DEFAULT_BACKGROUND_WORKER_PERIOD_MILLIS, NonZeroU64::get),
))
}
let mut period_millis = {
self.rules
.read()
.lifecycle_rules
.background_worker_period_millis
};
let mut interval = make_interval(period_millis);
let mut lifecycle_manager = LifecycleManager::new(Arc::clone(&self));
while !shutdown.is_cancelled() {
self.worker_iterations.fetch_add(1, Ordering::Relaxed);
lifecycle_manager.check_for_work();
let possibly_updated_period_millis = {
self.rules
.read()
.lifecycle_rules
.background_worker_period_millis
};
if period_millis != possibly_updated_period_millis {
period_millis = possibly_updated_period_millis;
interval = make_interval(period_millis);
}
tokio::select! {
_ = interval.tick() => {},
_ = lifecycle_manager.check_for_work() => {},
_ = shutdown.cancelled() => break
}
}

View File

@ -14,6 +14,10 @@ use super::{
Db,
};
use data_types::database_rules::SortOrder;
use futures::future::BoxFuture;
use std::time::Duration;
pub const DEFAULT_LIFECYCLE_BACKOFF: Duration = Duration::from_secs(1);
/// Handles the lifecycle of chunks within a Db
pub struct LifecycleManager {
@ -40,7 +44,9 @@ impl LifecycleManager {
///
/// Should be called periodically and should spawn any long-running
/// work onto the tokio threadpool and return
pub fn check_for_work(&mut self) {
///
/// Returns a future that resolves when this method should be called next
pub fn check_for_work(&mut self) -> BoxFuture<'static, ()> {
ChunkMover::check_for_work(self, Utc::now())
}
}
@ -49,6 +55,8 @@ impl LifecycleManager {
///
/// This is to enable independent testing of the policy logic
trait ChunkMover {
type Job: Send + Sync + 'static;
/// Returns the size of a chunk - overridden for testing
fn chunk_size(chunk: &Chunk) -> usize {
chunk.size()
@ -64,23 +72,35 @@ trait ChunkMover {
/// they should be prioritised
fn chunks(&self, order: &SortOrder) -> Vec<Arc<RwLock<Chunk>>>;
/// Returns a boolean indicating if a move is in progress
fn is_move_active(&self) -> bool;
/// Returns a tracker for the running move task if any
fn move_tracker(&self) -> Option<&TaskTracker<Self::Job>>;
/// Returns a boolean indicating if a write is in progress
fn is_write_active(&self) -> bool;
/// Returns a tracker for the running write task if any
fn write_tracker(&self) -> Option<&TaskTracker<Self::Job>>;
/// Starts an operation to move a chunk to the read buffer
fn move_to_read_buffer(&mut self, partition_key: String, table_name: String, chunk_id: u32);
fn move_to_read_buffer(
&mut self,
partition_key: String,
table_name: String,
chunk_id: u32,
) -> TaskTracker<Self::Job>;
/// Starts an operation to write a chunk to the object store
fn write_to_object_store(&mut self, partition_key: String, table_name: String, chunk_id: u32);
fn write_to_object_store(
&mut self,
partition_key: String,
table_name: String,
chunk_id: u32,
) -> TaskTracker<Self::Job>;
/// Drops a chunk from the database
fn drop_chunk(&mut self, partition_key: String, table_name: String, chunk_id: u32);
/// The core policy logic
fn check_for_work(&mut self, now: DateTime<Utc>) {
///
/// Returns a future that resolves when this method should be called next
fn check_for_work(&mut self, now: DateTime<Utc>) -> BoxFuture<'static, ()> {
let rules = self.rules();
let chunks = self.chunks(&rules.sort_order);
@ -88,25 +108,31 @@ trait ChunkMover {
// Only want to start a new move/write task if there isn't one already in-flight
//
// Note: This does not take into account manually triggered tasks
let mut move_active = self.is_move_active();
let mut write_active = self.is_write_active();
// Fetch the trackers for tasks created by previous loop iterations. If the trackers exist
// and haven't completed, we don't want to create a new task of that type as there
// is still one in-flight.
//
// This is a very weak heuristic for preventing background work from starving query
// workload - i.e. only use one thread for each type of background task.
//
// We may want to revisit this in future
let mut move_tracker = self.move_tracker().filter(|x| !x.is_complete()).cloned();
let mut write_tracker = self.write_tracker().filter(|x| !x.is_complete()).cloned();
// Iterate through the chunks to determine
// - total memory consumption
// - any chunks to move
// TODO: Track size globally to avoid iterating through all chunks (#1100)
for chunk in &chunks {
let chunk_guard = chunk.upgradable_read();
buffer_size += Self::chunk_size(&*chunk_guard);
let would_move = can_move(&rules, &*chunk_guard, now);
let would_write = !write_active && rules.persist;
let would_write = write_tracker.is_none() && rules.persist;
match chunk_guard.state() {
ChunkState::Open(_) if !move_active && would_move => {
ChunkState::Open(_) if move_tracker.is_none() && would_move => {
let mut chunk_guard = RwLockUpgradableReadGuard::upgrade(chunk_guard);
chunk_guard.set_closing().expect("cannot close open chunk");
@ -116,18 +142,18 @@ trait ChunkMover {
std::mem::drop(chunk_guard);
move_active = true;
self.move_to_read_buffer(partition_key, table_name, chunk_id);
move_tracker =
Some(self.move_to_read_buffer(partition_key, table_name, chunk_id));
}
ChunkState::Closing(_) if !move_active => {
ChunkState::Closing(_) if move_tracker.is_none() => {
let partition_key = chunk_guard.key().to_string();
let table_name = chunk_guard.table_name().to_string();
let chunk_id = chunk_guard.id();
std::mem::drop(chunk_guard);
move_active = true;
self.move_to_read_buffer(partition_key, table_name, chunk_id);
move_tracker =
Some(self.move_to_read_buffer(partition_key, table_name, chunk_id));
}
ChunkState::Moved(_) if would_write => {
let partition_key = chunk_guard.key().to_string();
@ -136,8 +162,8 @@ trait ChunkMover {
std::mem::drop(chunk_guard);
write_active = true;
self.write_to_object_store(partition_key, table_name, chunk_id);
write_tracker =
Some(self.write_to_object_store(partition_key, table_name, chunk_id));
}
_ => {}
}
@ -175,10 +201,53 @@ trait ChunkMover {
}
}
}
Box::pin(async move {
let backoff = rules
.worker_backoff_millis
.map(|x| Duration::from_millis(x.get()))
.unwrap_or(DEFAULT_LIFECYCLE_BACKOFF);
// `check_for_work` should be called again if either of the tasks completes
// or the backoff expires.
//
// This formulation ensures that the loop will run again in backoff
// number of milliseconds regardless of if any tasks have finished
//
// Consider the situation where we have an in-progress write but no
// in-progress move. We will look again for new tasks as soon
// as either the backoff expires or the write task finishes
//
// Similarly if there are no in-progress tasks, we will look again
// after the backoff interval
//
// Finally if all tasks are running we still want to be invoked
// after the backoff interval, even if no tasks have completed by then,
// as we may need to drop chunks
tokio::select! {
_ = tokio::time::sleep(backoff) => {}
_ = wait_optional_tracker(move_tracker) => {}
_ = wait_optional_tracker(write_tracker) => {}
};
})
}
}
/// Waits for the provided tracker to finish; if no tracker is supplied,
/// the returned future never resolves.
///
/// Intended for use inside a `select!`: an absent task simply never fires
/// its branch, so the select is driven by the remaining branches.
async fn wait_optional_tracker<Job: Send + Sync + 'static>(tracker: Option<TaskTracker<Job>>) {
    if let Some(tracker) = tracker {
        tracker.join().await
    } else {
        // No task in flight — park forever
        futures::future::pending().await
    }
}
impl ChunkMover for LifecycleManager {
type Job = Job;
fn db_name(&self) -> &str {
&self.db_name
}
fn rules(&self) -> LifecycleRules {
self.db.rules.read().lifecycle_rules.clone()
}
@ -187,36 +256,40 @@ impl ChunkMover for LifecycleManager {
self.db.catalog.chunks_sorted_by(sort_order)
}
fn is_move_active(&self) -> bool {
self.move_task
.as_ref()
.map(|x| !x.is_complete())
.unwrap_or(false)
fn move_tracker(&self) -> Option<&TaskTracker<Job>> {
self.move_task.as_ref()
}
fn is_write_active(&self) -> bool {
self.write_task
.as_ref()
.map(|x| !x.is_complete())
.unwrap_or(false)
fn write_tracker(&self) -> Option<&TaskTracker<Job>> {
self.write_task.as_ref()
}
fn move_to_read_buffer(&mut self, partition_key: String, table_name: String, chunk_id: u32) {
fn move_to_read_buffer(
&mut self,
partition_key: String,
table_name: String,
chunk_id: u32,
) -> TaskTracker<Self::Job> {
info!(%partition_key, %chunk_id, "moving chunk to read buffer");
self.move_task = Some(self.db.load_chunk_to_read_buffer_in_background(
partition_key,
table_name,
chunk_id,
))
let tracker =
self.db
.load_chunk_to_read_buffer_in_background(partition_key, table_name, chunk_id);
self.move_task = Some(tracker.clone());
tracker
}
fn write_to_object_store(&mut self, partition_key: String, table_name: String, chunk_id: u32) {
fn write_to_object_store(
&mut self,
partition_key: String,
table_name: String,
chunk_id: u32,
) -> TaskTracker<Self::Job> {
info!(%partition_key, %chunk_id, "write chunk to object store");
self.write_task = Some(self.db.write_chunk_to_object_store_in_background(
partition_key,
table_name,
chunk_id,
))
let tracker =
self.db
.write_chunk_to_object_store_in_background(partition_key, table_name, chunk_id);
self.write_task = Some(tracker.clone());
tracker
}
fn drop_chunk(&mut self, partition_key: String, table_name: String, chunk_id: u32) {
@ -226,10 +299,6 @@ impl ChunkMover for LifecycleManager {
.drop_chunk(&partition_key, &table_name, chunk_id)
.log_if_error("dropping chunk to free up memory");
}
fn db_name(&self) -> &str {
&self.db_name
}
}
/// Returns the number of seconds between two times
@ -279,7 +348,7 @@ mod tests {
convert::TryFrom,
num::{NonZeroU32, NonZeroUsize},
};
use tracker::MemRegistry;
use tracker::{MemRegistry, TaskRegistry};
fn from_secs(secs: i64) -> DateTime<Utc> {
DateTime::from_utc(chrono::NaiveDateTime::from_timestamp(secs, 0), Utc)
@ -367,8 +436,8 @@ mod tests {
/// logic within ChunkMover::check_for_work
struct DummyMover {
rules: LifecycleRules,
move_active: bool,
write_active: bool,
move_tracker: Option<TaskTracker<()>>,
write_tracker: Option<TaskTracker<()>>,
chunks: Vec<Arc<RwLock<Chunk>>>,
events: Vec<MoverEvents>,
}
@ -381,19 +450,25 @@ mod tests {
.into_iter()
.map(|x| Arc::new(RwLock::new(x)))
.collect(),
move_active: false,
write_active: false,
move_tracker: None,
write_tracker: None,
events: vec![],
}
}
}
impl ChunkMover for DummyMover {
type Job = ();
fn chunk_size(_: &Chunk) -> usize {
// All chunks are 20 bytes
20
}
fn db_name(&self) -> &str {
"my_awesome_db"
}
fn rules(&self) -> LifecycleRules {
self.rules.clone()
}
@ -402,12 +477,12 @@ mod tests {
self.chunks.clone()
}
fn is_move_active(&self) -> bool {
self.move_active
fn move_tracker(&self) -> Option<&TaskTracker<Self::Job>> {
self.move_tracker.as_ref()
}
fn is_write_active(&self) -> bool {
self.write_active
fn write_tracker(&self) -> Option<&TaskTracker<Self::Job>> {
self.write_tracker.as_ref()
}
fn move_to_read_buffer(
@ -415,14 +490,17 @@ mod tests {
_partition_key: String,
_table_name: String,
chunk_id: u32,
) {
) -> TaskTracker<Self::Job> {
let chunk = self
.chunks
.iter()
.find(|x| x.read().id() == chunk_id)
.unwrap();
chunk.write().set_moving().unwrap();
self.events.push(MoverEvents::Move(chunk_id))
self.events.push(MoverEvents::Move(chunk_id));
let tracker = TaskTracker::complete(());
self.move_tracker = Some(tracker.clone());
tracker
}
fn write_to_object_store(
@ -430,14 +508,17 @@ mod tests {
_partition_key: String,
_table_name: String,
chunk_id: u32,
) {
) -> TaskTracker<Self::Job> {
let chunk = self
.chunks
.iter()
.find(|x| x.read().id() == chunk_id)
.unwrap();
chunk.write().set_writing_to_object_store().unwrap();
self.events.push(MoverEvents::Write(chunk_id))
self.events.push(MoverEvents::Write(chunk_id));
let tracker = TaskTracker::complete(());
self.write_tracker = Some(tracker.clone());
tracker
}
fn drop_chunk(&mut self, _partition_key: String, _table_name: String, chunk_id: u32) {
@ -448,10 +529,6 @@ mod tests {
.collect();
self.events.push(MoverEvents::Drop(chunk_id))
}
fn db_name(&self) -> &str {
"my_awesome_db"
}
}
#[test]
@ -552,6 +629,7 @@ mod tests {
#[test]
fn test_in_progress() {
let mut registry = TaskRegistry::new();
let rules = LifecycleRules {
mutable_linger_seconds: Some(NonZeroU32::new(10).unwrap()),
..Default::default()
@ -559,19 +637,48 @@ mod tests {
let chunks = vec![new_chunk(0, Some(0), Some(0))];
let mut mover = DummyMover::new(rules, chunks);
mover.move_active = true;
let (tracker, registration) = registry.register(());
mover.move_tracker = Some(tracker);
mover.check_for_work(from_secs(80));
assert_eq!(mover.events, vec![]);
mover.move_active = false;
std::mem::drop(registration);
mover.check_for_work(from_secs(80));
assert_eq!(mover.events, vec![MoverEvents::Move(0)]);
}
#[tokio::test]
async fn test_backoff() {
let mut registry = TaskRegistry::new();
let rules = LifecycleRules {
mutable_linger_seconds: Some(NonZeroU32::new(100).unwrap()),
..Default::default()
};
let mut mover = DummyMover::new(rules, vec![]);
let (tracker, registration) = registry.register(());
// Manually set the move_tracker on the DummyMover as if a previous invocation
// of check_for_work had started a background move task
mover.move_tracker = Some(tracker);
let future = mover.check_for_work(from_secs(0));
tokio::time::timeout(Duration::from_millis(1), future)
.await
.expect_err("expected timeout");
let future = mover.check_for_work(from_secs(0));
std::mem::drop(registration);
tokio::time::timeout(Duration::from_millis(1), future)
.await
.expect("expect early return due to task completion");
}
#[test]
fn test_minimum_age() {
let rules = LifecycleRules {

View File

@ -180,7 +180,7 @@ pub async fn command(url: String, config: Config) -> Result<()> {
drop_non_persisted: command.drop_non_persisted,
persist: command.persist,
immutable: command.immutable,
background_worker_period_millis: Default::default(),
worker_backoff_millis: Default::default(),
}),
// Default to hourly partitions

View File

@ -199,6 +199,21 @@ impl<T> Clone for TaskTracker<T> {
}
impl<T> TaskTracker<T> {
/// Creates a new task tracker from the provided registration
///
/// The tracker shares the registration's internal state (via `Arc`), so it
/// observes the progress of whatever work is tied to that registration.
pub fn new(id: TaskId, registration: &TaskRegistration, metadata: T) -> Self {
Self {
id,
// Metadata is reference-counted so cloned trackers share a single copy
metadata: Arc::new(metadata),
// Share, don't copy, the registration's state
state: Arc::clone(&registration.state),
}
}
/// Returns a complete task tracker
///
/// A fresh registration is created and dropped before this function
/// returns, so the returned tracker reports as already finished.
///
/// NOTE(review): uses a fixed `TaskId(0)` — this tracker is not registered
/// with any `TrackerRegistry`, so the "unique per registry" property of IDs
/// does not apply to it.
pub fn complete(metadata: T) -> Self {
let registration = TaskRegistration::new();
Self::new(TaskId(0), &registration, metadata)
}
/// Returns the ID of the Tracker - these are unique per TrackerRegistry
pub fn id(&self) -> TaskId {
self.id
@ -304,8 +319,8 @@ impl Clone for TaskRegistration {
}
}
impl TaskRegistration {
fn new() -> Self {
impl Default for TaskRegistration {
fn default() -> Self {
let state = Arc::new(TrackerState {
start_instant: Instant::now(),
cpu_nanos: AtomicUsize::new(0),
@ -321,6 +336,12 @@ impl TaskRegistration {
}
}
impl TaskRegistration {
/// Creates a new registration — equivalent to `Default::default()`
pub fn new() -> Self {
Self::default()
}
}
impl Drop for TaskRegistration {
fn drop(&mut self) {
// This synchronizes with the Acquire load in Tracker::get_status

View File

@ -1,7 +1,6 @@
use super::{TaskRegistration, TaskTracker};
use hashbrown::HashMap;
use std::str::FromStr;
use std::sync::Arc;
/// Every future registered with a `TaskRegistry` is assigned a unique
/// `TaskId`
@ -52,12 +51,7 @@ impl<T> TaskRegistry<T> {
self.next_id += 1;
let registration = TaskRegistration::new();
let tracker = TaskTracker {
id,
metadata: Arc::new(metadata),
state: Arc::clone(&registration.state),
};
let tracker = TaskTracker::new(id, &registration, metadata);
self.tasks.insert(id, tracker.clone());