feat: add instant to datetime conversion (#2078)

* feat: add instant to datetime conversion

* chore: review feedback
pull/24376/head
Raphael Taylor-Davies 2021-07-21 12:43:27 +01:00 committed by GitHub
parent 58108b79ec
commit ffe6e62aee
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 78 additions and 42 deletions

1
Cargo.lock generated
View File

@ -826,6 +826,7 @@ dependencies = [
"influxdb_line_protocol",
"num_cpus",
"observability_deps",
"once_cell",
"percent-encoding",
"regex",
"serde",

View File

@ -15,6 +15,7 @@ regex = "1.4"
serde = { version = "1.0", features = ["rc", "derive"] }
snafu = "0.6"
observability_deps = { path = "../observability_deps" }
once_cell = { version = "1.4.0", features = ["parking_lot"] }
[dev-dependencies] # In alphabetical order
test_helpers = { path = "../test_helpers" }

53
data_types/src/instant.rs Normal file
View File

@ -0,0 +1,53 @@
use chrono::{DateTime, Utc};
use once_cell::sync::OnceCell;
use std::time::Instant;
/// Stores an Instant and DateTime<Utc> captured as close as possible together
static INSTANCE: OnceCell<(DateTime<Utc>, Instant)> = OnceCell::new();
/// Provides a conversion from Instant to DateTime<Utc> for display purposes
///
/// It is an approximation as if the system clock changes, the returned DateTime will not be
/// the same as the DateTime that would have been recorded at the time the Instant was created.
///
/// The conversion does, however, preserve the monotonic property of Instant, i.e. a larger
/// Instant will have a larger returned DateTime.
///
/// This should ONLY be used for display purposes, the results should not be used to
/// drive logic, nor persisted
pub fn to_approximate_datetime(instant: Instant) -> DateTime<Utc> {
let (ref_date, ref_instant) = *INSTANCE.get_or_init(|| (Utc::now(), Instant::now()));
if ref_instant > instant {
ref_date
- chrono::Duration::from_std(ref_instant.duration_since(instant))
.expect("date overflow")
} else {
ref_date
+ chrono::Duration::from_std(instant.duration_since(ref_instant))
.expect("date overflow")
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_to_datetime() {
// Seed global state
to_approximate_datetime(Instant::now());
let (ref_date, ref_instant) = *INSTANCE.get().unwrap();
assert_eq!(
to_approximate_datetime(ref_instant + std::time::Duration::from_nanos(78)),
ref_date + chrono::Duration::nanoseconds(78)
);
assert_eq!(
to_approximate_datetime(ref_instant - std::time::Duration::from_nanos(23)),
ref_date - chrono::Duration::nanoseconds(23)
);
}
}

View File

@ -13,13 +13,14 @@
pub mod chunk_metadata;
pub mod consistent_hasher;
mod database_name;
pub use database_name::*;
pub mod database_rules;
pub mod database_state;
pub mod error;
pub mod instant;
pub mod job;
pub mod names;
pub mod partition_metadata;
pub mod server_id;
pub mod timestamp;
pub mod write_summary;
pub use database_name::*;

View File

@ -13,6 +13,7 @@ use internal_types::guard::{ReadGuard, ReadLock};
use crate::checkpoint::PartitionCheckpoint;
use crate::min_max_sequence::MinMaxSequence;
use data_types::instant::to_approximate_datetime;
const DEFAULT_CLOSED_WINDOW_PERIOD: Duration = Duration::from_secs(30);
@ -45,15 +46,8 @@ pub struct PersistenceWindows {
late_arrival_period: Duration,
closed_window_period: Duration,
/// The datetime this PersistenceWindows was created
///
/// `PersistenceWindows` internally uses monotonic `Instant`, however,
/// these cannot be rendered. To provide a stable rendering of Wall timestamp,
/// a single timestamp is recorded at creation time
created_at_time: DateTime<Utc>,
/// The instant this PersistenceWindows was created
created_at_instant: Instant,
created_at: Instant,
/// The last instant passed to PersistenceWindows::add_range
last_instant: Instant,
@ -116,7 +110,6 @@ impl PersistenceWindows {
let closed_window_count = late_arrival_seconds / closed_window_seconds;
let created_at_time = Utc::now();
let created_at_instant = Instant::now();
Self {
@ -126,8 +119,7 @@ impl PersistenceWindows {
addr,
late_arrival_period,
closed_window_period,
created_at_time,
created_at_instant,
created_at: created_at_instant,
last_instant: created_at_instant,
max_sequence_numbers: Default::default(),
}
@ -362,25 +354,12 @@ impl PersistenceWindows {
/// These are approximate because persistence may partially flush a window, which will
/// update the min row timestamp but not the row count
pub fn summaries(&self) -> impl Iterator<Item = WriteSummary> + '_ {
self.windows().map(move |window| {
let window_age = chrono::Duration::from_std(
window.created_at.duration_since(self.created_at_instant),
)
.expect("duration overflow");
let time_of_first_write = self.created_at_time + window_age;
let window_duration =
chrono::Duration::from_std(window.last_instant.duration_since(window.created_at))
.expect("duration overflow");
WriteSummary {
time_of_first_write,
time_of_last_write: time_of_first_write + window_duration,
min_timestamp: window.min_time,
max_timestamp: window.max_time,
row_count: window.row_count,
}
self.windows().map(move |window| WriteSummary {
time_of_first_write: to_approximate_datetime(window.created_at),
time_of_last_write: to_approximate_datetime(window.last_instant),
min_timestamp: window.min_time,
max_timestamp: window.max_time,
row_count: window.row_count,
})
}
@ -1333,7 +1312,8 @@ mod tests {
fn test_summaries() {
let late_arrival_period = Duration::from_secs(100);
let mut w = make_windows(late_arrival_period);
let instant = w.created_at_instant;
let instant = w.created_at;
let created_at_time = to_approximate_datetime(w.created_at);
// Window 1
w.add_range(
@ -1387,17 +1367,17 @@ mod tests {
summaries,
vec![
WriteSummary {
time_of_first_write: w.created_at_time + chrono::Duration::milliseconds(1),
time_of_last_write: w.created_at_time + chrono::Duration::milliseconds(50),
time_of_first_write: created_at_time + chrono::Duration::milliseconds(1),
time_of_last_write: created_at_time + chrono::Duration::milliseconds(50),
min_timestamp: Utc.timestamp_nanos(1),
max_timestamp: Utc.timestamp_nanos(340),
row_count: 21
},
WriteSummary {
time_of_first_write: w.created_at_time
time_of_first_write: created_at_time
+ closed_duration
+ chrono::Duration::milliseconds(1),
time_of_last_write: w.created_at_time
time_of_last_write: created_at_time
+ closed_duration
+ chrono::Duration::milliseconds(1),
min_timestamp: Utc.timestamp_nanos(89),
@ -1405,8 +1385,8 @@ mod tests {
row_count: 3
},
WriteSummary {
time_of_first_write: w.created_at_time + closed_duration * 3,
time_of_last_write: w.created_at_time + closed_duration * 3,
time_of_first_write: created_at_time + closed_duration * 3,
time_of_last_write: created_at_time + closed_duration * 3,
min_timestamp: Utc.timestamp_nanos(3),
max_timestamp: Utc.timestamp_nanos(4),
row_count: 8
@ -1424,8 +1404,8 @@ mod tests {
summaries,
vec![
WriteSummary {
time_of_first_write: w.created_at_time + chrono::Duration::milliseconds(1),
time_of_last_write: w.created_at_time
time_of_first_write: created_at_time + chrono::Duration::milliseconds(1),
time_of_last_write: created_at_time
+ closed_duration
+ chrono::Duration::milliseconds(1),
min_timestamp: Utc.timestamp_nanos(1),
@ -1433,8 +1413,8 @@ mod tests {
row_count: 24
},
WriteSummary {
time_of_first_write: w.created_at_time + closed_duration * 3,
time_of_last_write: w.created_at_time + closed_duration * 3,
time_of_first_write: created_at_time + closed_duration * 3,
time_of_last_write: created_at_time + closed_duration * 3,
min_timestamp: Utc.timestamp_nanos(3),
max_timestamp: Utc.timestamp_nanos(4),
row_count: 8