From 425aacc391ffdc29fb7bf9266485afd93efb871a Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Wed, 12 May 2021 11:21:10 -0400 Subject: [PATCH] refactor: Extract ProcessClock into its own type --- server/src/db.rs | 152 ++--------------------------- server/src/db/process_clock.rs | 168 +++++++++++++++++++++++++++++++++ 2 files changed, 175 insertions(+), 145 deletions(-) create mode 100644 server/src/db/process_clock.rs diff --git a/server/src/db.rs b/server/src/db.rs index 3db6bb26ec..914f06de6b 100644 --- a/server/src/db.rs +++ b/server/src/db.rs @@ -11,7 +11,6 @@ use catalog::{ chunk::{Chunk as CatalogChunk, ChunkState}, Catalog, }; -use chrono::Utc; pub(crate) use chunk::DbChunk; use data_types::{ chunk::ChunkSummary, @@ -25,7 +24,7 @@ use datafusion::{ catalog::{catalog::CatalogProvider, schema::SchemaProvider}, physical_plan::SendableRecordBatchStream, }; -use entry::{ClockValue, Entry, OwnedSequencedEntry, SequencedEntry}; +use entry::{Entry, OwnedSequencedEntry, SequencedEntry}; use internal_types::{arrow::sort::sort_record_batch, selection::Selection}; use lifecycle::LifecycleManager; use metrics::{KeyValue, MetricObserver, MetricObserverBuilder, MetricRegistry}; @@ -39,10 +38,10 @@ use read_buffer::Chunk as ReadBufferChunk; use snafu::{ensure, ResultExt, Snafu}; use std::{ any::Any, - convert::{TryFrom, TryInto}, + convert::TryInto, num::NonZeroUsize, sync::{ - atomic::{AtomicU64, AtomicUsize, Ordering}, + atomic::{AtomicUsize, Ordering}, Arc, }, }; @@ -53,6 +52,7 @@ pub mod catalog; mod chunk; mod lifecycle; pub mod pred; +mod process_clock; mod streams; mod system_tables; @@ -305,7 +305,7 @@ pub struct Db { /// Process clock used in establishing a partial ordering of operations via a Lamport Clock. /// /// Value is nanoseconds since the Unix Epoch. 
- process_clock: AtomicU64, + process_clock: process_clock::ProcessClock, /// Number of iterations of the worker loop for this Db worker_iterations: AtomicUsize, @@ -469,7 +469,7 @@ impl Db { SystemSchemaProvider::new(&db_name, Arc::clone(&catalog), Arc::clone(&jobs)); let system_tables = Arc::new(system_tables); - let process_clock = AtomicU64::new(now_nanos()); + let process_clock = process_clock::ProcessClock::new(); Self { rules, @@ -1057,37 +1057,6 @@ impl Db { tracker } - /// Returns the next process clock value, which will be the maximum of the system time in - /// nanoseconds or the previous process clock value plus 1. Every operation that needs a - /// process clock value should be incrementing it as well, so there should never be a read of - /// the process clock without an accompanying increment of at least 1 nanosecond. - pub fn next_process_clock(&self) -> ClockValue { - let next = loop { - if let Ok(next) = self.try_update_process_clock() { - break next; - } - }; - - ClockValue::try_from(next).expect("process clock should not be 0") - } - - fn try_update_process_clock(&self) -> Result<u64, u64> { - let now = now_nanos(); - let current_process_clock = self.process_clock.load(Ordering::SeqCst); - let next_candidate = current_process_clock + 1; - - let next = now.max(next_candidate); - - self.process_clock - .compare_exchange( - current_process_clock, - next, - Ordering::SeqCst, - Ordering::SeqCst, - ) - .map(|_| next) - } - /// Return chunk summary information for all chunks in the specified /// partition across all storage systems pub fn partition_chunk_summaries(&self, partition_key: &str) -> Vec<ChunkSummary> { @@ -1158,7 +1127,7 @@ impl Db { pub fn store_entry(&self, entry: Entry) -> Result<()> { let sequenced_entry = Arc::new( OwnedSequencedEntry::new_from_entry_bytes( - self.next_process_clock(), + self.process_clock.next(), self.server_id, entry.data(), ) @@ -1342,15 +1311,6 @@ impl CatalogProvider for Db { } } -// Convenience function for getting the current time in a 
`u64` represented as nanoseconds since -// the epoch -fn now_nanos() -> u64 { - Utc::now() - .timestamp_nanos() - .try_into() - .expect("current time since the epoch should be positive") -} - pub mod test_helpers { use super::*; use entry::test_helpers::lp_to_entries; @@ -2912,102 +2872,4 @@ mod tests { .gt(0.07) .unwrap(); } - - #[test] - fn process_clock_defaults_to_current_time_in_ns() { - let before = now_nanos(); - - let db = Arc::new(TestDb::builder().build().db); - let db_process_clock = db.process_clock.load(Ordering::SeqCst); - - let after = now_nanos(); - - assert!( - before < db_process_clock, - "expected {} to be less than {}", - before, - db_process_clock - ); - assert!( - db_process_clock < after, - "expected {} to be less than {}", - db_process_clock, - after - ); - } - - #[test] - fn process_clock_incremented_and_set_on_sequenced_entry() { - let before = now_nanos(); - let before = ClockValue::try_from(before).unwrap(); - - let db = Arc::new(TestDb::builder().write_buffer(true).build().db); - - let entry = lp_to_entry("cpu bar=1 10"); - db.store_entry(entry).unwrap(); - - let between = now_nanos(); - let between = ClockValue::try_from(between).unwrap(); - - let entry = lp_to_entry("cpu foo=2 10"); - db.store_entry(entry).unwrap(); - - let after = now_nanos(); - let after = ClockValue::try_from(after).unwrap(); - - let sequenced_entries = db - .write_buffer - .as_ref() - .unwrap() - .lock() - .writes_since(before); - assert_eq!(sequenced_entries.len(), 2); - - assert!( - sequenced_entries[0].clock_value() < between, - "expected {:?} to be before {:?}", - sequenced_entries[0].clock_value(), - between - ); - - assert!( - between < sequenced_entries[1].clock_value(), - "expected {:?} to be before {:?}", - between, - sequenced_entries[1].clock_value(), - ); - - assert!( - sequenced_entries[1].clock_value() < after, - "expected {:?} to be before {:?}", - sequenced_entries[1].clock_value(), - after - ); - } - - #[test] - fn 
next_process_clock_always_increments() { - // Process clock defaults to the current time - let db = Arc::new(TestDb::builder().write_buffer(true).build().db); - - // Set the process clock value to a time in the future, so that when compared to the - // current time, the process clock value will be greater - let later: u64 = (Utc::now() + chrono::Duration::weeks(4)) - .timestamp_nanos() - .try_into() - .unwrap(); - - db.process_clock.store(later, Ordering::SeqCst); - - // Every call to next_process_clock should increment at least 1, even in this case - // where the system time will be less than the process clock - assert_eq!( - db.next_process_clock(), - ClockValue::try_from(later + 1).unwrap() - ); - assert_eq!( - db.next_process_clock(), - ClockValue::try_from(later + 2).unwrap() - ); - } } diff --git a/server/src/db/process_clock.rs b/server/src/db/process_clock.rs new file mode 100644 index 0000000000..d147df2602 --- /dev/null +++ b/server/src/db/process_clock.rs @@ -0,0 +1,168 @@ +//! Process clock used in establishing a partial ordering of operations via a Lamport Clock. + +use chrono::Utc; +use entry::ClockValue; +use std::{ + convert::{TryFrom, TryInto}, + sync::atomic::{AtomicU64, Ordering}, +}; + +#[derive(Debug)] +pub struct ProcessClock { + inner: AtomicU64, +} + +impl ProcessClock { + /// Create a new process clock value initialized to the current system time. + pub fn new() -> Self { + Self { + inner: AtomicU64::new(now_nanos()), + } + } + + /// Returns the next process clock value, which will be the maximum of the system time in + /// nanoseconds or the previous process clock value plus 1. Every operation that needs a + /// process clock value should be incrementing it as well, so there should never be a read of + /// the process clock without an accompanying increment of at least 1 nanosecond. 
+ pub fn next(&self) -> ClockValue { + let next = loop { + if let Ok(next) = self.try_update() { + break next; + } + }; + + ClockValue::try_from(next).expect("process clock should not be 0") + } + + fn try_update(&self) -> Result<u64, u64> { + let now = now_nanos(); + let current_process_clock = self.inner.load(Ordering::SeqCst); + let next_candidate = current_process_clock + 1; + + let next = now.max(next_candidate); + + self.inner + .compare_exchange( + current_process_clock, + next, + Ordering::SeqCst, + Ordering::SeqCst, + ) + .map(|_| next) + } +} + +// Convenience function for getting the current time in a `u64` represented as nanoseconds since +// the epoch +fn now_nanos() -> u64 { + Utc::now() + .timestamp_nanos() + .try_into() + .expect("current time since the epoch should be positive") +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::query_tests::utils::TestDb; + use entry::test_helpers::lp_to_entry; + use std::sync::Arc; + + #[test] + fn process_clock_defaults_to_current_time_in_ns() { + let before = now_nanos(); + + let db = Arc::new(TestDb::builder().build().db); + let db_process_clock = db.process_clock.inner.load(Ordering::SeqCst); + + let after = now_nanos(); + + assert!( + before < db_process_clock, + "expected {} to be less than {}", + before, + db_process_clock + ); + assert!( + db_process_clock < after, + "expected {} to be less than {}", + db_process_clock, + after + ); + } + + #[test] + fn process_clock_incremented_and_set_on_sequenced_entry() { + let before = now_nanos(); + let before = ClockValue::try_from(before).unwrap(); + + let db = Arc::new(TestDb::builder().write_buffer(true).build().db); + + let entry = lp_to_entry("cpu bar=1 10"); + db.store_entry(entry).unwrap(); + + let between = now_nanos(); + let between = ClockValue::try_from(between).unwrap(); + + let entry = lp_to_entry("cpu foo=2 10"); + db.store_entry(entry).unwrap(); + + let after = now_nanos(); + let after = ClockValue::try_from(after).unwrap(); + + let sequenced_entries 
= db + .write_buffer + .as_ref() + .unwrap() + .lock() + .writes_since(before); + assert_eq!(sequenced_entries.len(), 2); + + assert!( + sequenced_entries[0].clock_value() < between, + "expected {:?} to be before {:?}", + sequenced_entries[0].clock_value(), + between + ); + + assert!( + between < sequenced_entries[1].clock_value(), + "expected {:?} to be before {:?}", + between, + sequenced_entries[1].clock_value(), + ); + + assert!( + sequenced_entries[1].clock_value() < after, + "expected {:?} to be before {:?}", + sequenced_entries[1].clock_value(), + after + ); + } + + #[test] + fn next_process_clock_always_increments() { + // Process clock defaults to the current time + let db = Arc::new(TestDb::builder().write_buffer(true).build().db); + + // Set the process clock value to a time in the future, so that when compared to the + // current time, the process clock value will be greater + let later: u64 = (Utc::now() + chrono::Duration::weeks(4)) + .timestamp_nanos() + .try_into() + .unwrap(); + + db.process_clock.inner.store(later, Ordering::SeqCst); + + // Every call to next_process_clock should increment at least 1, even in this case + // where the system time will be less than the process clock + assert_eq!( + db.process_clock.next(), + ClockValue::try_from(later + 1).unwrap() + ); + assert_eq!( + db.process_clock.next(), + ClockValue::try_from(later + 2).unwrap() + ); + } +}