diff --git a/Cargo.lock b/Cargo.lock
index 9f24fa9634..5b44d0f837 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1258,7 +1258,6 @@ name = "generated_types"
 version = "0.1.0"
 dependencies = [
  "bytes",
- "chrono",
  "data_types",
  "num_cpus",
  "observability_deps",
@@ -1799,7 +1798,6 @@ checksum = "90c11140ffea82edce8dcd74137ce9324ec24b3cf0175fc9d7e29164da9915b8"
 name = "internal_types"
 version = "0.1.0"
 dependencies = [
- "chrono",
  "futures",
  "parking_lot",
  "snafu",
@@ -1971,12 +1969,12 @@ dependencies = [
 name = "lifecycle"
 version = "0.1.0"
 dependencies = [
- "chrono",
  "data_types",
  "futures",
  "hashbrown 0.11.2",
  "internal_types",
  "observability_deps",
+ "parking_lot",
  "time 0.1.0",
  "tokio",
  "tracker",
@@ -2206,7 +2204,6 @@ dependencies = [
  "arrow",
  "arrow_util",
  "async-trait",
- "chrono",
  "data_types",
  "entry",
  "hashbrown 0.11.2",
@@ -3353,7 +3350,6 @@ version = "0.1.0"
 dependencies = [
  "arrow",
  "arrow_util",
- "chrono",
  "criterion",
  "croaring",
  "data_types",
@@ -4644,7 +4640,6 @@ dependencies = [
 name = "tracker"
 version = "0.1.0"
 dependencies = [
- "chrono",
  "futures",
  "hashbrown 0.11.2",
  "lock_api",
@@ -4652,6 +4647,7 @@
  "observability_deps",
  "parking_lot",
  "pin-project",
+ "time 0.1.0",
  "tokio",
  "tokio-util",
 ]
@@ -4982,7 +4978,6 @@ name = "write_buffer"
 version = "0.1.0"
 dependencies = [
  "async-trait",
- "chrono",
  "data_types",
  "dotenv",
  "entry",
diff --git a/entry/Cargo.toml b/entry/Cargo.toml
index 36ad18780e..ba241cc335 100644
--- a/entry/Cargo.toml
+++ b/entry/Cargo.toml
@@ -6,7 +6,7 @@ edition = "2018"
 description = "The entry format used by the write buffer"
 
 [dependencies]
-chrono = { version = "0.4", features = ["serde"] }
+chrono = "0.4"
 data_types = { path = "../data_types" }
 # See docs/regenerating_flatbuffers.md about updating generated code when updating the
 # version of the flatbuffers crate
diff --git a/generated_types/Cargo.toml b/generated_types/Cargo.toml
index 614432c792..b9f6695ac9 100644
--- a/generated_types/Cargo.toml
+++ b/generated_types/Cargo.toml
@@ -18,7 +18,6 @@ tonic = "0.5"
 time = { path = "../time" }
 
 [dev-dependencies]
-chrono = { version = "0.4", features = ["serde"] }
 num_cpus = "1.13.0"
 
 [build-dependencies] # In alphabetical order
diff --git a/internal_types/Cargo.toml b/internal_types/Cargo.toml
index 3e925b508e..89a0e3865e 100644
--- a/internal_types/Cargo.toml
+++ b/internal_types/Cargo.toml
@@ -7,7 +7,6 @@ description = "InfluxDB IOx internal types, shared between IOx instances"
 readme = "README.md"
 
 [dependencies]
-chrono = "0.4"
 parking_lot = "0.11"
 snafu = "0.6"
 time = { path = "../time" }
diff --git a/lifecycle/Cargo.toml b/lifecycle/Cargo.toml
index 3ccf784f13..8c827855ef 100644
--- a/lifecycle/Cargo.toml
+++ b/lifecycle/Cargo.toml
@@ -6,12 +6,12 @@ edition = "2018"
 description = "Implements the IOx data lifecycle"
 
 [dependencies]
-chrono = "0.4"
 data_types = { path = "../data_types" }
 futures = "0.3"
 hashbrown = "0.11"
 internal_types = { path = "../internal_types" }
 observability_deps = { path = "../observability_deps" }
+parking_lot = "0.11"
 time = { path = "../time" }
 tokio = { version = "1.11", features = ["macros", "time"] }
 tracker = { path = "../tracker" }
diff --git a/lifecycle/src/lib.rs b/lifecycle/src/lib.rs
index d1a0a2e681..20c57b5ccc 100644
--- a/lifecycle/src/lib.rs
+++ b/lifecycle/src/lib.rs
@@ -8,20 +8,20 @@
     clippy::clone_on_ref_ptr
 )]
 
-use chrono::{DateTime, Utc};
 use data_types::{
     chunk_metadata::{ChunkAddr, ChunkId, ChunkLifecycleAction, ChunkOrder, ChunkStorage},
     database_rules::LifecycleRules,
     DatabaseName,
 };
 use internal_types::access::AccessMetrics;
+use std::sync::Arc;
 use tracker::TaskTracker;
 
 mod guard;
 pub use guard::*;
 mod policy;
 pub use policy::*;
-use time::Time;
+use time::{Time, TimeProvider};
 
 /// A trait that encapsulates the database logic that is automated by `LifecyclePolicy`
 pub trait LifecycleDb {
@@ -40,6 +40,9 @@ pub trait LifecycleDb {
 
     /// Return the database name.
     fn name(&self) -> DatabaseName<'static>;
+
+    /// Return the time provider for this database
+    fn time_provider(&self) -> Arc<dyn TimeProvider>;
 }
 
 /// A `LockablePartition` is a wrapper around a `LifecyclePartition` that allows
@@ -170,7 +173,7 @@ pub trait LifecycleChunk {
     fn clear_lifecycle_action(&mut self);
 
     /// Returns the min timestamp contained within this chunk
-    fn min_timestamp(&self) -> DateTime<Utc>;
+    fn min_timestamp(&self) -> Time;
 
     /// Returns the access metrics for this chunk
     fn access_metrics(&self) -> AccessMetrics;
diff --git a/lifecycle/src/policy.rs b/lifecycle/src/policy.rs
index 85817ff145..25e737c5b8 100644
--- a/lifecycle/src/policy.rs
+++ b/lifecycle/src/policy.rs
@@ -2,7 +2,6 @@ use crate::{
     LifecycleChunk, LifecycleDb, LifecyclePartition, LifecycleWriteGuard, LockableChunk,
     LockablePartition, PersistHandle,
 };
-use chrono::{DateTime, Utc};
 use data_types::{
     chunk_metadata::{ChunkId, ChunkLifecycleAction, ChunkStorage},
     database_rules::LifecycleRules,
@@ -11,12 +10,12 @@ use data_types::{
 use futures::future::BoxFuture;
 use internal_types::access::AccessMetrics;
 use observability_deps::tracing::{debug, info, trace, warn};
-use std::{convert::TryInto, fmt::Debug};
+use std::{fmt::Debug, time::Duration};
 use time::Time;
 use tracker::TaskTracker;
 
 /// Number of seconds to wait before retrying a failed lifecycle action
-pub const LIFECYCLE_ACTION_BACKOFF_SECONDS: i64 = 10;
+pub const LIFECYCLE_ACTION_BACKOFF: Duration = Duration::from_secs(10);
 
 /// A `LifecyclePolicy` is created with a `LifecycleDb`
 ///
@@ -215,7 +214,7 @@ where
         &mut self,
         partition: &P,
         rules: &LifecycleRules,
-        now: DateTime<Utc>,
+        now: Time,
     ) {
         let mut rows_left = rules.persist_row_threshold.get();
 
@@ -338,7 +337,7 @@ where
         db_name: &DatabaseName<'static>,
         partition: &P,
         rules: &LifecycleRules,
-        now: DateTime<Utc>,
+        now: Time,
     ) -> bool {
         // TODO: Encapsulate locking into a CatalogTransaction type
         let partition = partition.read();
@@ -348,13 +347,13 @@ where
             return false;
         }
 
-        let persistable_age_seconds: u32 = partition
+        let persistable_age_seconds = partition
             .minimum_unpersisted_age()
             .and_then(|minimum_unpersisted_age| {
-                (now - minimum_unpersisted_age.date_time())
-                    .num_seconds()
-                    .try_into()
-                    .ok()
+                Some(
+                    now.checked_duration_since(minimum_unpersisted_age)?
+                        .as_secs(),
+                )
             })
             .unwrap_or_default();
 
@@ -368,7 +367,7 @@ where
 
         if persistable_row_count >= rules.persist_row_threshold.get() {
             info!(%db_name, %partition, persistable_row_count, "persisting partition as exceeds row threshold");
-        } else if persistable_age_seconds >= rules.persist_age_threshold_seconds.get() {
+        } else if persistable_age_seconds >= rules.persist_age_threshold_seconds.get() as u64 {
             info!(%db_name, %partition, persistable_age_seconds, "persisting partition as exceeds age threshold");
         } else {
             debug!(%db_name, %partition, persistable_row_count, "partition not eligible for persist");
@@ -417,7 +416,7 @@ where
         &mut self,
         db_name: &DatabaseName<'static>,
         partition: &P,
-        now: DateTime<Utc>,
+        now: Time,
     ) {
         let partition = partition.read();
         for chunk in LockablePartition::chunks(&partition) {
@@ -425,9 +424,9 @@
             if let Some(lifecycle_action) = chunk.lifecycle_action() {
                 if lifecycle_action.is_complete()
                     && now
-                        .signed_duration_since(lifecycle_action.start_time())
-                        .num_seconds()
-                        >= LIFECYCLE_ACTION_BACKOFF_SECONDS
+                        .checked_duration_since(lifecycle_action.start_time())
+                        .unwrap_or_default()
+                        >= LIFECYCLE_ACTION_BACKOFF
                 {
                     info!(%db_name, chunk=%chunk.addr(), action=?lifecycle_action.metadata(), "clearing failed lifecycle action");
                     chunk.upgrade().clear_lifecycle_action();
@@ -439,12 +438,13 @@
     /// The core policy logic
     ///
     /// Returns a future that resolves when this method should be called next
-    pub fn check_for_work(&mut self, now: DateTime<Utc>) -> BoxFuture<'static, ()> {
+    pub fn check_for_work(&mut self) -> BoxFuture<'static, ()> {
         // Any time-consuming work should be spawned as tokio tasks and not
         // run directly within this loop
 
         // TODO: Add loop iteration count and duration metrics
 
+        let now = self.db.time_provider().now();
         let db_name = self.db.name();
         let rules = self.db.rules();
         let partitions = self.db.partitions();
@@ -561,29 +561,20 @@ where
     }
 }
 
-/// Returns the number of seconds between two times
-///
-/// Computes a - b
-#[inline]
-fn elapsed_seconds(a: DateTime<Utc>, b: DateTime<Utc>) -> u32 {
-    let seconds = (a - b).num_seconds();
-    if seconds < 0 {
-        0 // This can occur as DateTime is not monotonic
-    } else {
-        seconds.try_into().unwrap_or(u32::max_value())
-    }
-}
-
 /// Returns if the chunk is sufficiently cold and old to move
 ///
 /// Note: Does not check the chunk is the correct state
-fn can_move<C: LifecycleChunk>(rules: &LifecycleRules, chunk: &C, now: DateTime<Utc>) -> bool {
+fn can_move<C: LifecycleChunk>(rules: &LifecycleRules, chunk: &C, now: Time) -> bool {
     if chunk.row_count() >= rules.mub_row_threshold.get() {
         return true;
     }
 
-    elapsed_seconds(now, chunk.time_of_last_write().date_time())
-        >= rules.late_arrive_window_seconds.get()
+    let elapsed = now
+        .checked_duration_since(chunk.time_of_last_write())
+        .unwrap_or_default()
+        .as_secs();
+
+    elapsed >= rules.late_arrive_window_seconds.get() as u64
 }
 
 /// An action to free up memory
@@ -657,7 +648,7 @@ where
                 // Chunk's data is entirely after the time we are flushing
                 // up to, and thus there is no reason to include it in the
                 // plan
-                if chunk.min_timestamp() > flush_ts.date_time() {
+                if chunk.min_timestamp() > flush_ts {
                     // Ignore chunk for now, but we might need it later to close chunk order gaps
                     debug!(
                         chunk=%chunk.addr(),
@@ -702,9 +693,9 @@ mod tests {
        ChunkLifecycleAction, LifecycleReadGuard, LifecycleWriteGuard, LockableChunk,
        LockablePartition, PersistHandle,
     };
-    use chrono::TimeZone;
     use data_types::chunk_metadata::{ChunkAddr, ChunkId, ChunkOrder, ChunkStorage};
     use data_types::database_rules::MaxActiveCompactions::MaxActiveCompactions;
+    use parking_lot::Mutex;
     use std::time::Duration;
     use std::{
         cmp::max,
@@ -713,7 +704,8 @@
         num::{NonZeroU32, NonZeroUsize},
         sync::Arc,
     };
-    use tracker::{RwLock, TaskId, TaskRegistration, TaskRegistry};
+    use time::{MockProvider, TimeProvider};
+    use tracker::{RwLock, TaskRegistry};
 
     #[derive(Debug, Eq, PartialEq)]
     enum MoverEvents {
@@ -753,7 +745,7 @@
     struct TestChunk {
         addr: ChunkAddr,
         row_count: usize,
-        min_timestamp: Option<DateTime<Utc>>,
+        min_timestamp: Option<Time>,