feat: remove remaining usages of Instant (#2722) (#2749)

* feat: remove remaining usages of Instant (#2722)

* chore: review feedback

Co-authored-by: Andrew Lamb <alamb@influxdata.com>

Co-authored-by: Andrew Lamb <alamb@influxdata.com>
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Raphael Taylor-Davies 2021-10-06 17:44:02 +01:00 committed by GitHub
parent 772c7621db
commit 39157828b1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 105 additions and 314 deletions

4
Cargo.lock generated
View File

@ -809,7 +809,6 @@ dependencies = [
"influxdb_line_protocol",
"num_cpus",
"observability_deps",
"once_cell",
"percent-encoding",
"regex",
"serde",
@ -1797,10 +1796,12 @@ version = "0.1.0"
dependencies = [
"arrow",
"arrow_util",
"chrono",
"futures",
"hashbrown 0.11.2",
"indexmap",
"itertools",
"parking_lot",
"snafu",
"tokio",
]
@ -4628,6 +4629,7 @@ dependencies = [
name = "tracker"
version = "0.1.0"
dependencies = [
"chrono",
"futures",
"hashbrown 0.11.2",
"lock_api",

View File

@ -11,7 +11,6 @@ chrono = { version = "0.4", features = ["serde"] }
influxdb_line_protocol = { path = "../influxdb_line_protocol" }
num_cpus = "1.13.0"
observability_deps = { path = "../observability_deps" }
once_cell = { version = "1.4.0", features = ["parking_lot"] }
percent-encoding = "2.1.0"
regex = "1.4"
serde = { version = "1.0", features = ["rc", "derive"] }

View File

@ -1,69 +0,0 @@
use chrono::{DateTime, Utc};
use once_cell::sync::OnceCell;
use std::time::Instant;
/// Stores an Instant and DateTime<Utc> captured as close as possible together
///
/// Initialised lazily on first use (see `to_approximate_datetime`); the pair
/// acts as the shared anchor for converting between the monotonic and wall
/// clock types.
static INSTANCE: OnceCell<(DateTime<Utc>, Instant)> = OnceCell::new();
/// Provides a conversion from Instant to DateTime<Utc> for display purposes
///
/// It is an approximation: if the system clock changes, the returned DateTime
/// will not match the DateTime that would have been recorded at the moment the
/// Instant was created.
///
/// The conversion does, however, preserve the monotonic property of Instant,
/// i.e. a larger Instant yields a larger returned DateTime.
///
/// This should ONLY be used for display purposes; the results should not be
/// used to drive logic, nor persisted
pub fn to_approximate_datetime(instant: Instant) -> DateTime<Utc> {
    // Lazily capture a paired (wall clock, monotonic clock) reference point;
    // every conversion is expressed as a signed offset from this one anchor,
    // which is what preserves ordering between converted values.
    let (ref_date, ref_instant) = *INSTANCE.get_or_init(|| (Utc::now(), Instant::now()));

    if instant >= ref_instant {
        let ahead = instant.duration_since(ref_instant);
        ref_date + chrono::Duration::from_std(ahead).expect("date overflow")
    } else {
        let behind = ref_instant.duration_since(instant);
        ref_date - chrono::Duration::from_std(behind).expect("date overflow")
    }
}
// *NOTE*: these tests currently fail on (at least) aarch64 architectures
// such as an Apple M1 machine.
//
// Possibly related to https://github.com/rust-lang/rust/issues/87906 but
// not clear at this point.
//
// Ignoring the tests here to get the suite green on aarch64.
#[cfg(not(target_arch = "aarch64"))]
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_to_datetime() {
        // Seed global state: the first call initialises the INSTANCE OnceCell,
        // so the reference pair read below is guaranteed to be present
        to_approximate_datetime(Instant::now());

        let (ref_date, ref_instant) = *INSTANCE.get().unwrap();

        // An instant after the reference point maps to a correspondingly
        // later DateTime...
        assert_eq!(
            to_approximate_datetime(ref_instant + std::time::Duration::from_nanos(78)),
            ref_date + chrono::Duration::nanoseconds(78)
        );

        // ...and an earlier instant maps to an earlier DateTime
        assert_eq!(
            to_approximate_datetime(ref_instant - std::time::Duration::from_nanos(23)),
            ref_date - chrono::Duration::nanoseconds(23)
        );
    }

    #[test]
    fn test_to_datetime_simple() {
        // Sanity check of std Instant arithmetic itself, independent of the
        // conversion logic (see the aarch64 note above)
        let d = std::time::Duration::from_nanos(78);
        let a = Instant::now();
        let b = a + d;
        assert_eq!(b.duration_since(a), d);
    }
}

View File

@ -16,7 +16,6 @@ mod database_name;
pub mod database_rules;
pub mod detailed_database;
pub mod error;
pub mod instant;
pub mod job;
pub mod names;
pub mod partition_metadata;

View File

@ -8,9 +8,11 @@ readme = "README.md"
[dependencies]
arrow = { version = "5.5", features = ["prettyprint"] }
chrono = "0.4"
hashbrown = "0.11"
indexmap = "1.6"
itertools = "0.10.1"
parking_lot = "0.11"
snafu = "0.6"
tokio = { version = "1.11", features = ["sync"] }

View File

@ -1,91 +1,75 @@
use crate::atomic_instant::AtomicInstant;
use std::sync::atomic::{AtomicUsize, Ordering};
use chrono::{DateTime, Utc};
use parking_lot::RwLock;
use std::sync::Arc;
use std::time::Instant;
/// A struct that allows recording access by a query
#[derive(Debug, Clone)]
pub struct AccessRecorder {
state: Arc<AccessRecorderInner>,
state: Arc<RwLock<AccessMetrics>>,
}
impl Default for AccessRecorder {
fn default() -> Self {
Self::new(Instant::now())
Self::new(Utc::now())
}
}
#[derive(Debug)]
struct AccessRecorderInner {
count: AtomicUsize,
last_instant: AtomicInstant,
}
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct AccessMetrics {
/// The number of accesses that have been recorded
pub count: usize,
/// The instant of the last access or if none the
/// instant when the `AccessRecorder` was created
pub last_instant: Instant,
/// The time of the last access or if none the
/// time when the `AccessRecorder` was created
pub last_access: DateTime<Utc>,
}
impl AccessMetrics {
/// Returns the Instant of the last access if any
pub fn last_access(&self) -> Option<Instant> {
(self.count > 0).then(|| self.last_instant)
pub fn last_access(&self) -> Option<DateTime<Utc>> {
(self.count > 0).then(|| self.last_access)
}
}
impl AccessRecorder {
/// Creates a new AccessRecorder with the provided creation Instant
pub fn new(instant: Instant) -> Self {
/// Creates a new AccessRecorder with the provided creation DateTime
pub fn new(now: DateTime<Utc>) -> Self {
Self {
state: Arc::new(AccessRecorderInner {
count: AtomicUsize::new(0),
last_instant: AtomicInstant::new(instant),
}),
state: Arc::new(RwLock::new(AccessMetrics {
count: 0,
last_access: now,
})),
}
}
/// Records an access at the given instant
pub fn record_access(&self, instant: Instant) {
self.state
.last_instant
.fetch_max(instant, Ordering::Relaxed);
self.state.count.fetch_add(1, Ordering::Release);
/// Records an access at the given DateTime
pub fn record_access(&self, now: DateTime<Utc>) {
let mut state = self.state.write();
state.last_access = state.last_access.max(now);
state.count += 1;
}
/// Records an access at the current instant
/// Records an access at the current time
pub fn record_access_now(&self) {
self.record_access(Instant::now())
self.record_access(Utc::now())
}
/// Gets the access metrics
pub fn get_metrics(&self) -> AccessMetrics {
// Acquire and Release ensures that if we observe the count from an access,
// we are guaranteed that the observed last_instant will be greater than
// or equal to the time of this access
let count = self.state.count.load(Ordering::Acquire);
let last_instant = self.state.last_instant.load(Ordering::Relaxed);
AccessMetrics {
count,
last_instant,
}
self.state.read().clone()
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::time::Duration;
use chrono::Duration;
#[test]
fn test_access() {
let t1 = Instant::now();
let t2 = t1 + Duration::from_nanos(1);
let t3 = t1 + Duration::from_nanos(2);
let t1 = Utc::now();
let t2 = t1 + Duration::nanoseconds(1);
let t3 = t1 + Duration::nanoseconds(2);
let access_recorder = AccessRecorder::new(t1);
@ -93,7 +77,7 @@ mod tests {
access_recorder.get_metrics(),
AccessMetrics {
count: 0,
last_instant: t1
last_access: t1
}
);
@ -102,7 +86,7 @@ mod tests {
access_recorder.get_metrics(),
AccessMetrics {
count: 1,
last_instant: t3
last_access: t3
}
);
@ -111,7 +95,7 @@ mod tests {
access_recorder.get_metrics(),
AccessMetrics {
count: 2,
last_instant: t3
last_access: t3
}
);
}

View File

@ -1,113 +0,0 @@
use std::sync::atomic::{AtomicI64, Ordering};
use std::time::{Duration, Instant};
/// Provides the ability to perform atomic operations on an `Instant`
#[derive(Debug)]
pub struct AtomicInstant {
    /// The fixed reference point that every stored value is measured against
    start: Instant,

    /// Signed distance, in nanoseconds, of the stored value from `start`
    ///
    /// An `Instant` is an opaque type with no atomic representation, so the
    /// value is encoded as this offset and mutated atomically instead
    offset: AtomicI64,
}

impl AtomicInstant {
    /// Creates a new AtomicInstant initialised to `Instant::now`
    pub fn now() -> Self {
        Self::new(Instant::now())
    }

    /// Creates a new AtomicInstant initialised to the provided value
    pub fn new(start: Instant) -> Self {
        Self {
            start,
            offset: AtomicI64::new(0),
        }
    }

    /// Encodes `instant` as a signed nanosecond offset from `self.start`
    fn offset(&self, instant: Instant) -> i64 {
        if instant >= self.start {
            instant.duration_since(self.start).as_nanos() as i64
        } else {
            -(self.start.duration_since(instant).as_nanos() as i64)
        }
    }

    /// Decodes a signed nanosecond offset back into an `Instant`
    fn instant(&self, offset: i64) -> Instant {
        if offset > 0 {
            self.start + Duration::from_nanos(offset as u64)
        } else {
            self.start - Duration::from_nanos((-offset) as u64)
        }
    }

    /// Gets the current Instant
    pub fn load(&self, ordering: Ordering) -> Instant {
        let nanos = self.offset.load(ordering);
        self.instant(nanos)
    }

    /// Stores the given Instant
    pub fn store(&self, instant: Instant, ordering: Ordering) {
        let nanos = self.offset(instant);
        self.offset.store(nanos, ordering);
    }

    /// Sets the value to the maximum of the current value and the provided Instant
    ///
    /// Returns the previous value
    pub fn fetch_max(&self, instant: Instant, ordering: Ordering) -> Instant {
        // Offsets are monotone in the instants they encode, so taking the max
        // of the offsets is equivalent to taking the max of the instants
        let previous = self.offset.fetch_max(self.offset(instant), ordering);
        self.instant(previous)
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_atomic_instant() {
        // Everything is expressed relative to a single reference point
        let base = Instant::now();
        let atomic = AtomicInstant::new(base);

        // Values on either side of the reference point round-trip exactly
        // through store/load
        let probes = [
            base - Duration::from_secs(5),
            base - Duration::from_secs(1),
            base + Duration::from_secs(1),
            base + Duration::from_nanos(1),
        ];
        for probe in probes {
            atomic.store(probe, Ordering::Relaxed);
            assert_eq!(atomic.load(Ordering::Relaxed), probe);
        }

        // A larger value replaces the stored instant; the previous value
        // (base + 1ns, left behind by the loop above) is returned
        let previous = atomic.fetch_max(base + Duration::from_secs(2), Ordering::Relaxed);
        assert_eq!(previous, base + Duration::from_nanos(1));
        assert_eq!(atomic.load(Ordering::Relaxed), base + Duration::from_secs(2));

        // A smaller value leaves the stored instant untouched
        let previous = atomic.fetch_max(base + Duration::from_secs(1), Ordering::Relaxed);
        assert_eq!(previous, base + Duration::from_secs(2));
        assert_eq!(atomic.load(Ordering::Relaxed), base + Duration::from_secs(2));
    }
}

View File

@ -7,7 +7,6 @@
)]
pub mod access;
pub mod atomic_instant;
pub mod freezable;
pub mod once;
pub mod schema;

View File

@ -11,15 +11,11 @@ use data_types::{
use futures::future::BoxFuture;
use internal_types::access::AccessMetrics;
use observability_deps::tracing::{debug, info, trace, warn};
use std::{
convert::TryInto,
fmt::Debug,
time::{Duration, Instant},
};
use std::{convert::TryInto, fmt::Debug};
use tracker::TaskTracker;
/// Number of seconds to wait before retying a failed lifecycle action
pub const LIFECYCLE_ACTION_BACKOFF: Duration = Duration::from_secs(10);
/// Number of seconds to wait before retrying a failed lifecycle action
pub const LIFECYCLE_ACTION_BACKOFF_SECONDS: i64 = 10;
/// A `LifecyclePolicy` is created with a `LifecycleDb`
///
@ -459,7 +455,7 @@ where
&mut self,
db_name: &DatabaseName<'static>,
partition: &P,
now: Instant,
now: DateTime<Utc>,
) {
let partition = partition.read();
for chunk in LockablePartition::chunks(&partition) {
@ -467,9 +463,9 @@ where
if let Some(lifecycle_action) = chunk.lifecycle_action() {
if lifecycle_action.is_complete()
&& now
.checked_duration_since(lifecycle_action.start_instant())
.map(|x| x >= LIFECYCLE_ACTION_BACKOFF)
.unwrap_or(false)
.signed_duration_since(lifecycle_action.start_time())
.num_seconds()
>= LIFECYCLE_ACTION_BACKOFF_SECONDS
{
info!(%db_name, chunk=%chunk.addr(), action=?lifecycle_action.metadata(), "clearing failed lifecycle action");
chunk.upgrade().clear_lifecycle_action();
@ -481,11 +477,7 @@ where
/// The core policy logic
///
/// Returns a future that resolves when this method should be called next
pub fn check_for_work(
&mut self,
now: DateTime<Utc>,
now_instant: Instant,
) -> BoxFuture<'_, ()> {
pub fn check_for_work(&mut self, now: DateTime<Utc>) -> BoxFuture<'_, ()> {
// Any time-consuming work should be spawned as tokio tasks and not
// run directly within this loop
@ -496,7 +488,7 @@ where
let partitions = self.db.partitions();
for partition in &partitions {
self.maybe_cleanup_failed(&db_name, partition, now_instant);
self.maybe_cleanup_failed(&db_name, partition, now);
// Persistence cannot split chunks if they are currently being compacted
//
@ -652,8 +644,8 @@ fn sort_free_candidates<P>(candidates: &mut Vec<FreeCandidate<'_, P>>) {
// Order candidates with the same FreeAction by last access time
std::cmp::Ordering::Equal => a
.access_metrics
.last_instant
.cmp(&b.access_metrics.last_instant),
.last_access
.cmp(&b.access_metrics.last_access),
o => o,
})
}
@ -668,6 +660,7 @@ mod tests {
use chrono::TimeZone;
use data_types::chunk_metadata::{ChunkAddr, ChunkId, ChunkOrder, ChunkStorage};
use data_types::database_rules::MaxActiveCompactions::MaxActiveCompactions;
use std::time::Duration;
use std::{
cmp::max,
collections::BTreeMap,
@ -738,7 +731,7 @@ mod tests {
min_timestamp: None,
access_metrics: AccessMetrics {
count: 0,
last_instant: Instant::now(),
last_access: Utc::now(),
},
time_of_last_write: from_secs(time_of_last_write),
lifecycle_action: None,
@ -1146,10 +1139,10 @@ mod tests {
#[test]
fn test_sort_free_candidates() {
let instant = Instant::now();
let access_metrics = |secs: u64| AccessMetrics {
let now = Utc::now();
let access_metrics = |secs: i64| AccessMetrics {
count: 1,
last_instant: instant + Duration::from_secs(secs),
last_access: now + chrono::Duration::seconds(secs),
};
let mut candidates = vec![
@ -1216,7 +1209,7 @@ mod tests {
let db = TestDb::new(rules, chunks);
let mut lifecycle = LifecyclePolicy::new(&db);
lifecycle.check_for_work(from_secs(40), Instant::now());
lifecycle.check_for_work(from_secs(40));
assert_eq!(*db.events.read(), vec![]);
}
@ -1236,11 +1229,11 @@ mod tests {
let mut lifecycle = LifecyclePolicy::new(&db);
let partition = Arc::clone(&db.partitions.read()[0]);
lifecycle.check_for_work(from_secs(9), Instant::now());
lifecycle.check_for_work(from_secs(9));
assert_eq!(*db.events.read(), vec![]);
lifecycle.check_for_work(from_secs(11), Instant::now());
lifecycle.check_for_work(from_secs(11));
let chunks = partition.read().chunks.keys().cloned().collect::<Vec<_>>();
// expect chunk 2 to have been compacted into a new chunk 3
assert_eq!(
@ -1252,14 +1245,14 @@ mod tests {
vec![ChunkId::new(0), ChunkId::new(1), ChunkId::new(3)]
);
lifecycle.check_for_work(from_secs(12), Instant::now());
lifecycle.check_for_work(from_secs(12));
assert_eq!(
*db.events.read(),
vec![MoverEvents::Compact(vec![ChunkId::new(2)])]
);
// Should compact everything possible
lifecycle.check_for_work(from_secs(20), Instant::now());
lifecycle.check_for_work(from_secs(20));
assert_eq!(
*db.events.read(),
vec![
@ -1291,12 +1284,12 @@ mod tests {
// of check_for_work had started a background move task
lifecycle.trackers.push(tracker);
let future = lifecycle.check_for_work(from_secs(0), Instant::now());
let future = lifecycle.check_for_work(from_secs(0));
tokio::time::timeout(Duration::from_millis(1), future)
.await
.expect_err("expected timeout");
let future = lifecycle.check_for_work(from_secs(0), Instant::now());
let future = lifecycle.check_for_work(from_secs(0));
std::mem::drop(registration);
tokio::time::timeout(Duration::from_millis(1), future)
.await
@ -1325,10 +1318,10 @@ mod tests {
let db = TestDb::new(rules.clone(), chunks);
let mut lifecycle = LifecyclePolicy::new(&db);
lifecycle.check_for_work(from_secs(10), Instant::now());
lifecycle.check_for_work(from_secs(10));
assert_eq!(*db.events.read(), vec![]);
let instant = Instant::now();
let now = Utc::now();
let chunks = vec![
// two "open" chunks => they must not be dropped (yet)
@ -1343,13 +1336,13 @@ mod tests {
TestChunk::new(ChunkId::new(4), 0, ChunkStorage::ReadBufferAndObjectStore)
.with_access_metrics(AccessMetrics {
count: 1,
last_instant: instant,
last_access: now,
}),
// "written" chunk => can be unloaded
TestChunk::new(ChunkId::new(5), 0, ChunkStorage::ReadBufferAndObjectStore)
.with_access_metrics(AccessMetrics {
count: 12,
last_instant: instant - Duration::from_secs(1),
last_access: now - chrono::Duration::seconds(1),
}),
];
@ -1357,7 +1350,7 @@ mod tests {
let mut lifecycle = LifecyclePolicy::new(&db);
// Should unload chunk 5 first as access time is smaller
lifecycle.check_for_work(from_secs(10), Instant::now());
lifecycle.check_for_work(from_secs(10));
assert_eq!(
*db.events.read(),
vec![
@ -1387,7 +1380,7 @@ mod tests {
let db = TestDb::new(rules.clone(), chunks);
let mut lifecycle = LifecyclePolicy::new(&db);
lifecycle.check_for_work(from_secs(10), Instant::now());
lifecycle.check_for_work(from_secs(10));
assert_eq!(*db.events.read(), vec![]);
let chunks = vec![
@ -1406,7 +1399,7 @@ mod tests {
let db = TestDb::new(rules, chunks);
let mut lifecycle = LifecyclePolicy::new(&db);
lifecycle.check_for_work(from_secs(10), Instant::now());
lifecycle.check_for_work(from_secs(10));
assert_eq!(
*db.events.read(),
vec![MoverEvents::Unload(ChunkId::new(4))]
@ -1430,7 +1423,7 @@ mod tests {
let db = TestDb::new(rules, chunks);
let mut lifecycle = LifecyclePolicy::new(&db);
lifecycle.check_for_work(from_secs(10), Instant::now());
lifecycle.check_for_work(from_secs(10));
assert_eq!(*db.events.read(), vec![]);
}
@ -1530,7 +1523,7 @@ mod tests {
let db = TestDb::from_partitions(rules, partitions);
let mut lifecycle = LifecyclePolicy::new(&db);
lifecycle.check_for_work(now, Instant::now());
lifecycle.check_for_work(now);
assert_eq!(
*db.events.read(),
vec![
@ -1544,7 +1537,7 @@ mod tests {
);
db.events.write().clear();
lifecycle.check_for_work(now, Instant::now());
lifecycle.check_for_work(now);
assert_eq!(
*db.events.read(),
vec![MoverEvents::Compact(vec![
@ -1584,7 +1577,7 @@ mod tests {
let db = TestDb::from_partitions(rules, partitions);
let mut lifecycle = LifecyclePolicy::new(&db);
lifecycle.check_for_work(now, Instant::now());
lifecycle.check_for_work(now);
assert_eq!(
*db.events.read(),
vec![
@ -1596,7 +1589,7 @@ mod tests {
db.events.write().clear();
// Compaction slots freed up, other partition can now compact.
lifecycle.check_for_work(now, Instant::now());
lifecycle.check_for_work(now);
assert_eq!(
*db.events.read(),
vec![MoverEvents::Compact(vec![ChunkId::new(200)]),],
@ -1614,7 +1607,6 @@ mod tests {
..Default::default()
};
let now = from_secs(0);
let now_instant = Instant::now();
let partitions = vec![
// Insufficient rows and not old enough => don't persist but can compact
@ -1685,7 +1677,7 @@ mod tests {
let db = TestDb::from_partitions(rules, partitions);
let mut lifecycle = LifecyclePolicy::new(&db);
lifecycle.check_for_work(from_secs(0), now_instant);
lifecycle.check_for_work(from_secs(0));
assert_eq!(
*db.events.read(),
vec![
@ -1710,7 +1702,6 @@ mod tests {
..Default::default()
};
let now = Utc::now();
let now_instant = Instant::now();
// This could occur if the in-memory contents of a partition are deleted, and
// compaction causes the chunks to be removed. In such a scenario the persistence
@ -1724,7 +1715,7 @@ mod tests {
let db = TestDb::from_partitions(rules, partitions);
let mut lifecycle = LifecyclePolicy::new(&db);
lifecycle.check_for_work(now, now_instant);
lifecycle.check_for_work(now);
assert_eq!(*db.events.read(), vec![MoverEvents::Persist(vec![]),]);
}
@ -1739,7 +1730,6 @@ mod tests {
..Default::default()
};
let now = Utc::now();
let now_instant = Instant::now();
let partitions = vec![
// Sufficient rows => could persist but should be suppressed
@ -1755,13 +1745,13 @@ mod tests {
let db = TestDb::from_partitions(rules, partitions);
let mut lifecycle = LifecyclePolicy::new_suppress_persistence(&db);
lifecycle.check_for_work(now, now_instant);
lifecycle.check_for_work(now);
assert_eq!(
*db.events.read(),
vec![MoverEvents::Compact(vec![ChunkId::new(2), ChunkId::new(3)]),]
);
lifecycle.check_for_work(now, now_instant);
lifecycle.check_for_work(now);
assert_eq!(
*db.events.read(),
vec![MoverEvents::Compact(vec![ChunkId::new(2), ChunkId::new(3)]),]
@ -1769,7 +1759,7 @@ mod tests {
lifecycle.unsuppress_persistence();
lifecycle.check_for_work(now, now_instant);
lifecycle.check_for_work(now);
assert_eq!(
*db.events.read(),
vec![
@ -1794,7 +1784,7 @@ mod tests {
let db = TestDb::new(rules, chunks);
let mut lifecycle = LifecyclePolicy::new(&db);
lifecycle.check_for_work(from_secs(80), Instant::now());
lifecycle.check_for_work(from_secs(80));
assert_eq!(
*db.events.read(),
vec![MoverEvents::Compact(vec![ChunkId::new(0)])]
@ -1816,7 +1806,7 @@ mod tests {
let db = TestDb::new(rules, chunks);
let mut lifecycle = LifecyclePolicy::new(&db);
lifecycle.check_for_work(from_secs(80), Instant::now());
lifecycle.check_for_work(from_secs(80));
assert_eq!(
*db.events.read(),
vec![MoverEvents::Compact(vec![ChunkId::new(0)])]
@ -1841,13 +1831,12 @@ mod tests {
chunk.write().lifecycle_action = Some(tracker.clone());
// Shouldn't do anything
lifecycle.check_for_work(from_secs(0), tracker.start_instant());
lifecycle.check_for_work(tracker.start_time());
assert!(chunk.read().lifecycle_action().is_some());
// Shouldn't do anything as job hasn't finished
lifecycle.check_for_work(
from_secs(0),
tracker.start_instant() + LIFECYCLE_ACTION_BACKOFF,
tracker.start_time() + chrono::Duration::seconds(LIFECYCLE_ACTION_BACKOFF_SECONDS),
);
assert!(chunk.read().lifecycle_action().is_some());
@ -1855,13 +1844,12 @@ mod tests {
std::mem::drop(r0);
// Shouldn't do anything as insufficient time passed
lifecycle.check_for_work(from_secs(0), tracker.start_instant());
lifecycle.check_for_work(tracker.start_time());
assert!(chunk.read().lifecycle_action().is_some());
// Should clear job
lifecycle.check_for_work(
from_secs(0),
tracker.start_instant() + LIFECYCLE_ACTION_BACKOFF,
tracker.start_time() + chrono::Duration::seconds(LIFECYCLE_ACTION_BACKOFF_SECONDS),
);
assert!(chunk.read().lifecycle_action().is_none());
}

View File

@ -9,7 +9,7 @@ use std::{
atomic::{AtomicUsize, Ordering},
Arc,
},
time::{Duration, Instant},
time::Duration,
};
use async_trait::async_trait;
@ -831,7 +831,7 @@ impl Db {
.as_mut()
.expect("lifecycle policy should be initialized");
policy.check_for_work(self.utc_now(), Instant::now()).await
policy.check_for_work(self.utc_now()).await
}
};

View File

@ -8,7 +8,6 @@ use data_types::{
ChunkAddr, ChunkColumnSummary, ChunkId, ChunkLifecycleAction, ChunkOrder, ChunkStorage,
ChunkSummary, DetailedChunkSummary,
},
instant::to_approximate_datetime,
partition_metadata::TableSummary,
write_summary::TimestampSummary,
};
@ -576,11 +575,7 @@ impl CatalogChunk {
.as_ref()
.map(|tracker| *tracker.metadata());
let time_of_last_access = self
.access_recorder
.get_metrics()
.last_access()
.map(to_approximate_datetime);
let time_of_last_access = self.access_recorder.get_metrics().last_access();
ChunkSummary {
partition_key: Arc::clone(&self.addr.partition_key),

View File

@ -596,13 +596,13 @@ mod tests {
// Query should count as an access
assert_eq!(t2.count + 1, t3.count);
assert!(t2.last_instant < t3.last_instant);
assert!(t2.last_access < t3.last_access);
// If column names successful should record access
match column_names {
true => {
assert_eq!(t3.count + 1, t4.count);
assert!(t3.last_instant < t4.last_instant);
assert!(t3.last_access < t4.last_access);
}
false => {
assert_eq!(t3, t4);
@ -613,7 +613,7 @@ mod tests {
match column_values {
true => {
assert_eq!(t4.count + 1, t5.count);
assert!(t4.last_instant < t5.last_instant);
assert!(t4.last_access < t5.last_access);
}
false => {
assert_eq!(t4, t5);

View File

@ -4,7 +4,6 @@ use arrow::array::{ArrayRef, StringArray, Time64NanosecondArray, TimestampNanose
use arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit};
use arrow::error::Result;
use arrow::record_batch::RecordBatch;
use data_types::instant::to_approximate_datetime;
use itertools::Itertools;
use data_types::error::ErrorLogger;
@ -93,7 +92,7 @@ fn from_task_trackers(
.collect::<StringArray>();
let start_time = jobs
.iter()
.map(|job| Some(to_approximate_datetime(job.start_instant()).timestamp_nanos()))
.map(|job| Some(job.start_time().timestamp_nanos()))
.collect::<TimestampNanosecondArray>();
let cpu_time_used = jobs
.iter()

View File

@ -7,6 +7,7 @@ description = "Utilities for tracking resource utilisation within IOx"
[dependencies]
chrono = "0.4"
futures = "0.3"
hashbrown = "0.11"
lock_api = "0.4.4"

View File

@ -80,12 +80,10 @@
//! etc... between threads as any such functionality must perform the necessary
//! synchronisation to be well-formed.
use std::{
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
time::Instant,
use chrono::{DateTime, Utc};
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
};
use tokio_util::sync::CancellationToken;
@ -102,7 +100,7 @@ mod registry;
/// The state shared between all sibling tasks
#[derive(Debug)]
struct TrackerState {
start_instant: Instant,
start_time: DateTime<Utc>,
cancel_token: CancellationToken,
cpu_nanos: AtomicUsize,
wall_nanos: AtomicUsize,
@ -376,8 +374,8 @@ where
}
/// Returns the instant the tracker was created
pub fn start_instant(&self) -> Instant {
self.state.start_instant
pub fn start_time(&self) -> DateTime<Utc> {
self.state.start_time
}
/// Returns if this tracker has been cancelled
@ -433,7 +431,7 @@ impl Clone for TaskRegistration {
impl Default for TaskRegistration {
fn default() -> Self {
let state = Arc::new(TrackerState {
start_instant: Instant::now(),
start_time: Utc::now(),
cpu_nanos: AtomicUsize::new(0),
wall_nanos: AtomicUsize::new(0),
cancel_token: CancellationToken::new(),

View File

@ -93,9 +93,16 @@ impl<F: TryFuture> Future for TrackedFuture<F> {
#[pinned_drop]
impl<F: TryFuture> PinnedDrop for TrackedFuture<F> {
fn drop(self: Pin<&mut Self>) {
use std::convert::TryInto;
let state: &TrackerState = self.project().tracker;
let wall_nanos = state.start_instant.elapsed().as_nanos() as usize;
let elapsed: i64 = chrono::Utc::now()
.signed_duration_since(state.start_time)
.num_nanoseconds()
.unwrap();
let wall_nanos: usize = elapsed.try_into().unwrap_or_default();
state.wall_nanos.fetch_max(wall_nanos, Ordering::Relaxed);