diff --git a/Cargo.lock b/Cargo.lock index 7d0107e218..6c26dabe46 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -809,7 +809,6 @@ dependencies = [ "influxdb_line_protocol", "num_cpus", "observability_deps", - "once_cell", "percent-encoding", "regex", "serde", @@ -1797,10 +1796,12 @@ version = "0.1.0" dependencies = [ "arrow", "arrow_util", + "chrono", "futures", "hashbrown 0.11.2", "indexmap", "itertools", + "parking_lot", "snafu", "tokio", ] @@ -4628,6 +4629,7 @@ dependencies = [ name = "tracker" version = "0.1.0" dependencies = [ + "chrono", "futures", "hashbrown 0.11.2", "lock_api", diff --git a/data_types/Cargo.toml b/data_types/Cargo.toml index f721424708..812a437e81 100644 --- a/data_types/Cargo.toml +++ b/data_types/Cargo.toml @@ -11,7 +11,6 @@ chrono = { version = "0.4", features = ["serde"] } influxdb_line_protocol = { path = "../influxdb_line_protocol" } num_cpus = "1.13.0" observability_deps = { path = "../observability_deps" } -once_cell = { version = "1.4.0", features = ["parking_lot"] } percent-encoding = "2.1.0" regex = "1.4" serde = { version = "1.0", features = ["rc", "derive"] } diff --git a/data_types/src/instant.rs b/data_types/src/instant.rs deleted file mode 100644 index 4f1569e303..0000000000 --- a/data_types/src/instant.rs +++ /dev/null @@ -1,69 +0,0 @@ -use chrono::{DateTime, Utc}; -use once_cell::sync::OnceCell; -use std::time::Instant; - -/// Stores an Instant and DateTime captured as close as possible together -static INSTANCE: OnceCell<(DateTime, Instant)> = OnceCell::new(); - -/// Provides a conversion from Instant to DateTime for display purposes -/// -/// It is an approximation as if the system clock changes, the returned DateTime will not be -/// the same as the DateTime that would have been recorded at the time the Instant was created. -/// -/// The conversion does, however, preserve the monotonic property of Instant, i.e. a larger -/// Instant will have a larger returned DateTime. -/// -/// This should ONLY be used for display purposes, the results should not be used to -/// drive logic, nor persisted -pub fn to_approximate_datetime(instant: Instant) -> DateTime { - let (ref_date, ref_instant) = *INSTANCE.get_or_init(|| (Utc::now(), Instant::now())); - - if ref_instant > instant { - ref_date - - chrono::Duration::from_std(ref_instant.duration_since(instant)) - .expect("date overflow") - } else { - ref_date - + chrono::Duration::from_std(instant.duration_since(ref_instant)) - .expect("date overflow") - } -} - -// *NOTE*: these tests currently fail on (at least) aarch64 architectures -// such as an Apple M1 machine. -// -// Possibly related to https://github.com/rust-lang/rust/issues/87906 but -// not clear at this point. -// -// Ignoring the tests here to get the suite green on aarch64. -#[cfg(not(target_arch = "aarch64"))] -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_to_datetime() { - // Seed global state - to_approximate_datetime(Instant::now()); - - let (ref_date, ref_instant) = *INSTANCE.get().unwrap(); - - assert_eq!( - to_approximate_datetime(ref_instant + std::time::Duration::from_nanos(78)), - ref_date + chrono::Duration::nanoseconds(78) - ); - - assert_eq!( - to_approximate_datetime(ref_instant - std::time::Duration::from_nanos(23)), - ref_date - chrono::Duration::nanoseconds(23) - ); - } - - #[test] - fn test_to_datetime_simple() { - let d = std::time::Duration::from_nanos(78); - let a = Instant::now(); - let b = a + d; - assert_eq!(b.duration_since(a), d); - } -} diff --git a/data_types/src/lib.rs b/data_types/src/lib.rs index f5858c9d56..beb03ce295 100644 --- a/data_types/src/lib.rs +++ b/data_types/src/lib.rs @@ -16,7 +16,6 @@ mod database_name; pub mod database_rules; pub mod detailed_database; pub mod error; -pub mod instant; pub mod job; pub mod names; pub mod partition_metadata; diff --git a/internal_types/Cargo.toml b/internal_types/Cargo.toml index 86e9e1a2db..5af229e986 100644 --- a/internal_types/Cargo.toml +++ b/internal_types/Cargo.toml @@ -8,9 +8,11 @@ readme = "README.md" [dependencies] arrow = { version = "5.5", features = ["prettyprint"] } +chrono = "0.4" hashbrown = "0.11" indexmap = "1.6" itertools = "0.10.1" +parking_lot = "0.11" snafu = "0.6" tokio = { version = "1.11", features = ["sync"] } diff --git a/internal_types/src/access.rs b/internal_types/src/access.rs index cb4744f6aa..dda62ff019 100644 --- a/internal_types/src/access.rs +++ b/internal_types/src/access.rs @@ -1,91 +1,75 @@ -use crate::atomic_instant::AtomicInstant; -use std::sync::atomic::{AtomicUsize, Ordering}; +use chrono::{DateTime, Utc}; +use parking_lot::RwLock; use std::sync::Arc; -use std::time::Instant; /// A struct that allows recording access by a query #[derive(Debug, Clone)] pub struct AccessRecorder { - state: Arc, + state: Arc>, } impl Default for AccessRecorder { fn default() -> Self { - Self::new(Instant::now()) + Self::new(Utc::now()) } } -#[derive(Debug)] -struct AccessRecorderInner { - count: AtomicUsize, - last_instant: AtomicInstant, -} - #[derive(Debug, Clone, Eq, PartialEq)] pub struct AccessMetrics { /// The number of accesses that have been recorded pub count: usize, - /// The instant of the last access or if none the - /// instant when the `AccessRecorder` was created - pub last_instant: Instant, + /// The time of the last access or if none the + /// time when the `AccessRecorder` was created + pub last_access: DateTime, } impl AccessMetrics { /// Returns the Instant of the last access if any - pub fn last_access(&self) -> Option { - (self.count > 0).then(|| self.last_instant) + pub fn last_access(&self) -> Option> { + (self.count > 0).then(|| self.last_access) } } impl AccessRecorder { - /// Creates a new AccessRecorder with the provided creation Instant - pub fn new(instant: Instant) -> Self { + /// Creates a new AccessRecorder with the provided creation DateTime + pub fn new(now: DateTime) -> Self { Self { - state: Arc::new(AccessRecorderInner { - count: AtomicUsize::new(0), - last_instant: AtomicInstant::new(instant), - }), + state: Arc::new(RwLock::new(AccessMetrics { + count: 0, + last_access: now, + })), } } - /// Records an access at the given instant - pub fn record_access(&self, instant: Instant) { - self.state - .last_instant - .fetch_max(instant, Ordering::Relaxed); - self.state.count.fetch_add(1, Ordering::Release); + /// Records an access at the given DateTime + pub fn record_access(&self, now: DateTime) { + let mut state = self.state.write(); + state.last_access = state.last_access.max(now); + state.count += 1; } - /// Records an access at the current instant + /// Records an access at the current time pub fn record_access_now(&self) { - self.record_access(Instant::now()) + self.record_access(Utc::now()) } /// Gets the access metrics pub fn get_metrics(&self) -> AccessMetrics { - // Acquire and Release ensures that if we observe the count from an access, - // we are guaranteed that the observed last_instant will be greater than - // or equal to the time of this access - let count = self.state.count.load(Ordering::Acquire); - let last_instant = self.state.last_instant.load(Ordering::Relaxed); - AccessMetrics { - count, - last_instant, - } + self.state.read().clone() } } #[cfg(test)] mod tests { use super::*; - use std::time::Duration; + use chrono::Duration; #[test] fn test_access() { - let t1 = Instant::now(); - let t2 = t1 + Duration::from_nanos(1); - let t3 = t1 + Duration::from_nanos(2); + let t1 = Utc::now(); + let t2 = t1 + Duration::nanoseconds(1); + let t3 = t1 + Duration::nanoseconds(2); let access_recorder = AccessRecorder::new(t1); @@ -93,7 +77,7 @@ mod tests { access_recorder.get_metrics(), AccessMetrics { count: 0, - last_instant: t1 + last_access: t1 } ); @@ -102,7 +86,7 @@ mod tests { access_recorder.get_metrics(), AccessMetrics { count: 1, - last_instant: t3 + last_access: t3 } ); @@ -111,7 +95,7 @@ mod tests { access_recorder.get_metrics(), AccessMetrics { count: 2, - last_instant: t3 + last_access: t3 } ); } diff --git a/internal_types/src/atomic_instant.rs b/internal_types/src/atomic_instant.rs deleted file mode 100644 index 4cf4e26d57..0000000000 --- a/internal_types/src/atomic_instant.rs +++ /dev/null @@ -1,113 +0,0 @@ -use std::sync::atomic::{AtomicI64, Ordering}; -use std::time::{Duration, Instant}; - -/// Provides the ability to perform atomic operations on an `Instant` -#[derive(Debug)] -pub struct AtomicInstant { - /// The start instant to measure relative to - start: Instant, - /// An offset in nanoseconds from the start instant - /// - /// We use an offset from `start` as an Instant is an opaque type that - /// cannot be mutated atomically - offset: AtomicI64, -} - -impl AtomicInstant { - /// Creates a new AtomicInstant with `Instant::now` as the current value - pub fn now() -> Self { - Self::new(Instant::now()) - } - - /// Creates a new AtomicInstant with the provided value - pub fn new(start: Instant) -> Self { - Self { - start, - offset: AtomicI64::new(0), - } - } - - fn offset(&self, instant: Instant) -> i64 { - use std::cmp::Ordering; - - match self.start.cmp(&instant) { - Ordering::Greater => -(self.start.duration_since(instant).as_nanos() as i64), - Ordering::Equal => 0, - Ordering::Less => instant.duration_since(self.start).as_nanos() as i64, - } - } - - fn instant(&self, offset: i64) -> Instant { - match offset > 0 { - true => self.start + Duration::from_nanos(offset as u64), - false => self.start - Duration::from_nanos((-offset) as u64), - } - } - - /// Gets the current Instant - pub fn load(&self, ordering: Ordering) -> Instant { - self.instant(self.offset.load(ordering)) - } - - /// Stores the given Instant - pub fn store(&self, instant: Instant, ordering: Ordering) { - self.offset.store(self.offset(instant), ordering); - } - - /// Sets the value to the maximum of the current value and the provided Instant - /// - /// Returns the previous value - pub fn fetch_max(&self, instant: Instant, ordering: Ordering) -> Instant { - let previous_offset = self.offset.fetch_max(self.offset(instant), ordering); - self.instant(previous_offset) - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_atomic_instant() { - let start = Instant::now(); - let instant = AtomicInstant::new(start); - - instant.store(start - Duration::from_secs(5), Ordering::Relaxed); - assert_eq!( - instant.load(Ordering::Relaxed), - start - Duration::from_secs(5) - ); - - instant.store(start - Duration::from_secs(1), Ordering::Relaxed); - assert_eq!( - instant.load(Ordering::Relaxed), - start - Duration::from_secs(1) - ); - - instant.store(start + Duration::from_secs(1), Ordering::Relaxed); - assert_eq!( - instant.load(Ordering::Relaxed), - start + Duration::from_secs(1) - ); - - instant.store(start + Duration::from_nanos(1), Ordering::Relaxed); - assert_eq!( - instant.load(Ordering::Relaxed), - start + Duration::from_nanos(1) - ); - - let ret = instant.fetch_max(start + Duration::from_secs(2), Ordering::Relaxed); - assert_eq!(ret, start + Duration::from_nanos(1)); - assert_eq!( - instant.load(Ordering::Relaxed), - start + Duration::from_secs(2) - ); - - let ret = instant.fetch_max(start + Duration::from_secs(1), Ordering::Relaxed); - assert_eq!(ret, start + Duration::from_secs(2)); - assert_eq!( - instant.load(Ordering::Relaxed), - start + Duration::from_secs(2) - ); - } -} diff --git a/internal_types/src/lib.rs b/internal_types/src/lib.rs index 39b2160687..39a6b91472 100644 --- a/internal_types/src/lib.rs +++ b/internal_types/src/lib.rs @@ -7,7 +7,6 @@ )] pub mod access; -pub mod atomic_instant; pub mod freezable; pub mod once; pub mod schema; diff --git a/lifecycle/src/policy.rs b/lifecycle/src/policy.rs index 3f8c248ce3..a8e7bbf078 100644 --- a/lifecycle/src/policy.rs +++ b/lifecycle/src/policy.rs @@ -11,15 +11,11 @@ use data_types::{ use futures::future::BoxFuture; use internal_types::access::AccessMetrics; use observability_deps::tracing::{debug, info, trace, warn}; -use std::{ - convert::TryInto, - fmt::Debug, - time::{Duration, Instant}, -}; +use std::{convert::TryInto, fmt::Debug}; use tracker::TaskTracker; -/// Number of seconds to wait before retying a failed lifecycle action -pub const LIFECYCLE_ACTION_BACKOFF: Duration = Duration::from_secs(10); +/// Number of seconds to wait before retrying a failed lifecycle action +pub const LIFECYCLE_ACTION_BACKOFF_SECONDS: i64 = 10; /// A `LifecyclePolicy` is created with a `LifecycleDb` /// @@ -459,7 +455,7 @@ where &mut self, db_name: &DatabaseName<'static>, partition: &P, - now: Instant, + now: DateTime, ) { let partition = partition.read(); for chunk in LockablePartition::chunks(&partition) { @@ -467,9 +463,9 @@ where if let Some(lifecycle_action) = chunk.lifecycle_action() { if lifecycle_action.is_complete() && now - .checked_duration_since(lifecycle_action.start_instant()) - .map(|x| x >= LIFECYCLE_ACTION_BACKOFF) - .unwrap_or(false) + .signed_duration_since(lifecycle_action.start_time()) + .num_seconds() + >= LIFECYCLE_ACTION_BACKOFF_SECONDS { info!(%db_name, chunk=%chunk.addr(), action=?lifecycle_action.metadata(), "clearing failed lifecycle action"); chunk.upgrade().clear_lifecycle_action(); @@ -481,11 +477,7 @@ where /// The core policy logic /// /// Returns a future that resolves when this method should be called next - pub fn check_for_work( - &mut self, - now: DateTime, - now_instant: Instant, - ) -> BoxFuture<'_, ()> { + pub fn check_for_work(&mut self, now: DateTime) -> BoxFuture<'_, ()> { // Any time-consuming work should be spawned as tokio tasks and not // run directly within this loop @@ -496,7 +488,7 @@ where let partitions = self.db.partitions(); for partition in &partitions { - self.maybe_cleanup_failed(&db_name, partition, now_instant); + self.maybe_cleanup_failed(&db_name, partition, now); // Persistence cannot split chunks if they are currently being compacted // @@ -652,8 +644,8 @@ fn sort_free_candidates

(candidates: &mut Vec>) { // Order candidates with the same FreeAction by last access time std::cmp::Ordering::Equal => a .access_metrics - .last_instant - .cmp(&b.access_metrics.last_instant), + .last_access + .cmp(&b.access_metrics.last_access), o => o, }) } @@ -668,6 +660,7 @@ mod tests { use chrono::TimeZone; use data_types::chunk_metadata::{ChunkAddr, ChunkId, ChunkOrder, ChunkStorage}; use data_types::database_rules::MaxActiveCompactions::MaxActiveCompactions; + use std::time::Duration; use std::{ cmp::max, collections::BTreeMap, @@ -738,7 +731,7 @@ mod tests { min_timestamp: None, access_metrics: AccessMetrics { count: 0, - last_instant: Instant::now(), + last_access: Utc::now(), }, time_of_last_write: from_secs(time_of_last_write), lifecycle_action: None, @@ -1146,10 +1139,10 @@ mod tests { #[test] fn test_sort_free_candidates() { - let instant = Instant::now(); - let access_metrics = |secs: u64| AccessMetrics { + let now = Utc::now(); + let access_metrics = |secs: i64| AccessMetrics { count: 1, - last_instant: instant + Duration::from_secs(secs), + last_access: now + chrono::Duration::seconds(secs), }; let mut candidates = vec![ @@ -1216,7 +1209,7 @@ mod tests { let db = TestDb::new(rules, chunks); let mut lifecycle = LifecyclePolicy::new(&db); - lifecycle.check_for_work(from_secs(40), Instant::now()); + lifecycle.check_for_work(from_secs(40)); assert_eq!(*db.events.read(), vec![]); } @@ -1236,11 +1229,11 @@ mod tests { let mut lifecycle = LifecyclePolicy::new(&db); let partition = Arc::clone(&db.partitions.read()[0]); - lifecycle.check_for_work(from_secs(9), Instant::now()); + lifecycle.check_for_work(from_secs(9)); assert_eq!(*db.events.read(), vec![]); - lifecycle.check_for_work(from_secs(11), Instant::now()); + lifecycle.check_for_work(from_secs(11)); let chunks = partition.read().chunks.keys().cloned().collect::>(); // expect chunk 2 to have been compacted into a new chunk 3 assert_eq!( @@ -1252,14 +1245,14 @@ mod tests { vec![ChunkId::new(0), ChunkId::new(1), ChunkId::new(3)] ); - lifecycle.check_for_work(from_secs(12), Instant::now()); + lifecycle.check_for_work(from_secs(12)); assert_eq!( *db.events.read(), vec![MoverEvents::Compact(vec![ChunkId::new(2)])] ); // Should compact everything possible - lifecycle.check_for_work(from_secs(20), Instant::now()); + lifecycle.check_for_work(from_secs(20)); assert_eq!( *db.events.read(), vec![ @@ -1291,12 +1284,12 @@ mod tests { // of check_for_work had started a background move task lifecycle.trackers.push(tracker); - let future = lifecycle.check_for_work(from_secs(0), Instant::now()); + let future = lifecycle.check_for_work(from_secs(0)); tokio::time::timeout(Duration::from_millis(1), future) .await .expect_err("expected timeout"); - let future = lifecycle.check_for_work(from_secs(0), Instant::now()); + let future = lifecycle.check_for_work(from_secs(0)); std::mem::drop(registration); tokio::time::timeout(Duration::from_millis(1), future) .await @@ -1325,10 +1318,10 @@ mod tests { let db = TestDb::new(rules.clone(), chunks); let mut lifecycle = LifecyclePolicy::new(&db); - lifecycle.check_for_work(from_secs(10), Instant::now()); + lifecycle.check_for_work(from_secs(10)); assert_eq!(*db.events.read(), vec![]); - let instant = Instant::now(); + let now = Utc::now(); let chunks = vec![ // two "open" chunks => they must not be dropped (yet) @@ -1343,13 +1336,13 @@ mod tests { TestChunk::new(ChunkId::new(4), 0, ChunkStorage::ReadBufferAndObjectStore) .with_access_metrics(AccessMetrics { count: 1, - last_instant: instant, + last_access: now, }), // "written" chunk => can be unloaded TestChunk::new(ChunkId::new(5), 0, ChunkStorage::ReadBufferAndObjectStore) .with_access_metrics(AccessMetrics { count: 12, - last_instant: instant - Duration::from_secs(1), + last_access: now - chrono::Duration::seconds(1), }), ]; @@ -1357,7 +1350,7 @@ mod tests { let mut lifecycle = LifecyclePolicy::new(&db); // Should unload chunk 5 first as access time is smaller - lifecycle.check_for_work(from_secs(10), Instant::now()); + lifecycle.check_for_work(from_secs(10)); assert_eq!( *db.events.read(), vec![ @@ -1387,7 +1380,7 @@ mod tests { let db = TestDb::new(rules.clone(), chunks); let mut lifecycle = LifecyclePolicy::new(&db); - lifecycle.check_for_work(from_secs(10), Instant::now()); + lifecycle.check_for_work(from_secs(10)); assert_eq!(*db.events.read(), vec![]); let chunks = vec![ @@ -1406,7 +1399,7 @@ mod tests { let db = TestDb::new(rules, chunks); let mut lifecycle = LifecyclePolicy::new(&db); - lifecycle.check_for_work(from_secs(10), Instant::now()); + lifecycle.check_for_work(from_secs(10)); assert_eq!( *db.events.read(), vec![MoverEvents::Unload(ChunkId::new(4))] @@ -1430,7 +1423,7 @@ mod tests { let db = TestDb::new(rules, chunks); let mut lifecycle = LifecyclePolicy::new(&db); - lifecycle.check_for_work(from_secs(10), Instant::now()); + lifecycle.check_for_work(from_secs(10)); assert_eq!(*db.events.read(), vec![]); } @@ -1530,7 +1523,7 @@ mod tests { let db = TestDb::from_partitions(rules, partitions); let mut lifecycle = LifecyclePolicy::new(&db); - lifecycle.check_for_work(now, Instant::now()); + lifecycle.check_for_work(now); assert_eq!( *db.events.read(), vec![ @@ -1544,7 +1537,7 @@ mod tests { ); db.events.write().clear(); - lifecycle.check_for_work(now, Instant::now()); + lifecycle.check_for_work(now); assert_eq!( *db.events.read(), vec![MoverEvents::Compact(vec![ @@ -1584,7 +1577,7 @@ mod tests { let db = TestDb::from_partitions(rules, partitions); let mut lifecycle = LifecyclePolicy::new(&db); - lifecycle.check_for_work(now, Instant::now()); + lifecycle.check_for_work(now); assert_eq!( *db.events.read(), vec![ @@ -1596,7 +1589,7 @@ mod tests { db.events.write().clear(); // Compaction slots freed up, other partition can now compact. - lifecycle.check_for_work(now, Instant::now()); + lifecycle.check_for_work(now); assert_eq!( *db.events.read(), vec![MoverEvents::Compact(vec![ChunkId::new(200)]),], @@ -1614,7 +1607,6 @@ mod tests { ..Default::default() }; let now = from_secs(0); - let now_instant = Instant::now(); let partitions = vec![ // Insufficient rows and not old enough => don't persist but can compact @@ -1685,7 +1677,7 @@ mod tests { let db = TestDb::from_partitions(rules, partitions); let mut lifecycle = LifecyclePolicy::new(&db); - lifecycle.check_for_work(from_secs(0), now_instant); + lifecycle.check_for_work(from_secs(0)); assert_eq!( *db.events.read(), vec![ @@ -1710,7 +1702,6 @@ mod tests { ..Default::default() }; let now = Utc::now(); - let now_instant = Instant::now(); // This could occur if the in-memory contents of a partition are deleted, and // compaction causes the chunks to be removed. In such a scenario the persistence @@ -1724,7 +1715,7 @@ mod tests { let db = TestDb::from_partitions(rules, partitions); let mut lifecycle = LifecyclePolicy::new(&db); - lifecycle.check_for_work(now, now_instant); + lifecycle.check_for_work(now); assert_eq!(*db.events.read(), vec![MoverEvents::Persist(vec![]),]); } @@ -1739,7 +1730,6 @@ mod tests { ..Default::default() }; let now = Utc::now(); - let now_instant = Instant::now(); let partitions = vec![ // Sufficient rows => could persist but should be suppressed @@ -1755,13 +1745,13 @@ mod tests { let db = TestDb::from_partitions(rules, partitions); let mut lifecycle = LifecyclePolicy::new_suppress_persistence(&db); - lifecycle.check_for_work(now, now_instant); + lifecycle.check_for_work(now); assert_eq!( *db.events.read(), vec![MoverEvents::Compact(vec![ChunkId::new(2), ChunkId::new(3)]),] ); - lifecycle.check_for_work(now, now_instant); + lifecycle.check_for_work(now); assert_eq!( *db.events.read(), vec![MoverEvents::Compact(vec![ChunkId::new(2), ChunkId::new(3)]),] @@ -1769,7 +1759,7 @@ mod tests { lifecycle.unsuppress_persistence(); - lifecycle.check_for_work(now, now_instant); + lifecycle.check_for_work(now); assert_eq!( *db.events.read(), vec![ @@ -1794,7 +1784,7 @@ mod tests { let db = TestDb::new(rules, chunks); let mut lifecycle = LifecyclePolicy::new(&db); - lifecycle.check_for_work(from_secs(80), Instant::now()); + lifecycle.check_for_work(from_secs(80)); assert_eq!( *db.events.read(), vec![MoverEvents::Compact(vec![ChunkId::new(0)])] @@ -1816,7 +1806,7 @@ mod tests { let db = TestDb::new(rules, chunks); let mut lifecycle = LifecyclePolicy::new(&db); - lifecycle.check_for_work(from_secs(80), Instant::now()); + lifecycle.check_for_work(from_secs(80)); assert_eq!( *db.events.read(), vec![MoverEvents::Compact(vec![ChunkId::new(0)])] @@ -1841,13 +1831,12 @@ mod tests { chunk.write().lifecycle_action = Some(tracker.clone()); // Shouldn't do anything - lifecycle.check_for_work(from_secs(0), tracker.start_instant()); + lifecycle.check_for_work(tracker.start_time()); assert!(chunk.read().lifecycle_action().is_some()); // Shouldn't do anything as job hasn't finished lifecycle.check_for_work( - from_secs(0), - tracker.start_instant() + LIFECYCLE_ACTION_BACKOFF, + tracker.start_time() + chrono::Duration::seconds(LIFECYCLE_ACTION_BACKOFF_SECONDS), ); assert!(chunk.read().lifecycle_action().is_some()); @@ -1855,13 +1844,12 @@ mod tests { std::mem::drop(r0); // Shouldn't do anything as insufficient time passed - lifecycle.check_for_work(from_secs(0), tracker.start_instant()); + lifecycle.check_for_work(tracker.start_time()); assert!(chunk.read().lifecycle_action().is_some()); // Should clear job lifecycle.check_for_work( - from_secs(0), - tracker.start_instant() + LIFECYCLE_ACTION_BACKOFF, + tracker.start_time() + chrono::Duration::seconds(LIFECYCLE_ACTION_BACKOFF_SECONDS), ); assert!(chunk.read().lifecycle_action().is_none()); } diff --git a/server/src/db.rs b/server/src/db.rs index d385e2cebf..8ad9692d67 100644 --- a/server/src/db.rs +++ b/server/src/db.rs @@ -9,7 +9,7 @@ use std::{ atomic::{AtomicUsize, Ordering}, Arc, }, - time::{Duration, Instant}, + time::Duration, }; use async_trait::async_trait; @@ -831,7 +831,7 @@ impl Db { .as_mut() .expect("lifecycle policy should be initialized"); - policy.check_for_work(self.utc_now(), Instant::now()).await + policy.check_for_work(self.utc_now()).await } }; diff --git a/server/src/db/catalog/chunk.rs b/server/src/db/catalog/chunk.rs index c819ae46e6..0820fafbc3 100644 --- a/server/src/db/catalog/chunk.rs +++ b/server/src/db/catalog/chunk.rs @@ -8,7 +8,6 @@ use data_types::{ ChunkAddr, ChunkColumnSummary, ChunkId, ChunkLifecycleAction, ChunkOrder, ChunkStorage, ChunkSummary, DetailedChunkSummary, }, - instant::to_approximate_datetime, partition_metadata::TableSummary, write_summary::TimestampSummary, }; @@ -576,11 +575,7 @@ impl CatalogChunk { .as_ref() .map(|tracker| *tracker.metadata()); - let time_of_last_access = self - .access_recorder - .get_metrics() - .last_access() - .map(to_approximate_datetime); + let time_of_last_access = self.access_recorder.get_metrics().last_access(); ChunkSummary { partition_key: Arc::clone(&self.addr.partition_key), diff --git a/server/src/db/chunk.rs b/server/src/db/chunk.rs index f4700fb4b0..6daeca7b59 100644 --- a/server/src/db/chunk.rs +++ b/server/src/db/chunk.rs @@ -596,13 +596,13 @@ mod tests { // Query should count as an access assert_eq!(t2.count + 1, t3.count); - assert!(t2.last_instant < t3.last_instant); + assert!(t2.last_access < t3.last_access); // If column names successful should record access match column_names { true => { assert_eq!(t3.count + 1, t4.count); - assert!(t3.last_instant < t4.last_instant); + assert!(t3.last_access < t4.last_access); } false => { assert_eq!(t3, t4); @@ -613,7 +613,7 @@ mod tests { match column_values { true => { assert_eq!(t4.count + 1, t5.count); - assert!(t4.last_instant < t5.last_instant); + assert!(t4.last_access < t5.last_access); } false => { assert_eq!(t4, t5); diff --git a/server/src/db/system_tables/operations.rs b/server/src/db/system_tables/operations.rs index 71eee011f6..7ca8c36f36 100644 --- a/server/src/db/system_tables/operations.rs +++ b/server/src/db/system_tables/operations.rs @@ -4,7 +4,6 @@ use arrow::array::{ArrayRef, StringArray, Time64NanosecondArray, TimestampNanose use arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit}; use arrow::error::Result; use arrow::record_batch::RecordBatch; -use data_types::instant::to_approximate_datetime; use itertools::Itertools; use data_types::error::ErrorLogger; @@ -93,7 +92,7 @@ fn from_task_trackers( .collect::(); let start_time = jobs .iter() - .map(|job| Some(to_approximate_datetime(job.start_instant()).timestamp_nanos())) + .map(|job| Some(job.start_time().timestamp_nanos())) .collect::(); let cpu_time_used = jobs .iter() diff --git a/tracker/Cargo.toml b/tracker/Cargo.toml index afbc6298fb..85198b95cd 100644 --- a/tracker/Cargo.toml +++ b/tracker/Cargo.toml @@ -7,6 +7,7 @@ description = "Utilities for tracking resource utilisation within IOx" [dependencies] +chrono = "0.4" futures = "0.3" hashbrown = "0.11" lock_api = "0.4.4" diff --git a/tracker/src/task.rs b/tracker/src/task.rs index 29a0ebd160..805ac59c2d 100644 --- a/tracker/src/task.rs +++ b/tracker/src/task.rs @@ -80,12 +80,10 @@ //! etc... between threads as any such functionality must perform the necessary //! synchronisation to be well-formed. -use std::{ - sync::{ - atomic::{AtomicUsize, Ordering}, - Arc, - }, - time::Instant, +use chrono::{DateTime, Utc}; +use std::sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, }; use tokio_util::sync::CancellationToken; @@ -102,7 +100,7 @@ mod registry; /// The state shared between all sibling tasks #[derive(Debug)] struct TrackerState { - start_instant: Instant, + start_time: DateTime, cancel_token: CancellationToken, cpu_nanos: AtomicUsize, wall_nanos: AtomicUsize, @@ -376,8 +374,8 @@ where } /// Returns the instant the tracker was created - pub fn start_instant(&self) -> Instant { - self.state.start_instant + pub fn start_time(&self) -> DateTime { + self.state.start_time } /// Returns if this tracker has been cancelled @@ -433,7 +431,7 @@ impl Clone for TaskRegistration { impl Default for TaskRegistration { fn default() -> Self { let state = Arc::new(TrackerState { - start_instant: Instant::now(), + start_time: Utc::now(), cpu_nanos: AtomicUsize::new(0), wall_nanos: AtomicUsize::new(0), cancel_token: CancellationToken::new(), diff --git a/tracker/src/task/future.rs b/tracker/src/task/future.rs index 05f7f3d779..f564b2322f 100644 --- a/tracker/src/task/future.rs +++ b/tracker/src/task/future.rs @@ -93,9 +93,16 @@ impl Future for TrackedFuture { #[pinned_drop] impl PinnedDrop for TrackedFuture { fn drop(self: Pin<&mut Self>) { + use std::convert::TryInto; + let state: &TrackerState = self.project().tracker; - let wall_nanos = state.start_instant.elapsed().as_nanos() as usize; + let elapsed: i64 = chrono::Utc::now() + .signed_duration_since(state.start_time) + .num_nanoseconds() + .unwrap(); + + let wall_nanos: usize = elapsed.try_into().unwrap_or_default(); state.wall_nanos.fetch_max(wall_nanos, Ordering::Relaxed);