refactor: add TimeProvider abstraction (#2722) (#2815)

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Raphael Taylor-Davies 2021-10-12 22:19:03 +01:00 committed by GitHub
parent 035654b4f9
commit 8a82f92c5d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
25 changed files with 328 additions and 269 deletions

9
Cargo.lock generated
View File

@ -1258,7 +1258,6 @@ name = "generated_types"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"bytes", "bytes",
"chrono",
"data_types", "data_types",
"num_cpus", "num_cpus",
"observability_deps", "observability_deps",
@ -1799,7 +1798,6 @@ checksum = "90c11140ffea82edce8dcd74137ce9324ec24b3cf0175fc9d7e29164da9915b8"
name = "internal_types" name = "internal_types"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"chrono",
"futures", "futures",
"parking_lot", "parking_lot",
"snafu", "snafu",
@ -1971,12 +1969,12 @@ dependencies = [
name = "lifecycle" name = "lifecycle"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"chrono",
"data_types", "data_types",
"futures", "futures",
"hashbrown 0.11.2", "hashbrown 0.11.2",
"internal_types", "internal_types",
"observability_deps", "observability_deps",
"parking_lot",
"time 0.1.0", "time 0.1.0",
"tokio", "tokio",
"tracker", "tracker",
@ -2206,7 +2204,6 @@ dependencies = [
"arrow", "arrow",
"arrow_util", "arrow_util",
"async-trait", "async-trait",
"chrono",
"data_types", "data_types",
"entry", "entry",
"hashbrown 0.11.2", "hashbrown 0.11.2",
@ -3353,7 +3350,6 @@ version = "0.1.0"
dependencies = [ dependencies = [
"arrow", "arrow",
"arrow_util", "arrow_util",
"chrono",
"criterion", "criterion",
"croaring", "croaring",
"data_types", "data_types",
@ -4644,7 +4640,6 @@ dependencies = [
name = "tracker" name = "tracker"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"chrono",
"futures", "futures",
"hashbrown 0.11.2", "hashbrown 0.11.2",
"lock_api", "lock_api",
@ -4652,6 +4647,7 @@ dependencies = [
"observability_deps", "observability_deps",
"parking_lot", "parking_lot",
"pin-project", "pin-project",
"time 0.1.0",
"tokio", "tokio",
"tokio-util", "tokio-util",
] ]
@ -4982,7 +4978,6 @@ name = "write_buffer"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"async-trait", "async-trait",
"chrono",
"data_types", "data_types",
"dotenv", "dotenv",
"entry", "entry",

View File

@ -6,7 +6,7 @@ edition = "2018"
description = "The entry format used by the write buffer" description = "The entry format used by the write buffer"
[dependencies] [dependencies]
chrono = { version = "0.4", features = ["serde"] } chrono = "0.4"
data_types = { path = "../data_types" } data_types = { path = "../data_types" }
# See docs/regenerating_flatbuffers.md about updating generated code when updating the # See docs/regenerating_flatbuffers.md about updating generated code when updating the
# version of the flatbuffers crate # version of the flatbuffers crate

View File

@ -18,7 +18,6 @@ tonic = "0.5"
time = { path = "../time" } time = { path = "../time" }
[dev-dependencies] [dev-dependencies]
chrono = { version = "0.4", features = ["serde"] }
num_cpus = "1.13.0" num_cpus = "1.13.0"
[build-dependencies] # In alphabetical order [build-dependencies] # In alphabetical order

View File

@ -7,7 +7,6 @@ description = "InfluxDB IOx internal types, shared between IOx instances"
readme = "README.md" readme = "README.md"
[dependencies] [dependencies]
chrono = "0.4"
parking_lot = "0.11" parking_lot = "0.11"
snafu = "0.6" snafu = "0.6"
time = { path = "../time" } time = { path = "../time" }

View File

@ -6,12 +6,12 @@ edition = "2018"
description = "Implements the IOx data lifecycle" description = "Implements the IOx data lifecycle"
[dependencies] [dependencies]
chrono = "0.4"
data_types = { path = "../data_types" } data_types = { path = "../data_types" }
futures = "0.3" futures = "0.3"
hashbrown = "0.11" hashbrown = "0.11"
internal_types = { path = "../internal_types" } internal_types = { path = "../internal_types" }
observability_deps = { path = "../observability_deps" } observability_deps = { path = "../observability_deps" }
parking_lot = "0.11"
time = { path = "../time" } time = { path = "../time" }
tokio = { version = "1.11", features = ["macros", "time"] } tokio = { version = "1.11", features = ["macros", "time"] }
tracker = { path = "../tracker" } tracker = { path = "../tracker" }

View File

@ -8,20 +8,20 @@
clippy::clone_on_ref_ptr clippy::clone_on_ref_ptr
)] )]
use chrono::{DateTime, Utc};
use data_types::{ use data_types::{
chunk_metadata::{ChunkAddr, ChunkId, ChunkLifecycleAction, ChunkOrder, ChunkStorage}, chunk_metadata::{ChunkAddr, ChunkId, ChunkLifecycleAction, ChunkOrder, ChunkStorage},
database_rules::LifecycleRules, database_rules::LifecycleRules,
DatabaseName, DatabaseName,
}; };
use internal_types::access::AccessMetrics; use internal_types::access::AccessMetrics;
use std::sync::Arc;
use tracker::TaskTracker; use tracker::TaskTracker;
mod guard; mod guard;
pub use guard::*; pub use guard::*;
mod policy; mod policy;
pub use policy::*; pub use policy::*;
use time::Time; use time::{Time, TimeProvider};
/// A trait that encapsulates the database logic that is automated by `LifecyclePolicy` /// A trait that encapsulates the database logic that is automated by `LifecyclePolicy`
pub trait LifecycleDb { pub trait LifecycleDb {
@ -40,6 +40,9 @@ pub trait LifecycleDb {
/// Return the database name. /// Return the database name.
fn name(&self) -> DatabaseName<'static>; fn name(&self) -> DatabaseName<'static>;
/// Return the time provider for this database
fn time_provider(&self) -> Arc<dyn TimeProvider>;
} }
/// A `LockablePartition` is a wrapper around a `LifecyclePartition` that allows /// A `LockablePartition` is a wrapper around a `LifecyclePartition` that allows
@ -170,7 +173,7 @@ pub trait LifecycleChunk {
fn clear_lifecycle_action(&mut self); fn clear_lifecycle_action(&mut self);
/// Returns the min timestamp contained within this chunk /// Returns the min timestamp contained within this chunk
fn min_timestamp(&self) -> DateTime<Utc>; fn min_timestamp(&self) -> Time;
/// Returns the access metrics for this chunk /// Returns the access metrics for this chunk
fn access_metrics(&self) -> AccessMetrics; fn access_metrics(&self) -> AccessMetrics;

View File

@ -2,7 +2,6 @@ use crate::{
LifecycleChunk, LifecycleDb, LifecyclePartition, LifecycleWriteGuard, LockableChunk, LifecycleChunk, LifecycleDb, LifecyclePartition, LifecycleWriteGuard, LockableChunk,
LockablePartition, PersistHandle, LockablePartition, PersistHandle,
}; };
use chrono::{DateTime, Utc};
use data_types::{ use data_types::{
chunk_metadata::{ChunkId, ChunkLifecycleAction, ChunkStorage}, chunk_metadata::{ChunkId, ChunkLifecycleAction, ChunkStorage},
database_rules::LifecycleRules, database_rules::LifecycleRules,
@ -11,12 +10,12 @@ use data_types::{
use futures::future::BoxFuture; use futures::future::BoxFuture;
use internal_types::access::AccessMetrics; use internal_types::access::AccessMetrics;
use observability_deps::tracing::{debug, info, trace, warn}; use observability_deps::tracing::{debug, info, trace, warn};
use std::{convert::TryInto, fmt::Debug}; use std::{fmt::Debug, time::Duration};
use time::Time; use time::Time;
use tracker::TaskTracker; use tracker::TaskTracker;
/// Number of seconds to wait before retrying a failed lifecycle action /// Number of seconds to wait before retrying a failed lifecycle action
pub const LIFECYCLE_ACTION_BACKOFF_SECONDS: i64 = 10; pub const LIFECYCLE_ACTION_BACKOFF: Duration = Duration::from_secs(10);
/// A `LifecyclePolicy` is created with a `LifecycleDb` /// A `LifecyclePolicy` is created with a `LifecycleDb`
/// ///
@ -215,7 +214,7 @@ where
&mut self, &mut self,
partition: &P, partition: &P,
rules: &LifecycleRules, rules: &LifecycleRules,
now: DateTime<Utc>, now: Time,
) { ) {
let mut rows_left = rules.persist_row_threshold.get(); let mut rows_left = rules.persist_row_threshold.get();
@ -338,7 +337,7 @@ where
db_name: &DatabaseName<'static>, db_name: &DatabaseName<'static>,
partition: &P, partition: &P,
rules: &LifecycleRules, rules: &LifecycleRules,
now: DateTime<Utc>, now: Time,
) -> bool { ) -> bool {
// TODO: Encapsulate locking into a CatalogTransaction type // TODO: Encapsulate locking into a CatalogTransaction type
let partition = partition.read(); let partition = partition.read();
@ -348,13 +347,13 @@ where
return false; return false;
} }
let persistable_age_seconds: u32 = partition let persistable_age_seconds = partition
.minimum_unpersisted_age() .minimum_unpersisted_age()
.and_then(|minimum_unpersisted_age| { .and_then(|minimum_unpersisted_age| {
(now - minimum_unpersisted_age.date_time()) Some(
.num_seconds() now.checked_duration_since(minimum_unpersisted_age)?
.try_into() .as_secs(),
.ok() )
}) })
.unwrap_or_default(); .unwrap_or_default();
@ -368,7 +367,7 @@ where
if persistable_row_count >= rules.persist_row_threshold.get() { if persistable_row_count >= rules.persist_row_threshold.get() {
info!(%db_name, %partition, persistable_row_count, "persisting partition as exceeds row threshold"); info!(%db_name, %partition, persistable_row_count, "persisting partition as exceeds row threshold");
} else if persistable_age_seconds >= rules.persist_age_threshold_seconds.get() { } else if persistable_age_seconds >= rules.persist_age_threshold_seconds.get() as u64 {
info!(%db_name, %partition, persistable_age_seconds, "persisting partition as exceeds age threshold"); info!(%db_name, %partition, persistable_age_seconds, "persisting partition as exceeds age threshold");
} else { } else {
debug!(%db_name, %partition, persistable_row_count, "partition not eligible for persist"); debug!(%db_name, %partition, persistable_row_count, "partition not eligible for persist");
@ -417,7 +416,7 @@ where
&mut self, &mut self,
db_name: &DatabaseName<'static>, db_name: &DatabaseName<'static>,
partition: &P, partition: &P,
now: DateTime<Utc>, now: Time,
) { ) {
let partition = partition.read(); let partition = partition.read();
for chunk in LockablePartition::chunks(&partition) { for chunk in LockablePartition::chunks(&partition) {
@ -425,9 +424,9 @@ where
if let Some(lifecycle_action) = chunk.lifecycle_action() { if let Some(lifecycle_action) = chunk.lifecycle_action() {
if lifecycle_action.is_complete() if lifecycle_action.is_complete()
&& now && now
.signed_duration_since(lifecycle_action.start_time()) .checked_duration_since(lifecycle_action.start_time())
.num_seconds() .unwrap_or_default()
>= LIFECYCLE_ACTION_BACKOFF_SECONDS >= LIFECYCLE_ACTION_BACKOFF
{ {
info!(%db_name, chunk=%chunk.addr(), action=?lifecycle_action.metadata(), "clearing failed lifecycle action"); info!(%db_name, chunk=%chunk.addr(), action=?lifecycle_action.metadata(), "clearing failed lifecycle action");
chunk.upgrade().clear_lifecycle_action(); chunk.upgrade().clear_lifecycle_action();
@ -439,12 +438,13 @@ where
/// The core policy logic /// The core policy logic
/// ///
/// Returns a future that resolves when this method should be called next /// Returns a future that resolves when this method should be called next
pub fn check_for_work(&mut self, now: DateTime<Utc>) -> BoxFuture<'static, ()> { pub fn check_for_work(&mut self) -> BoxFuture<'static, ()> {
// Any time-consuming work should be spawned as tokio tasks and not // Any time-consuming work should be spawned as tokio tasks and not
// run directly within this loop // run directly within this loop
// TODO: Add loop iteration count and duration metrics // TODO: Add loop iteration count and duration metrics
let now = self.db.time_provider().now();
let db_name = self.db.name(); let db_name = self.db.name();
let rules = self.db.rules(); let rules = self.db.rules();
let partitions = self.db.partitions(); let partitions = self.db.partitions();
@ -561,29 +561,20 @@ where
} }
} }
/// Returns the number of seconds between two times
///
/// Computes a - b
#[inline]
fn elapsed_seconds(a: DateTime<Utc>, b: DateTime<Utc>) -> u32 {
let seconds = (a - b).num_seconds();
if seconds < 0 {
0 // This can occur as DateTime is not monotonic
} else {
seconds.try_into().unwrap_or(u32::max_value())
}
}
/// Returns if the chunk is sufficiently cold and old to move /// Returns if the chunk is sufficiently cold and old to move
/// ///
/// Note: Does not check the chunk is the correct state /// Note: Does not check the chunk is the correct state
fn can_move<C: LifecycleChunk>(rules: &LifecycleRules, chunk: &C, now: DateTime<Utc>) -> bool { fn can_move<C: LifecycleChunk>(rules: &LifecycleRules, chunk: &C, now: Time) -> bool {
if chunk.row_count() >= rules.mub_row_threshold.get() { if chunk.row_count() >= rules.mub_row_threshold.get() {
return true; return true;
} }
elapsed_seconds(now, chunk.time_of_last_write().date_time()) let elapsed = now
>= rules.late_arrive_window_seconds.get() .checked_duration_since(chunk.time_of_last_write())
.unwrap_or_default()
.as_secs();
elapsed >= rules.late_arrive_window_seconds.get() as u64
} }
/// An action to free up memory /// An action to free up memory
@ -657,7 +648,7 @@ where
// Chunk's data is entirely after the time we are flushing // Chunk's data is entirely after the time we are flushing
// up to, and thus there is reason to include it in the // up to, and thus there is reason to include it in the
// plan // plan
if chunk.min_timestamp() > flush_ts.date_time() { if chunk.min_timestamp() > flush_ts {
// Ignore chunk for now, but we might need it later to close chunk order gaps // Ignore chunk for now, but we might need it later to close chunk order gaps
debug!( debug!(
chunk=%chunk.addr(), chunk=%chunk.addr(),
@ -702,9 +693,9 @@ mod tests {
ChunkLifecycleAction, LifecycleReadGuard, LifecycleWriteGuard, LockableChunk, ChunkLifecycleAction, LifecycleReadGuard, LifecycleWriteGuard, LockableChunk,
LockablePartition, PersistHandle, LockablePartition, PersistHandle,
}; };
use chrono::TimeZone;
use data_types::chunk_metadata::{ChunkAddr, ChunkId, ChunkOrder, ChunkStorage}; use data_types::chunk_metadata::{ChunkAddr, ChunkId, ChunkOrder, ChunkStorage};
use data_types::database_rules::MaxActiveCompactions::MaxActiveCompactions; use data_types::database_rules::MaxActiveCompactions::MaxActiveCompactions;
use parking_lot::Mutex;
use std::time::Duration; use std::time::Duration;
use std::{ use std::{
cmp::max, cmp::max,
@ -713,7 +704,8 @@ mod tests {
num::{NonZeroU32, NonZeroUsize}, num::{NonZeroU32, NonZeroUsize},
sync::Arc, sync::Arc,
}; };
use tracker::{RwLock, TaskId, TaskRegistration, TaskRegistry}; use time::{MockProvider, TimeProvider};
use tracker::{RwLock, TaskRegistry};
#[derive(Debug, Eq, PartialEq)] #[derive(Debug, Eq, PartialEq)]
enum MoverEvents { enum MoverEvents {
@ -753,7 +745,7 @@ mod tests {
struct TestChunk { struct TestChunk {
addr: ChunkAddr, addr: ChunkAddr,
row_count: usize, row_count: usize,
min_timestamp: Option<DateTime<Utc>>, min_timestamp: Option<Time>,
access_metrics: AccessMetrics, access_metrics: AccessMetrics,
time_of_last_write: Time, time_of_last_write: Time,
lifecycle_action: Option<TaskTracker<ChunkLifecycleAction>>, lifecycle_action: Option<TaskTracker<ChunkLifecycleAction>>,
@ -790,12 +782,12 @@ mod tests {
self self
} }
fn with_action(mut self, action: ChunkLifecycleAction) -> Self { fn with_action(mut self, action: TaskTracker<ChunkLifecycleAction>) -> Self {
self.lifecycle_action = Some(TaskTracker::complete(action)); self.lifecycle_action = Some(action);
self self
} }
fn with_min_timestamp(mut self, min_timestamp: DateTime<Utc>) -> Self { fn with_min_timestamp(mut self, min_timestamp: Time) -> Self {
self.min_timestamp = Some(min_timestamp); self.min_timestamp = Some(min_timestamp);
self self
} }
@ -915,9 +907,11 @@ mod tests {
.insert(id, (order, Arc::new(RwLock::new(new_chunk)))); .insert(id, (order, Arc::new(RwLock::new(new_chunk))));
let event = MoverEvents::Compact(chunks.iter().map(|x| x.addr.chunk_id).collect()); let event = MoverEvents::Compact(chunks.iter().map(|x| x.addr.chunk_id).collect());
partition.data().db.events.write().push(event);
Ok(TaskTracker::complete(())) let db = partition.data().db;
db.events.write().push(event);
Ok(db.registry.lock().complete(()))
} }
fn prepare_persist( fn prepare_persist(
@ -944,17 +938,17 @@ mod tests {
partition.next_id += 1; partition.next_id += 1;
// The remainder left behind after the split // The remainder left behind after the split
let new_chunk = TestChunk::new(id, 0, ChunkStorage::ReadBuffer).with_min_timestamp( let new_chunk = TestChunk::new(id, 0, ChunkStorage::ReadBuffer)
handle.timestamp.date_time() + chrono::Duration::nanoseconds(1), .with_min_timestamp(handle.timestamp + Duration::from_nanos(1));
);
partition partition
.chunks .chunks
.insert(id, (order, Arc::new(RwLock::new(new_chunk)))); .insert(id, (order, Arc::new(RwLock::new(new_chunk))));
let event = MoverEvents::Persist(chunks.iter().map(|x| x.addr.chunk_id).collect()); let event = MoverEvents::Persist(chunks.iter().map(|x| x.addr.chunk_id).collect());
partition.data().db.events.write().push(event); let db = partition.data().db;
Ok(TaskTracker::complete(())) db.events.write().push(event);
Ok(db.registry.lock().complete(()))
} }
fn drop_chunk( fn drop_chunk(
@ -963,13 +957,9 @@ mod tests {
) -> Result<TaskTracker<()>, Self::Error> { ) -> Result<TaskTracker<()>, Self::Error> {
let chunk_id = chunk.addr().chunk_id; let chunk_id = chunk.addr().chunk_id;
partition.chunks.remove(&chunk_id); partition.chunks.remove(&chunk_id);
partition let db = partition.data().db;
.data() db.events.write().push(MoverEvents::Drop(chunk_id));
.db Ok(db.registry.lock().complete(()))
.events
.write()
.push(MoverEvents::Drop(chunk_id));
Ok(TaskTracker::complete(()))
} }
} }
@ -990,11 +980,8 @@ mod tests {
mut s: LifecycleWriteGuard<'_, Self::Chunk, Self>, mut s: LifecycleWriteGuard<'_, Self::Chunk, Self>,
) -> Result<(), Self::Error> { ) -> Result<(), Self::Error> {
s.storage = ChunkStorage::ObjectStoreOnly; s.storage = ChunkStorage::ObjectStoreOnly;
s.data() let db = s.data().db;
.db db.events.write().push(MoverEvents::Unload(s.addr.chunk_id));
.events
.write()
.push(MoverEvents::Unload(s.addr.chunk_id));
Ok(()) Ok(())
} }
@ -1034,7 +1021,7 @@ mod tests {
self.lifecycle_action = None self.lifecycle_action = None
} }
fn min_timestamp(&self) -> DateTime<Utc> { fn min_timestamp(&self) -> Time {
self.min_timestamp.unwrap() self.min_timestamp.unwrap()
} }
@ -1087,16 +1074,16 @@ mod tests {
partitions: RwLock<Vec<Arc<RwLock<TestPartition>>>>, partitions: RwLock<Vec<Arc<RwLock<TestPartition>>>>,
// TODO: Move onto TestPartition // TODO: Move onto TestPartition
events: RwLock<Vec<MoverEvents>>, events: RwLock<Vec<MoverEvents>>,
registry: Arc<Mutex<TaskRegistry<()>>>,
time_provider: Arc<dyn TimeProvider>,
} }
impl TestDb { impl TestDb {
fn new(rules: LifecycleRules, chunks: Vec<TestChunk>) -> Self { fn new(
Self::from_partitions(rules, std::iter::once(TestPartition::new(chunks)))
}
fn from_partitions(
rules: LifecycleRules, rules: LifecycleRules,
partitions: impl IntoIterator<Item = TestPartition>, time_provider: Arc<dyn TimeProvider>,
partitions: Vec<TestPartition>,
tasks: TaskRegistry<()>,
) -> Self { ) -> Self {
Self { Self {
rules, rules,
@ -1107,6 +1094,8 @@ mod tests {
.collect(), .collect(),
), ),
events: RwLock::new(vec![]), events: RwLock::new(vec![]),
registry: Arc::new(Mutex::new(tasks)),
time_provider,
} }
} }
} }
@ -1142,17 +1131,14 @@ mod tests {
fn name(&self) -> DatabaseName<'static> { fn name(&self) -> DatabaseName<'static> {
DatabaseName::new("test_db").unwrap() DatabaseName::new("test_db").unwrap()
} }
fn time_provider(&self) -> Arc<dyn TimeProvider> {
Arc::clone(&self.time_provider)
}
} }
fn from_secs(secs: i64) -> DateTime<Utc> { fn from_secs(secs: i64) -> Time {
Utc.timestamp(secs, 0) Time::from_timestamp(secs, 0)
}
#[test]
fn test_elapsed_seconds() {
assert_eq!(elapsed_seconds(from_secs(10), from_secs(5)), 5);
assert_eq!(elapsed_seconds(from_secs(10), from_secs(10)), 0);
assert_eq!(elapsed_seconds(from_secs(10), from_secs(15)), 0);
} }
#[test] #[test]
@ -1243,6 +1229,28 @@ mod tests {
) )
} }
fn test_db_chunks(
rules: LifecycleRules,
chunks: Vec<TestChunk>,
now: Time,
) -> (TestDb, Arc<time::MockProvider>) {
test_db_partitions(rules, vec![TestPartition::new(chunks)], now)
}
fn test_db_partitions(
rules: LifecycleRules,
partitions: Vec<TestPartition>,
now: Time,
) -> (TestDb, Arc<time::MockProvider>) {
let mock_provider = Arc::new(time::MockProvider::new(now));
let time_provider: Arc<dyn TimeProvider> = Arc::<MockProvider>::clone(&mock_provider);
let tasks = TaskRegistry::new(Arc::clone(&time_provider));
let db = TestDb::new(rules, time_provider, partitions, tasks);
(db, mock_provider)
}
#[test] #[test]
fn test_default_rules() { fn test_default_rules() {
// The default rules shouldn't do anything // The default rules shouldn't do anything
@ -1253,9 +1261,9 @@ mod tests {
TestChunk::new(ChunkId::new_test(2), 1, ChunkStorage::OpenMutableBuffer), TestChunk::new(ChunkId::new_test(2), 1, ChunkStorage::OpenMutableBuffer),
]; ];
let db = TestDb::new(rules, chunks); let (db, _) = test_db_chunks(rules, chunks, from_secs(40));
let mut lifecycle = LifecyclePolicy::new(&db); let mut lifecycle = LifecyclePolicy::new(&db);
lifecycle.check_for_work(from_secs(40)); lifecycle.check_for_work();
assert_eq!(*db.events.read(), vec![]); assert_eq!(*db.events.read(), vec![]);
} }
@ -1271,15 +1279,17 @@ mod tests {
TestChunk::new(ChunkId::new_test(2), 0, ChunkStorage::OpenMutableBuffer), TestChunk::new(ChunkId::new_test(2), 0, ChunkStorage::OpenMutableBuffer),
]; ];
let db = TestDb::new(rules, chunks); let (db, time_provider) = test_db_chunks(rules, chunks, from_secs(0));
let mut lifecycle = LifecyclePolicy::new(&db); let mut lifecycle = LifecyclePolicy::new(&db);
let partition = Arc::clone(&db.partitions.read()[0]); let partition = Arc::clone(&db.partitions.read()[0]);
lifecycle.check_for_work(from_secs(9)); time_provider.set(from_secs(9));
lifecycle.check_for_work();
assert_eq!(*db.events.read(), vec![]); assert_eq!(*db.events.read(), vec![]);
lifecycle.check_for_work(from_secs(11)); time_provider.set(from_secs(11));
lifecycle.check_for_work();
let chunks = partition.read().chunks.keys().cloned().collect::<Vec<_>>(); let chunks = partition.read().chunks.keys().cloned().collect::<Vec<_>>();
// expect chunk 2 to have been compacted into a new chunk 3 // expect chunk 2 to have been compacted into a new chunk 3
assert_eq!( assert_eq!(
@ -1295,14 +1305,16 @@ mod tests {
] ]
); );
lifecycle.check_for_work(from_secs(12)); time_provider.set(from_secs(12));
lifecycle.check_for_work();
assert_eq!( assert_eq!(
*db.events.read(), *db.events.read(),
vec![MoverEvents::Compact(vec![ChunkId::new_test(2)])] vec![MoverEvents::Compact(vec![ChunkId::new_test(2)])]
); );
// Should compact everything possible // Should compact everything possible
lifecycle.check_for_work(from_secs(20)); time_provider.set(from_secs(20));
lifecycle.check_for_work();
assert_eq!( assert_eq!(
*db.events.read(), *db.events.read(),
vec![ vec![
@ -1327,12 +1339,12 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn test_backoff() { async fn test_backoff() {
let mut registry = TaskRegistry::new();
let rules = LifecycleRules { let rules = LifecycleRules {
late_arrive_window_seconds: NonZeroU32::new(100).unwrap(), late_arrive_window_seconds: NonZeroU32::new(100).unwrap(),
..Default::default() ..Default::default()
}; };
let db = TestDb::new(rules, vec![]); let (db, time_provider) = test_db_chunks(rules, vec![], from_secs(0));
let mut registry = TaskRegistry::new(time_provider);
let mut lifecycle = LifecyclePolicy::new(&db); let mut lifecycle = LifecyclePolicy::new(&db);
let (tracker, registration) = registry.register(ChunkLifecycleAction::Compacting); let (tracker, registration) = registry.register(ChunkLifecycleAction::Compacting);
@ -1341,12 +1353,12 @@ mod tests {
// of check_for_work had started a background move task // of check_for_work had started a background move task
lifecycle.trackers.push(tracker); lifecycle.trackers.push(tracker);
let future = lifecycle.check_for_work(from_secs(0)); let future = lifecycle.check_for_work();
tokio::time::timeout(Duration::from_millis(1), future) tokio::time::timeout(Duration::from_millis(1), future)
.await .await
.expect_err("expected timeout"); .expect_err("expected timeout");
let future = lifecycle.check_for_work(from_secs(0)); let future = lifecycle.check_for_work();
std::mem::drop(registration); std::mem::drop(registration);
tokio::time::timeout(Duration::from_millis(1), future) tokio::time::timeout(Duration::from_millis(1), future)
.await .await
@ -1372,21 +1384,28 @@ mod tests {
ChunkStorage::OpenMutableBuffer, ChunkStorage::OpenMutableBuffer,
)]; )];
let db = TestDb::new(rules.clone(), chunks); let now = from_secs(0);
let (db, time_provider) = test_db_chunks(rules.clone(), chunks, now);
let mut lifecycle = LifecyclePolicy::new(&db); let mut lifecycle = LifecyclePolicy::new(&db);
lifecycle.check_for_work(from_secs(10)); time_provider.set(from_secs(10));
lifecycle.check_for_work();
assert_eq!(*db.events.read(), vec![]); assert_eq!(*db.events.read(), vec![]);
let chunks = vec![ let mut tasks = TaskRegistry::new(Arc::<MockProvider>::clone(&time_provider));
let partitions = vec![TestPartition::new(vec![
// two "open" chunks => they must not be dropped (yet) // two "open" chunks => they must not be dropped (yet)
TestChunk::new(ChunkId::new_test(0), 0, ChunkStorage::OpenMutableBuffer), TestChunk::new(ChunkId::new_test(0), 0, ChunkStorage::OpenMutableBuffer),
TestChunk::new(ChunkId::new_test(1), 0, ChunkStorage::OpenMutableBuffer), TestChunk::new(ChunkId::new_test(1), 0, ChunkStorage::OpenMutableBuffer),
// "moved" chunk => can be dropped because `drop_non_persistent=true` // "moved" chunk => can be dropped because `drop_non_persistent=true`
TestChunk::new(ChunkId::new_test(2), 0, ChunkStorage::ReadBuffer), TestChunk::new(ChunkId::new_test(2), 0, ChunkStorage::ReadBuffer),
// "writing" chunk => cannot be unloaded while write is in-progress // "writing" chunk => cannot be unloaded while write is in-progress
TestChunk::new(ChunkId::new_test(3), 0, ChunkStorage::ReadBuffer) TestChunk::new(ChunkId::new_test(3), 0, ChunkStorage::ReadBuffer).with_action(
.with_action(ChunkLifecycleAction::Persisting), tasks
.complete(())
.with_metadata(ChunkLifecycleAction::Persisting),
),
// "written" chunk => can be unloaded // "written" chunk => can be unloaded
TestChunk::new( TestChunk::new(
ChunkId::new_test(4), ChunkId::new_test(4),
@ -1407,13 +1426,13 @@ mod tests {
count: 12, count: 12,
last_access: Time::from_timestamp(4, 0), last_access: Time::from_timestamp(4, 0),
}), }),
]; ])];
let db = TestDb::new(rules, chunks); let db = TestDb::new(rules, time_provider, partitions, tasks);
let mut lifecycle = LifecyclePolicy::new(&db); let mut lifecycle = LifecyclePolicy::new(&db);
// Should unload chunk 5 first as access time is smaller // Should unload chunk 5 first as access time is smaller
lifecycle.check_for_work(from_secs(10)); lifecycle.check_for_work();
assert_eq!( assert_eq!(
*db.events.read(), *db.events.read(),
vec![ vec![
@ -1440,33 +1459,39 @@ mod tests {
ChunkStorage::OpenMutableBuffer, ChunkStorage::OpenMutableBuffer,
)]; )];
let db = TestDb::new(rules.clone(), chunks); let (db, _) = test_db_chunks(rules.clone(), chunks, from_secs(10));
let mut lifecycle = LifecyclePolicy::new(&db); let mut lifecycle = LifecyclePolicy::new(&db);
lifecycle.check_for_work(from_secs(10)); lifecycle.check_for_work();
assert_eq!(*db.events.read(), vec![]); assert_eq!(*db.events.read(), vec![]);
let chunks = vec![ let time_provider = Arc::new(time::MockProvider::new(from_secs(0)));
let mut tasks = TaskRegistry::new(Arc::<MockProvider>::clone(&time_provider));
let partitions = vec![TestPartition::new(vec![
// two "open" chunks => they must not be dropped (yet) // two "open" chunks => they must not be dropped (yet)
TestChunk::new(ChunkId::new_test(0), 0, ChunkStorage::OpenMutableBuffer), TestChunk::new(ChunkId::new_test(0), 0, ChunkStorage::OpenMutableBuffer),
TestChunk::new(ChunkId::new_test(1), 0, ChunkStorage::OpenMutableBuffer), TestChunk::new(ChunkId::new_test(1), 0, ChunkStorage::OpenMutableBuffer),
// "moved" chunk => cannot be dropped because `drop_non_persistent=false` // "moved" chunk => cannot be dropped because `drop_non_persistent=false`
TestChunk::new(ChunkId::new_test(2), 0, ChunkStorage::ReadBuffer), TestChunk::new(ChunkId::new_test(2), 0, ChunkStorage::ReadBuffer),
// "writing" chunk => cannot be drop while write is in-progess // "writing" chunk => cannot be drop while write is in-progress
TestChunk::new(ChunkId::new_test(3), 0, ChunkStorage::ReadBuffer) TestChunk::new(ChunkId::new_test(3), 0, ChunkStorage::ReadBuffer).with_action(
.with_action(ChunkLifecycleAction::Persisting), tasks
.complete(())
.with_metadata(ChunkLifecycleAction::Persisting),
),
// "written" chunk => can be unloaded // "written" chunk => can be unloaded
TestChunk::new( TestChunk::new(
ChunkId::new_test(4), ChunkId::new_test(4),
0, 0,
ChunkStorage::ReadBufferAndObjectStore, ChunkStorage::ReadBufferAndObjectStore,
), ),
]; ])];
let db = TestDb::new(rules, chunks); let db = TestDb::new(rules, time_provider, partitions, tasks);
let mut lifecycle = LifecyclePolicy::new(&db); let mut lifecycle = LifecyclePolicy::new(&db);
lifecycle.check_for_work(from_secs(10)); lifecycle.check_for_work();
assert_eq!( assert_eq!(
*db.events.read(), *db.events.read(),
vec![MoverEvents::Unload(ChunkId::new_test(4))] vec![MoverEvents::Unload(ChunkId::new_test(4))]
@ -1487,10 +1512,10 @@ mod tests {
ChunkStorage::OpenMutableBuffer, ChunkStorage::OpenMutableBuffer,
)]; )];
let db = TestDb::new(rules, chunks); let (db, _) = test_db_chunks(rules, chunks, from_secs(10));
let mut lifecycle = LifecyclePolicy::new(&db); let mut lifecycle = LifecyclePolicy::new(&db);
lifecycle.check_for_work(from_secs(10)); lifecycle.check_for_work();
assert_eq!(*db.events.read(), vec![]); assert_eq!(*db.events.read(), vec![]);
} }
@ -1503,7 +1528,8 @@ mod tests {
..Default::default() ..Default::default()
}; };
let now = from_secs(20); let time_provider = Arc::new(time::MockProvider::new(from_secs(20)));
let mut tasks = TaskRegistry::new(Arc::<MockProvider>::clone(&time_provider));
let partitions = vec![ let partitions = vec![
TestPartition::new(vec![ TestPartition::new(vec![
@ -1583,7 +1609,11 @@ mod tests {
TestChunk::new(ChunkId::new_test(20), 20, ChunkStorage::ReadBuffer) TestChunk::new(ChunkId::new_test(20), 20, ChunkStorage::ReadBuffer)
.with_row_count(400) .with_row_count(400)
.with_order(ChunkOrder::new(4).unwrap()) .with_order(ChunkOrder::new(4).unwrap())
.with_action(ChunkLifecycleAction::Compacting), .with_action(
tasks
.complete(())
.with_metadata(ChunkLifecycleAction::Compacting),
),
// closed => can compact // closed => can compact
TestChunk::new(ChunkId::new_test(21), 20, ChunkStorage::ReadBuffer) TestChunk::new(ChunkId::new_test(21), 20, ChunkStorage::ReadBuffer)
.with_row_count(400) .with_row_count(400)
@ -1595,14 +1625,18 @@ mod tests {
TestChunk::new(ChunkId::new_test(23), 20, ChunkStorage::ReadBuffer) TestChunk::new(ChunkId::new_test(23), 20, ChunkStorage::ReadBuffer)
.with_row_count(400) .with_row_count(400)
.with_order(ChunkOrder::new(1).unwrap()) .with_order(ChunkOrder::new(1).unwrap())
.with_action(ChunkLifecycleAction::Compacting), .with_action(
tasks
.complete(())
.with_metadata(ChunkLifecycleAction::Compacting),
),
]), ]),
]; ];
let db = TestDb::from_partitions(rules, partitions); let db = TestDb::new(rules, time_provider, partitions, tasks);
let mut lifecycle = LifecyclePolicy::new(&db); let mut lifecycle = LifecyclePolicy::new(&db);
lifecycle.check_for_work(now); lifecycle.check_for_work();
assert_eq!( assert_eq!(
*db.events.read(), *db.events.read(),
vec![ vec![
@ -1620,7 +1654,7 @@ mod tests {
); );
db.events.write().clear(); db.events.write().clear();
lifecycle.check_for_work(now); lifecycle.check_for_work();
assert_eq!( assert_eq!(
*db.events.read(), *db.events.read(),
vec![MoverEvents::Compact(vec![ vec![MoverEvents::Compact(vec![
@ -1633,11 +1667,10 @@ mod tests {
#[test] #[test]
fn test_compaction_limiter() { fn test_compaction_limiter() {
let rules = LifecycleRules { let rules = LifecycleRules {
max_active_compactions: MaxActiveCompactions(2.try_into().unwrap()), max_active_compactions: MaxActiveCompactions(NonZeroU32::new(2).unwrap()),
..Default::default() ..Default::default()
}; };
let now = from_secs(50);
let partitions = vec![ let partitions = vec![
TestPartition::new(vec![ TestPartition::new(vec![
// closed => can compact // closed => can compact
@ -1661,10 +1694,10 @@ mod tests {
]), ]),
]; ];
let db = TestDb::from_partitions(rules, partitions); let (db, _) = test_db_partitions(rules, partitions, from_secs(50));
let mut lifecycle = LifecyclePolicy::new(&db); let mut lifecycle = LifecyclePolicy::new(&db);
lifecycle.check_for_work(now); lifecycle.check_for_work();
assert_eq!( assert_eq!(
*db.events.read(), *db.events.read(),
vec![ vec![
@ -1680,7 +1713,7 @@ mod tests {
db.events.write().clear(); db.events.write().clear();
// Compaction slots freed up, other partition can now compact. // Compaction slots freed up, other partition can now compact.
lifecycle.check_for_work(now); lifecycle.check_for_work();
assert_eq!( assert_eq!(
*db.events.read(), *db.events.read(),
vec![MoverEvents::Compact(vec![ChunkId::new_test(200)]),], vec![MoverEvents::Compact(vec![ChunkId::new_test(200)]),],
@ -1698,7 +1731,9 @@ mod tests {
..Default::default() ..Default::default()
}; };
let now = from_secs(0); let now = from_secs(0);
let time_now = Time::from_date_time(now);
let time_provider = Arc::new(time::MockProvider::new(now));
let mut tasks = TaskRegistry::new(Arc::<MockProvider>::clone(&time_provider));
let partitions = vec![ let partitions = vec![
// Insufficient rows and not old enough => don't persist but can compact // Insufficient rows and not old enough => don't persist but can compact
@ -1708,7 +1743,7 @@ mod tests {
TestChunk::new(ChunkId::new_test(1), 0, ChunkStorage::ReadBuffer) TestChunk::new(ChunkId::new_test(1), 0, ChunkStorage::ReadBuffer)
.with_min_timestamp(from_secs(5)), .with_min_timestamp(from_secs(5)),
]) ])
.with_persistence(10, time_now, Time::from_timestamp(20, 0)), .with_persistence(10, now, from_secs(20)),
// Sufficient rows => persist // Sufficient rows => persist
TestPartition::new(vec![ TestPartition::new(vec![
TestChunk::new(ChunkId::new_test(2), 0, ChunkStorage::ClosedMutableBuffer) TestChunk::new(ChunkId::new_test(2), 0, ChunkStorage::ClosedMutableBuffer)
@ -1716,7 +1751,7 @@ mod tests {
TestChunk::new(ChunkId::new_test(3), 0, ChunkStorage::ReadBuffer) TestChunk::new(ChunkId::new_test(3), 0, ChunkStorage::ReadBuffer)
.with_min_timestamp(from_secs(5)), .with_min_timestamp(from_secs(5)),
]) ])
.with_persistence(1_000, time_now, Time::from_timestamp(20, 0)), .with_persistence(1_000, now, from_secs(20)),
// Writes too old => persist // Writes too old => persist
TestPartition::new(vec![ TestPartition::new(vec![
// Should split open chunks // Should split open chunks
@ -1727,39 +1762,47 @@ mod tests {
TestChunk::new(ChunkId::new_test(6), 0, ChunkStorage::ObjectStoreOnly) TestChunk::new(ChunkId::new_test(6), 0, ChunkStorage::ObjectStoreOnly)
.with_min_timestamp(from_secs(5)), .with_min_timestamp(from_secs(5)),
]) ])
.with_persistence( .with_persistence(10, now - Duration::from_secs(10), from_secs(20)),
10,
time_now - Duration::from_secs(10),
Time::from_timestamp(20, 0),
),
// Sufficient rows but conflicting compaction => prevent compaction // Sufficient rows but conflicting compaction => prevent compaction
TestPartition::new(vec![ TestPartition::new(vec![
TestChunk::new(ChunkId::new_test(7), 0, ChunkStorage::ClosedMutableBuffer) TestChunk::new(ChunkId::new_test(7), 0, ChunkStorage::ClosedMutableBuffer)
.with_min_timestamp(from_secs(10)) .with_min_timestamp(from_secs(10))
.with_action(ChunkLifecycleAction::Compacting), .with_action(
tasks
.complete(())
.with_metadata(ChunkLifecycleAction::Compacting),
),
// This chunk would be a compaction candidate, but we want to persist it // This chunk would be a compaction candidate, but we want to persist it
TestChunk::new(ChunkId::new_test(8), 0, ChunkStorage::ClosedMutableBuffer) TestChunk::new(ChunkId::new_test(8), 0, ChunkStorage::ClosedMutableBuffer)
.with_min_timestamp(from_secs(10)), .with_min_timestamp(from_secs(10)),
TestChunk::new(ChunkId::new_test(9), 0, ChunkStorage::ReadBuffer) TestChunk::new(ChunkId::new_test(9), 0, ChunkStorage::ReadBuffer)
.with_min_timestamp(from_secs(5)), .with_min_timestamp(from_secs(5)),
]) ])
.with_persistence(1_000, time_now, Time::from_timestamp(20, 0)), .with_persistence(1_000, now, from_secs(20)),
// Sufficient rows and non-conflicting compaction => persist // Sufficient rows and non-conflicting compaction => persist
TestPartition::new(vec![ TestPartition::new(vec![
TestChunk::new(ChunkId::new_test(10), 0, ChunkStorage::ClosedMutableBuffer) TestChunk::new(ChunkId::new_test(10), 0, ChunkStorage::ClosedMutableBuffer)
.with_min_timestamp(from_secs(21)) .with_min_timestamp(from_secs(21))
.with_action(ChunkLifecycleAction::Compacting), .with_action(
tasks
.complete(())
.with_metadata(ChunkLifecycleAction::Compacting),
),
TestChunk::new(ChunkId::new_test(11), 0, ChunkStorage::ClosedMutableBuffer) TestChunk::new(ChunkId::new_test(11), 0, ChunkStorage::ClosedMutableBuffer)
.with_min_timestamp(from_secs(10)), .with_min_timestamp(from_secs(10)),
TestChunk::new(ChunkId::new_test(12), 0, ChunkStorage::ReadBuffer) TestChunk::new(ChunkId::new_test(12), 0, ChunkStorage::ReadBuffer)
.with_min_timestamp(from_secs(5)), .with_min_timestamp(from_secs(5)),
]) ])
.with_persistence(1_000, time_now, Time::from_timestamp(20, 0)), .with_persistence(1_000, now, from_secs(20)),
// Sufficient rows, non-conflicting compaction and compact-able chunk => persist + compact // Sufficient rows, non-conflicting compaction and compact-able chunk => persist + compact
TestPartition::new(vec![ TestPartition::new(vec![
TestChunk::new(ChunkId::new_test(13), 0, ChunkStorage::ClosedMutableBuffer) TestChunk::new(ChunkId::new_test(13), 0, ChunkStorage::ClosedMutableBuffer)
.with_min_timestamp(from_secs(21)) .with_min_timestamp(from_secs(21))
.with_action(ChunkLifecycleAction::Compacting), .with_action(
tasks
.complete(())
.with_metadata(ChunkLifecycleAction::Compacting),
),
TestChunk::new(ChunkId::new_test(14), 0, ChunkStorage::ClosedMutableBuffer) TestChunk::new(ChunkId::new_test(14), 0, ChunkStorage::ClosedMutableBuffer)
.with_min_timestamp(from_secs(21)) .with_min_timestamp(from_secs(21))
.with_order(ChunkOrder::new(10).unwrap()), .with_order(ChunkOrder::new(10).unwrap()),
@ -1768,7 +1811,7 @@ mod tests {
TestChunk::new(ChunkId::new_test(16), 0, ChunkStorage::ReadBuffer) TestChunk::new(ChunkId::new_test(16), 0, ChunkStorage::ReadBuffer)
.with_min_timestamp(from_secs(5)), .with_min_timestamp(from_secs(5)),
]) ])
.with_persistence(1_000, time_now, Time::from_timestamp(20, 0)), .with_persistence(1_000, now, from_secs(20)),
// Checks that we include chunks in a closed "order"-based interval. // Checks that we include chunks in a closed "order"-based interval.
// Note that the chunks here are ordered in reverse to check if the lifecycle policy really uses the chunk // Note that the chunks here are ordered in reverse to check if the lifecycle policy really uses the chunk
// order during iteration. // order during iteration.
@ -1789,13 +1832,13 @@ mod tests {
.with_min_timestamp(from_secs(25)) .with_min_timestamp(from_secs(25))
.with_order(ChunkOrder::new(1).unwrap()), .with_order(ChunkOrder::new(1).unwrap()),
]) ])
.with_persistence(1_000, time_now, Time::from_timestamp(20, 0)), .with_persistence(1_000, now, from_secs(20)),
]; ];
let db = TestDb::from_partitions(rules, partitions); let db = TestDb::new(rules, time_provider, partitions, tasks);
let mut lifecycle = LifecyclePolicy::new(&db); let mut lifecycle = LifecyclePolicy::new(&db);
lifecycle.check_for_work(from_secs(0)); lifecycle.check_for_work();
assert_eq!( assert_eq!(
*db.events.read(), *db.events.read(),
vec![ vec![
@ -1828,22 +1871,17 @@ mod tests {
persist_age_threshold_seconds: NonZeroU32::new(20).unwrap(), persist_age_threshold_seconds: NonZeroU32::new(20).unwrap(),
..Default::default() ..Default::default()
}; };
let now = Utc::now();
let time_now = Time::from_date_time(now);
// This could occur if the in-memory contents of a partition are deleted, and // This could occur if the in-memory contents of a partition are deleted, and
// compaction causes the chunks to be removed. In such a scenario the persistence // compaction causes the chunks to be removed. In such a scenario the persistence
// windows will still think there are rows to be persisted // windows will still think there are rows to be persisted
let partitions = vec![TestPartition::new(vec![]).with_persistence( let partitions =
10, vec![TestPartition::new(vec![]).with_persistence(10, from_secs(0), from_secs(20))];
time_now - Duration::from_secs(20),
Time::from_timestamp(20, 0),
)];
let db = TestDb::from_partitions(rules, partitions); let (db, _) = test_db_partitions(rules, partitions, from_secs(20));
let mut lifecycle = LifecyclePolicy::new(&db); let mut lifecycle = LifecyclePolicy::new(&db);
lifecycle.check_for_work(now); lifecycle.check_for_work();
assert_eq!(*db.events.read(), vec![MoverEvents::Persist(vec![]),]); assert_eq!(*db.events.read(), vec![MoverEvents::Persist(vec![]),]);
} }
@ -1857,8 +1895,7 @@ mod tests {
max_active_compactions: MaxActiveCompactions(NonZeroU32::new(10).unwrap()), max_active_compactions: MaxActiveCompactions(NonZeroU32::new(10).unwrap()),
..Default::default() ..Default::default()
}; };
let now = Utc::now(); let now = Time::from_timestamp_nanos(0);
let time_now = Time::from_date_time(now);
let partitions = vec![ let partitions = vec![
// Sufficient rows => could persist but should be suppressed // Sufficient rows => could persist but should be suppressed
@ -1868,13 +1905,13 @@ mod tests {
TestChunk::new(ChunkId::new_test(3), 0, ChunkStorage::ReadBuffer) TestChunk::new(ChunkId::new_test(3), 0, ChunkStorage::ReadBuffer)
.with_min_timestamp(from_secs(5)), .with_min_timestamp(from_secs(5)),
]) ])
.with_persistence(1_000, time_now, Time::from_timestamp(20, 0)), .with_persistence(1_000, now, from_secs(20)),
]; ];
let db = TestDb::from_partitions(rules, partitions); let (db, _) = test_db_partitions(rules, partitions, now);
let mut lifecycle = LifecyclePolicy::new_suppress_persistence(&db); let mut lifecycle = LifecyclePolicy::new_suppress_persistence(&db);
lifecycle.check_for_work(now); lifecycle.check_for_work();
assert_eq!( assert_eq!(
*db.events.read(), *db.events.read(),
vec![MoverEvents::Compact(vec![ vec![MoverEvents::Compact(vec![
@ -1883,7 +1920,7 @@ mod tests {
]),] ]),]
); );
lifecycle.check_for_work(now); lifecycle.check_for_work();
assert_eq!( assert_eq!(
*db.events.read(), *db.events.read(),
vec![MoverEvents::Compact(vec![ vec![MoverEvents::Compact(vec![
@ -1894,7 +1931,7 @@ mod tests {
lifecycle.unsuppress_persistence(); lifecycle.unsuppress_persistence();
lifecycle.check_for_work(now); lifecycle.check_for_work();
assert_eq!( assert_eq!(
*db.events.read(), *db.events.read(),
vec![ vec![
@ -1916,10 +1953,10 @@ mod tests {
ChunkStorage::OpenMutableBuffer, ChunkStorage::OpenMutableBuffer,
)]; )];
let db = TestDb::new(rules, chunks); let (db, _) = test_db_chunks(rules, chunks, from_secs(80));
let mut lifecycle = LifecyclePolicy::new(&db); let mut lifecycle = LifecyclePolicy::new(&db);
lifecycle.check_for_work(from_secs(80)); lifecycle.check_for_work();
assert_eq!( assert_eq!(
*db.events.read(), *db.events.read(),
vec![MoverEvents::Compact(vec![ChunkId::new_test(0)])] vec![MoverEvents::Compact(vec![ChunkId::new_test(0)])]
@ -1938,10 +1975,10 @@ mod tests {
ChunkStorage::ClosedMutableBuffer, ChunkStorage::ClosedMutableBuffer,
)]; )];
let db = TestDb::new(rules, chunks); let (db, _) = test_db_chunks(rules, chunks, from_secs(80));
let mut lifecycle = LifecyclePolicy::new(&db); let mut lifecycle = LifecyclePolicy::new(&db);
lifecycle.check_for_work(from_secs(80)); lifecycle.check_for_work();
assert_eq!( assert_eq!(
*db.events.read(), *db.events.read(),
vec![MoverEvents::Compact(vec![ChunkId::new_test(0)])] vec![MoverEvents::Compact(vec![ChunkId::new_test(0)])]
@ -1957,35 +1994,34 @@ mod tests {
ChunkStorage::ClosedMutableBuffer, ChunkStorage::ClosedMutableBuffer,
)]; )];
let db = TestDb::new(rules, chunks); let (db, time_provider) = test_db_chunks(rules, chunks, from_secs(0));
let mut lifecycle = LifecyclePolicy::new(&db); let mut lifecycle = LifecyclePolicy::new(&db);
let chunk = Arc::clone(&db.partitions.read()[0].read().chunks[&ChunkId::new_test(0)].1); let chunk = Arc::clone(&db.partitions.read()[0].read().chunks[&ChunkId::new_test(0)].1);
let r0 = TaskRegistration::default(); let (tracker, r0) = db.registry.lock().register(());
let tracker = TaskTracker::new(TaskId(0), &r0, ChunkLifecycleAction::Compacting); chunk.write().lifecycle_action =
chunk.write().lifecycle_action = Some(tracker.clone()); Some(tracker.with_metadata(ChunkLifecycleAction::Compacting));
// Shouldn't do anything // Shouldn't do anything
lifecycle.check_for_work(tracker.start_time()); lifecycle.check_for_work();
assert!(chunk.read().lifecycle_action().is_some()); assert!(chunk.read().lifecycle_action().is_some());
// Shouldn't do anything as job hasn't finished // Shouldn't do anything as job hasn't finished
lifecycle.check_for_work( time_provider.set(from_secs(0) + LIFECYCLE_ACTION_BACKOFF);
tracker.start_time() + chrono::Duration::seconds(LIFECYCLE_ACTION_BACKOFF_SECONDS), lifecycle.check_for_work();
);
assert!(chunk.read().lifecycle_action().is_some()); assert!(chunk.read().lifecycle_action().is_some());
// "Finish" job // "Finish" job
std::mem::drop(r0); std::mem::drop(r0);
// Shouldn't do anything as insufficient time passed // Shouldn't do anything as insufficient time passed
lifecycle.check_for_work(tracker.start_time()); time_provider.set(from_secs(0));
lifecycle.check_for_work();
assert!(chunk.read().lifecycle_action().is_some()); assert!(chunk.read().lifecycle_action().is_some());
// Should clear job // Should clear job
lifecycle.check_for_work( time_provider.set(from_secs(0) + LIFECYCLE_ACTION_BACKOFF);
tracker.start_time() + chrono::Duration::seconds(LIFECYCLE_ACTION_BACKOFF_SECONDS), lifecycle.check_for_work();
);
assert!(chunk.read().lifecycle_action().is_none()); assert!(chunk.read().lifecycle_action().is_none());
} }
} }

View File

@ -17,7 +17,6 @@ edition = "2018"
arrow = { version = "5.5", features = ["prettyprint"] } arrow = { version = "5.5", features = ["prettyprint"] }
arrow_util = { path = "../arrow_util" } arrow_util = { path = "../arrow_util" }
async-trait = "0.1" async-trait = "0.1"
chrono = "0.4"
data_types = { path = "../data_types" } data_types = { path = "../data_types" }
entry = { path = "../entry" } entry = { path = "../entry" }
hashbrown = "0.11" hashbrown = "0.11"

View File

@ -13,7 +13,6 @@ edition = "2018"
[dependencies] # In alphabetical order [dependencies] # In alphabetical order
arrow = { version = "5.5", features = ["prettyprint"] } arrow = { version = "5.5", features = ["prettyprint"] }
arrow_util = { path = "../arrow_util" } arrow_util = { path = "../arrow_util" }
chrono = "0.4"
croaring = "0.5" croaring = "0.5"
data_types = { path = "../data_types" } data_types = { path = "../data_types" }
datafusion = { path = "../datafusion" } datafusion = { path = "../datafusion" }

View File

@ -30,7 +30,10 @@ impl ApplicationState {
let metric_registry = Arc::new(metric::Registry::new()); let metric_registry = Arc::new(metric::Registry::new());
let time_provider: Arc<dyn TimeProvider> = Arc::new(time::SystemProvider::new()); let time_provider: Arc<dyn TimeProvider> = Arc::new(time::SystemProvider::new());
let job_registry = Arc::new(JobRegistry::new(Arc::clone(&metric_registry))); let job_registry = Arc::new(JobRegistry::new(
Arc::clone(&metric_registry),
Arc::clone(&time_provider),
));
let write_buffer_factory = let write_buffer_factory =
Arc::new(WriteBufferConfigFactory::new(Arc::clone(&time_provider))); Arc::new(WriteBufferConfigFactory::new(Arc::clone(&time_provider)));

View File

@ -826,7 +826,7 @@ impl Db {
.as_mut() .as_mut()
.expect("lifecycle policy should be initialized"); .expect("lifecycle policy should be initialized");
policy.check_for_work(self.time_provider.now().date_time()) policy.check_for_work()
}; };
fut.await fut.await
} }

View File

@ -939,7 +939,7 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn set_compacting_freezes_chunk() { async fn set_compacting_freezes_chunk() {
let mut chunk = make_open_chunk(); let mut chunk = make_open_chunk();
let registration = TaskRegistration::new(); let registration = TaskRegistration::new(Arc::clone(&chunk.time_provider));
assert!(matches!(chunk.stage, ChunkStage::Open { .. })); assert!(matches!(chunk.stage, ChunkStage::Open { .. }));
chunk.set_compacting(&registration).unwrap(); chunk.set_compacting(&registration).unwrap();
@ -956,7 +956,7 @@ mod tests {
assert_ne!(size_before, 0); assert_ne!(size_before, 0);
// start dropping it // start dropping it
let registration = TaskRegistration::new(); let registration = TaskRegistration::new(Arc::clone(&chunk.time_provider));
chunk.set_dropping(&registration).unwrap(); chunk.set_dropping(&registration).unwrap();
// size should now be reported as zero // size should now be reported as zero
@ -968,7 +968,7 @@ mod tests {
assert_eq!(memory_metrics.total(), size_before); assert_eq!(memory_metrics.total(), size_before);
// when the lifecycle action cannot be set (e.g. due to an action already in progress), do NOT zero out the size // when the lifecycle action cannot be set (e.g. due to an action already in progress), do NOT zero out the size
let registration = TaskRegistration::new(); let registration = TaskRegistration::new(Arc::clone(&chunk.time_provider));
chunk.set_compacting(&registration).unwrap(); chunk.set_compacting(&registration).unwrap();
let size_before = memory_metrics.total(); let size_before = memory_metrics.total();
chunk.set_dropping(&registration).unwrap_err(); chunk.set_dropping(&registration).unwrap_err();
@ -978,7 +978,7 @@ mod tests {
#[test] #[test]
fn test_lifecycle_action() { fn test_lifecycle_action() {
let mut chunk = make_open_chunk(); let mut chunk = make_open_chunk();
let registration = TaskRegistration::new(); let registration = TaskRegistration::new(Arc::clone(&chunk.time_provider));
// no action to begin with // no action to begin with
assert!(chunk.lifecycle_action().is_none()); assert!(chunk.lifecycle_action().is_none());
@ -1040,7 +1040,7 @@ mod tests {
#[test] #[test]
fn test_clear_lifecycle_action() { fn test_clear_lifecycle_action() {
let mut chunk = make_open_chunk(); let mut chunk = make_open_chunk();
let registration = TaskRegistration::new(); let registration = TaskRegistration::new(Arc::clone(&chunk.time_provider));
// clearing w/o any action in-progress works // clearing w/o any action in-progress works
chunk.clear_lifecycle_action().unwrap(); chunk.clear_lifecycle_action().unwrap();

View File

@ -1,16 +1,20 @@
use super::partition::Partition; use std::{ops::Deref, result::Result, sync::Arc};
use crate::db::catalog::metrics::TableMetrics;
use data_types::partition_metadata::{PartitionAddr, PartitionSummary};
use hashbrown::HashMap; use hashbrown::HashMap;
use data_types::partition_metadata::{PartitionAddr, PartitionSummary};
use schema::{ use schema::{
builder::SchemaBuilder, builder::SchemaBuilder,
merge::{Error as SchemaMergerError, SchemaMerger}, merge::{Error as SchemaMergerError, SchemaMerger},
Schema, Schema,
}; };
use std::{ops::Deref, result::Result, sync::Arc};
use time::TimeProvider; use time::TimeProvider;
use tracker::{RwLock, RwLockReadGuard, RwLockWriteGuard}; use tracker::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use crate::db::catalog::metrics::TableMetrics;
use super::partition::Partition;
/// A `Table` is a collection of `Partition` each of which is a collection of `Chunk` /// A `Table` is a collection of `Partition` each of which is a collection of `Chunk`
#[derive(Debug)] #[derive(Debug)]
pub struct Table { pub struct Table {

View File

@ -4,13 +4,12 @@ use crate::{
Db, Db,
}; };
use ::lifecycle::LifecycleDb; use ::lifecycle::LifecycleDb;
use chrono::{DateTime, TimeZone, Utc};
use data_types::{ use data_types::{
chunk_metadata::{ChunkAddr, ChunkId, ChunkLifecycleAction, ChunkOrder, ChunkStorage}, chunk_metadata::{ChunkAddr, ChunkId, ChunkLifecycleAction, ChunkOrder, ChunkStorage},
database_rules::LifecycleRules, database_rules::LifecycleRules,
error::ErrorLogger, error::ErrorLogger,
job::Job, job::Job,
partition_metadata::Statistics, partition_metadata::{PartitionAddr, Statistics},
DatabaseName, DatabaseName,
}; };
use datafusion::physical_plan::SendableRecordBatchStream; use datafusion::physical_plan::SendableRecordBatchStream;
@ -28,11 +27,10 @@ use std::{
fmt::Display, fmt::Display,
sync::{Arc, Weak}, sync::{Arc, Weak},
}; };
use time::Time; use time::{Time, TimeProvider};
use tracker::{RwLock, TaskTracker}; use tracker::{RwLock, TaskTracker};
pub(crate) use compact::compact_chunks; pub(crate) use compact::compact_chunks;
use data_types::partition_metadata::PartitionAddr;
pub(crate) use drop::{drop_chunk, drop_partition}; pub(crate) use drop::{drop_chunk, drop_partition};
pub(crate) use error::{Error, Result}; pub(crate) use error::{Error, Result};
pub(crate) use persist::persist_chunks; pub(crate) use persist::persist_chunks;
@ -46,6 +44,8 @@ mod unload;
mod write; mod write;
/// A newtype wrapper around `Weak<Db>` to workaround trait orphan rules /// A newtype wrapper around `Weak<Db>` to workaround trait orphan rules
///
/// TODO: Pull LifecyclePolicy out of Db to allow strong reference (#2242)
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct WeakDb(pub(super) Weak<Db>); pub struct WeakDb(pub(super) Weak<Db>);
@ -276,6 +276,16 @@ impl LifecycleDb for WeakDb {
.map(|db| db.rules.read().name.clone()) .map(|db| db.rules.read().name.clone())
.unwrap_or_else(|| "gone".to_string().try_into().unwrap()) .unwrap_or_else(|| "gone".to_string().try_into().unwrap())
} }
fn time_provider(&self) -> Arc<dyn TimeProvider> {
Arc::clone(
&self
.0
.upgrade()
.expect("database dropped without shutting down lifecycle")
.time_provider,
)
}
} }
impl LifecyclePartition for Partition { impl LifecyclePartition for Partition {
@ -311,7 +321,7 @@ impl LifecycleChunk for CatalogChunk {
.expect("failed to clear lifecycle action") .expect("failed to clear lifecycle action")
} }
fn min_timestamp(&self) -> DateTime<Utc> { fn min_timestamp(&self) -> Time {
let table_summary = self.table_summary(); let table_summary = self.table_summary();
let col = table_summary let col = table_summary
.columns .columns
@ -324,7 +334,7 @@ impl LifecycleChunk for CatalogChunk {
_ => panic!("unexpected time column type"), _ => panic!("unexpected time column type"),
}; };
Utc.timestamp_nanos(min) Time::from_timestamp_nanos(min)
} }
fn access_metrics(&self) -> AccessMetrics { fn access_metrics(&self) -> AccessMetrics {

View File

@ -7,6 +7,7 @@ use std::{
sync::Arc, sync::Arc,
time::Duration, time::Duration,
}; };
use time::TimeProvider;
use tracker::{TaskId, TaskRegistration, TaskRegistryWithHistory, TaskTracker, TrackedFutureExt}; use tracker::{TaskId, TaskRegistration, TaskRegistryWithHistory, TaskTracker, TrackedFutureExt};
const JOB_HISTORY_SIZE: usize = 1000; const JOB_HISTORY_SIZE: usize = 1000;
@ -24,10 +25,13 @@ pub struct JobRegistryInner {
} }
impl JobRegistry { impl JobRegistry {
pub fn new(metric_registry: Arc<metric::Registry>) -> Self { pub fn new(
metric_registry: Arc<metric::Registry>,
time_provider: Arc<dyn TimeProvider>,
) -> Self {
Self { Self {
inner: Mutex::new(JobRegistryInner { inner: Mutex::new(JobRegistryInner {
registry: TaskRegistryWithHistory::new(JOB_HISTORY_SIZE), registry: TaskRegistryWithHistory::new(time_provider, JOB_HISTORY_SIZE),
metrics: JobRegistryMetrics::new(metric_registry), metrics: JobRegistryMetrics::new(metric_registry),
}), }),
} }

View File

@ -124,6 +124,11 @@ impl TestDbBuilder {
}; };
} }
let jobs = Arc::new(JobRegistry::new(
Default::default(),
Arc::clone(&time_provider),
));
let database_to_commit = DatabaseToCommit { let database_to_commit = DatabaseToCommit {
rules: Arc::new(rules), rules: Arc::new(rules),
server_id, server_id,
@ -138,10 +143,7 @@ impl TestDbBuilder {
TestDb { TestDb {
metric_registry, metric_registry,
db: Db::new( db: Db::new(database_to_commit, jobs),
database_to_commit,
Arc::new(JobRegistry::new(Default::default())),
),
replay_plan: replay_plan.expect("did not skip replay"), replay_plan: replay_plan.expect("did not skip replay"),
} }
} }

View File

@ -82,6 +82,14 @@ impl Time {
Self(Utc.timestamp_millis(millis)) Self(Utc.timestamp_millis(millis))
} }
/// Makes a new `DateTime` from the number of non-leap milliseconds
/// since January 1, 1970 0:00:00 UTC (aka "UNIX timestamp").
///
/// Returns None if out of range
pub fn from_timestamp_millis_opt(millis: i64) -> Option<Self> {
Some(Self(Utc.timestamp_millis_opt(millis).single()?))
}
/// Makes a new `Time` from the number of non-leap seconds /// Makes a new `Time` from the number of non-leap seconds
/// since January 1, 1970 0:00:00 UTC (aka "UNIX timestamp") /// since January 1, 1970 0:00:00 UTC (aka "UNIX timestamp")
/// and the number of nanoseconds since the last whole non-leap second. /// and the number of nanoseconds since the last whole non-leap second.
@ -307,6 +315,8 @@ mod test {
assert!(chrono::Duration::from_std(duration).is_err()); assert!(chrono::Duration::from_std(duration).is_err());
assert!(time.checked_add(duration).is_none()); assert!(time.checked_add(duration).is_none());
assert!(time.checked_sub(duration).is_none()); assert!(time.checked_sub(duration).is_none());
assert!(Time::from_timestamp_millis_opt(i64::MAX).is_none())
} }
#[test] #[test]

View File

@ -7,7 +7,6 @@ description = "Utilities for tracking resource utilisation within IOx"
[dependencies] [dependencies]
chrono = "0.4"
futures = "0.3" futures = "0.3"
hashbrown = "0.11" hashbrown = "0.11"
lock_api = "0.4.4" lock_api = "0.4.4"
@ -15,6 +14,7 @@ metric = { path = "../metric" }
observability_deps = { path = "../observability_deps" } observability_deps = { path = "../observability_deps" }
parking_lot = "0.11.2" parking_lot = "0.11.2"
pin-project = "1.0" pin-project = "1.0"
time = { path = "../time" }
tokio = { version = "1.11", features = ["macros", "time"] } tokio = { version = "1.11", features = ["macros", "time"] }
tokio-util = { version = "0.6.3" } tokio-util = { version = "0.6.3" }

View File

@ -80,11 +80,11 @@
//! etc... between threads as any such functionality must perform the necessary //! etc... between threads as any such functionality must perform the necessary
//! synchronisation to be well-formed. //! synchronisation to be well-formed.
use chrono::{DateTime, Utc};
use std::sync::{ use std::sync::{
atomic::{AtomicUsize, Ordering}, atomic::{AtomicUsize, Ordering},
Arc, Arc,
}; };
use time::{Time, TimeProvider};
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
@ -100,7 +100,9 @@ mod registry;
/// The state shared between all sibling tasks /// The state shared between all sibling tasks
#[derive(Debug)] #[derive(Debug)]
struct TrackerState { struct TrackerState {
start_time: DateTime<Utc>, start_time: Time,
time_provider: Arc<dyn TimeProvider>,
cancel_token: CancellationToken, cancel_token: CancellationToken,
cpu_nanos: AtomicUsize, cpu_nanos: AtomicUsize,
wall_nanos: AtomicUsize, wall_nanos: AtomicUsize,
@ -346,12 +348,6 @@ where
} }
} }
/// Returns a complete task tracker
pub fn complete(metadata: T) -> Self {
let registration = TaskRegistration::new();
Self::new(TaskId(0), &registration, metadata)
}
/// Returns the ID of the Tracker - these are unique per TrackerRegistry /// Returns the ID of the Tracker - these are unique per TrackerRegistry
pub fn id(&self) -> TaskId { pub fn id(&self) -> TaskId {
self.id self.id
@ -384,7 +380,7 @@ where
} }
/// Returns the instant the tracker was created /// Returns the instant the tracker was created
pub fn start_time(&self) -> DateTime<Utc> { pub fn start_time(&self) -> Time {
self.state.start_time self.state.start_time
} }
@ -441,10 +437,11 @@ impl Clone for TaskRegistration {
} }
} }
impl Default for TaskRegistration { impl TaskRegistration {
fn default() -> Self { pub fn new(time_provider: Arc<dyn TimeProvider>) -> Self {
let state = Arc::new(TrackerState { let state = Arc::new(TrackerState {
start_time: Utc::now(), start_time: time_provider.now(),
time_provider,
cpu_nanos: AtomicUsize::new(0), cpu_nanos: AtomicUsize::new(0),
wall_nanos: AtomicUsize::new(0), wall_nanos: AtomicUsize::new(0),
cancel_token: CancellationToken::new(), cancel_token: CancellationToken::new(),
@ -459,12 +456,6 @@ impl Default for TaskRegistration {
Self { state } Self { state }
} }
}
impl TaskRegistration {
pub fn new() -> Self {
Self::default()
}
/// Converts the registration into a tracker with id 0 and specified metadata /// Converts the registration into a tracker with id 0 and specified metadata
pub fn into_tracker<T>(self, metadata: T) -> TaskTracker<T> pub fn into_tracker<T>(self, metadata: T) -> TaskTracker<T>
@ -515,10 +506,14 @@ mod tests {
futures::future::ready(Ok(())) futures::future::ready(Ok(()))
} }
fn test_registry<T: Send + Sync>() -> TaskRegistry<T> {
TaskRegistry::new(Arc::new(time::SystemProvider::new()))
}
#[tokio::test] #[tokio::test]
async fn test_lifecycle() { async fn test_lifecycle() {
let (sender, receive) = oneshot::channel(); let (sender, receive) = oneshot::channel();
let mut registry = TaskRegistry::new(); let mut registry = test_registry();
let (tracker, registration) = registry.register(()); let (tracker, registration) = registry.register(());
tokio::spawn(receive.track(registration)); tokio::spawn(receive.track(registration));
@ -535,7 +530,7 @@ mod tests {
async fn test_interleaved() { async fn test_interleaved() {
let (sender1, receive1) = oneshot::channel(); let (sender1, receive1) = oneshot::channel();
let (sender2, receive2) = oneshot::channel(); let (sender2, receive2) = oneshot::channel();
let mut registry = TaskRegistry::new(); let mut registry = test_registry();
let (t1, registration1) = registry.register(1); let (t1, registration1) = registry.register(1);
let (t2, registration2) = registry.register(2); let (t2, registration2) = registry.register(2);
@ -559,7 +554,7 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn test_drop() { async fn test_drop() {
let mut registry = TaskRegistry::new(); let mut registry = test_registry();
let (_, registration) = registry.register(()); let (_, registration) = registry.register(());
{ {
@ -575,7 +570,7 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn test_drop_multiple() { async fn test_drop_multiple() {
let mut registry = TaskRegistry::new(); let mut registry = test_registry();
let (_, registration) = registry.register(()); let (_, registration) = registry.register(());
{ {
@ -594,7 +589,7 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn test_terminate() { async fn test_terminate() {
let mut registry = TaskRegistry::new(); let mut registry = test_registry();
let (_, registration) = registry.register(()); let (_, registration) = registry.register(());
let task = tokio::spawn(pending().track(registration)); let task = tokio::spawn(pending().track(registration));
@ -611,7 +606,7 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn test_terminate_early() { async fn test_terminate_early() {
let mut registry = TaskRegistry::new(); let mut registry = test_registry();
let (tracker, registration) = registry.register(()); let (tracker, registration) = registry.register(());
tracker.cancel(); tracker.cancel();
@ -624,7 +619,7 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn test_terminate_multiple() { async fn test_terminate_multiple() {
let mut registry = TaskRegistry::new(); let mut registry = test_registry();
let (_, registration) = registry.register(()); let (_, registration) = registry.register(());
let task1 = tokio::spawn(pending().track(registration.clone())); let task1 = tokio::spawn(pending().track(registration.clone()));
@ -645,7 +640,7 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn test_reclaim() { async fn test_reclaim() {
let mut registry = TaskRegistry::new(); let mut registry = test_registry();
let (_, registration1) = registry.register(1); let (_, registration1) = registry.register(1);
let (_, registration2) = registry.register(2); let (_, registration2) = registry.register(2);
@ -744,7 +739,7 @@ mod tests {
// to prevent stalling the tokio executor // to prevent stalling the tokio executor
#[tokio::test(flavor = "multi_thread", worker_threads = 4)] #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_timing() { async fn test_timing() {
let mut registry = TaskRegistry::new(); let mut registry = test_registry();
let (tracker1, registration1) = registry.register(1); let (tracker1, registration1) = registry.register(1);
let (tracker2, registration2) = registry.register(2); let (tracker2, registration2) = registry.register(2);
let (tracker3, registration3) = registry.register(3); let (tracker3, registration3) = registry.register(3);
@ -819,7 +814,7 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn test_register_race() { async fn test_register_race() {
let mut registry = TaskRegistry::new(); let mut registry = test_registry();
let (_, registration) = registry.register(()); let (_, registration) = registry.register(());
let task1 = tokio::spawn(ready_ok().track(registration.clone())); let task1 = tokio::spawn(ready_ok().track(registration.clone()));
@ -842,7 +837,7 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn test_failure() { async fn test_failure() {
let mut registry = TaskRegistry::new(); let mut registry = test_registry();
let zero_clocks = |mut status: TaskStatus| { let zero_clocks = |mut status: TaskStatus| {
match &mut status { match &mut status {
TaskStatus::Creating => {} TaskStatus::Creating => {}
@ -970,7 +965,7 @@ mod tests {
use std::future::Future; use std::future::Future;
use std::task::Poll; use std::task::Poll;
let mut registry = TaskRegistry::new(); let mut registry = test_registry();
let (tracker, registration) = registry.register(()); let (tracker, registration) = registry.register(());
let (s1, r1) = oneshot::channel(); let (s1, r1) = oneshot::channel();
@ -1022,7 +1017,7 @@ mod tests {
use std::future::Future; use std::future::Future;
use std::task::Poll; use std::task::Poll;
let mut registry = TaskRegistry::new(); let mut registry = test_registry();
let (tracker, registration) = registry.register(()); let (tracker, registration) = registry.register(());
// This executor goop is necessary to get a future into // This executor goop is necessary to get a future into

View File

@ -93,16 +93,13 @@ impl<F: TryFuture> Future for TrackedFuture<F> {
#[pinned_drop] #[pinned_drop]
impl<F: TryFuture> PinnedDrop for TrackedFuture<F> { impl<F: TryFuture> PinnedDrop for TrackedFuture<F> {
fn drop(self: Pin<&mut Self>) { fn drop(self: Pin<&mut Self>) {
use std::convert::TryInto;
let state: &TrackerState = self.project().tracker; let state: &TrackerState = self.project().tracker;
let elapsed: i64 = chrono::Utc::now() let now = state.time_provider.now();
.signed_duration_since(state.start_time) let wall_nanos = now
.num_nanoseconds() .checked_duration_since(state.start_time)
.unwrap(); .unwrap_or_default()
.as_nanos() as usize;
let wall_nanos: usize = elapsed.try_into().unwrap_or_default();
state.wall_nanos.fetch_max(wall_nanos, Ordering::Relaxed); state.wall_nanos.fetch_max(wall_nanos, Ordering::Relaxed);

View File

@ -3,6 +3,8 @@ use hashbrown::hash_map::Entry;
use hashbrown::HashMap; use hashbrown::HashMap;
use observability_deps::tracing::info; use observability_deps::tracing::info;
use std::hash::Hash; use std::hash::Hash;
use std::sync::Arc;
use time::TimeProvider;
/// A wrapper around a TaskRegistry that automatically retains a history /// A wrapper around a TaskRegistry that automatically retains a history
#[derive(Debug)] #[derive(Debug)]
@ -18,10 +20,10 @@ impl<T: std::fmt::Debug> TaskRegistryWithHistory<T>
where where
T: Send + Sync, T: Send + Sync,
{ {
pub fn new(capacity: usize) -> Self { pub fn new(time_provider: Arc<dyn TimeProvider>, capacity: usize) -> Self {
Self { Self {
history: SizeLimitedHashMap::new(capacity), history: SizeLimitedHashMap::new(capacity),
registry: TaskRegistry::new(), registry: TaskRegistry::new(time_provider),
} }
} }
@ -192,7 +194,8 @@ mod tests {
assert_eq!(&collected, expected_ids); assert_eq!(&collected, expected_ids);
}; };
let mut archive = TaskRegistryWithHistory::new(4); let time_provider = Arc::new(time::SystemProvider::new());
let mut archive = TaskRegistryWithHistory::new(time_provider, 4);
for i in 0..=3 { for i in 0..=3 {
archive.register(i); archive.register(i);

View File

@ -1,6 +1,11 @@
use super::{TaskRegistration, TaskTracker};
use hashbrown::HashMap;
use std::str::FromStr; use std::str::FromStr;
use std::sync::Arc;
use hashbrown::HashMap;
use time::TimeProvider;
use super::{TaskRegistration, TaskTracker};
/// Every future registered with a `TaskRegistry` is assigned a unique /// Every future registered with a `TaskRegistry` is assigned a unique
/// `TaskId` /// `TaskId`
@ -32,26 +37,19 @@ where
{ {
next_id: usize, next_id: usize,
tasks: HashMap<TaskId, TaskTracker<T>>, tasks: HashMap<TaskId, TaskTracker<T>>,
} time_provider: Arc<dyn TimeProvider>,
impl<T> Default for TaskRegistry<T>
where
T: Send + Sync,
{
fn default() -> Self {
Self {
next_id: 0,
tasks: Default::default(),
}
}
} }
impl<T> TaskRegistry<T> impl<T> TaskRegistry<T>
where where
T: Send + Sync, T: Send + Sync,
{ {
pub fn new() -> Self { pub fn new(time_provider: Arc<dyn TimeProvider>) -> Self {
Default::default() Self {
next_id: 0,
tasks: Default::default(),
time_provider,
}
} }
/// Register a new tracker in the registry /// Register a new tracker in the registry
@ -59,7 +57,7 @@ where
let id = TaskId(self.next_id); let id = TaskId(self.next_id);
self.next_id += 1; self.next_id += 1;
let registration = TaskRegistration::new(); let registration = TaskRegistration::new(Arc::clone(&self.time_provider));
let tracker = TaskTracker::new(id, &registration, metadata); let tracker = TaskTracker::new(id, &registration, metadata);
self.tasks.insert(id, tracker.clone()); self.tasks.insert(id, tracker.clone());
@ -67,6 +65,11 @@ where
(tracker, registration) (tracker, registration)
} }
/// Returns a complete tracker
pub fn complete(&mut self, metadata: T) -> TaskTracker<T> {
self.register(metadata).0
}
/// Removes completed tasks from the registry and returns an iterator of /// Removes completed tasks from the registry and returns an iterator of
/// those removed /// those removed
pub fn reclaim(&mut self) -> impl Iterator<Item = TaskTracker<T>> + '_ { pub fn reclaim(&mut self) -> impl Iterator<Item = TaskTracker<T>> + '_ {

View File

@ -5,7 +5,6 @@ edition = "2018"
[dependencies] [dependencies]
async-trait = "0.1" async-trait = "0.1"
chrono = "0.4"
data_types = { path = "../data_types" } data_types = { path = "../data_types" }
dotenv = "0.15.0" dotenv = "0.15.0"
entry = { path = "../entry" } entry = { path = "../entry" }

View File

@ -93,7 +93,6 @@ pub mod test_utils {
}; };
use async_trait::async_trait; use async_trait::async_trait;
use chrono::{TimeZone, Utc};
use entry::{test_helpers::lp_to_entry, Entry}; use entry::{test_helpers::lp_to_entry, Entry};
use futures::{StreamExt, TryStreamExt}; use futures::{StreamExt, TryStreamExt};
use time::{Time, TimeProvider}; use time::{Time, TimeProvider};
@ -517,7 +516,7 @@ pub mod test_utils {
T: TestAdapter, T: TestAdapter,
{ {
// Note: Roundtrips are only guaranteed for millisecond-precision // Note: Roundtrips are only guaranteed for millisecond-precision
let t0 = Time::from_date_time(Utc.timestamp_millis(129)); let t0 = Time::from_timestamp_millis(129);
let time = Arc::new(time::MockProvider::new(t0)); let time = Arc::new(time::MockProvider::new(t0));
let context = adapter let context = adapter
.new_context_with_time( .new_context_with_time(

View File

@ -7,7 +7,6 @@ use std::{
}; };
use async_trait::async_trait; use async_trait::async_trait;
use chrono::{TimeZone, Utc};
use data_types::{ use data_types::{
database_rules::WriteBufferCreationConfig, sequence::Sequence, server_id::ServerId, database_rules::WriteBufferCreationConfig, sequence::Sequence, server_id::ServerId,
}; };
@ -269,7 +268,8 @@ impl WriteBufferReading for KafkaBufferConsumer {
let timestamp_millis = message.timestamp().to_millis().ok_or_else::<WriteBufferError, _>(|| { let timestamp_millis = message.timestamp().to_millis().ok_or_else::<WriteBufferError, _>(|| {
"The connected Kafka does not seem to support message timestamps (KIP-32). Please upgrade to >= 0.10.0.0".to_string().into() "The connected Kafka does not seem to support message timestamps (KIP-32). Please upgrade to >= 0.10.0.0".to_string().into()
})?; })?;
let timestamp = Utc.timestamp_millis_opt(timestamp_millis).single().ok_or_else::<WriteBufferError, _>(|| {
let timestamp = Time::from_timestamp_millis_opt(timestamp_millis).ok_or_else::<WriteBufferError, _>(|| {
format!("Cannot parse timestamp for milliseconds: {}", timestamp_millis).into() format!("Cannot parse timestamp for milliseconds: {}", timestamp_millis).into()
})?; })?;
@ -278,7 +278,7 @@ impl WriteBufferReading for KafkaBufferConsumer {
number: message.offset().try_into()?, number: message.offset().try_into()?,
}; };
Ok(SequencedEntry::new_from_sequence(sequence, Time::from_date_time(timestamp), entry)) Ok(SequencedEntry::new_from_sequence(sequence, timestamp, entry))
}) })
.boxed(); .boxed();