refactor: Extract ProcessClock into its own type

pull/24376/head
Carol (Nichols || Goulding) 2021-05-12 11:21:10 -04:00
parent b749353d21
commit 425aacc391
2 changed files with 175 additions and 145 deletions

View File

@ -11,7 +11,6 @@ use catalog::{
chunk::{Chunk as CatalogChunk, ChunkState}, chunk::{Chunk as CatalogChunk, ChunkState},
Catalog, Catalog,
}; };
use chrono::Utc;
pub(crate) use chunk::DbChunk; pub(crate) use chunk::DbChunk;
use data_types::{ use data_types::{
chunk::ChunkSummary, chunk::ChunkSummary,
@ -25,7 +24,7 @@ use datafusion::{
catalog::{catalog::CatalogProvider, schema::SchemaProvider}, catalog::{catalog::CatalogProvider, schema::SchemaProvider},
physical_plan::SendableRecordBatchStream, physical_plan::SendableRecordBatchStream,
}; };
use entry::{ClockValue, Entry, OwnedSequencedEntry, SequencedEntry}; use entry::{Entry, OwnedSequencedEntry, SequencedEntry};
use internal_types::{arrow::sort::sort_record_batch, selection::Selection}; use internal_types::{arrow::sort::sort_record_batch, selection::Selection};
use lifecycle::LifecycleManager; use lifecycle::LifecycleManager;
use metrics::{KeyValue, MetricObserver, MetricObserverBuilder, MetricRegistry}; use metrics::{KeyValue, MetricObserver, MetricObserverBuilder, MetricRegistry};
@ -39,10 +38,10 @@ use read_buffer::Chunk as ReadBufferChunk;
use snafu::{ensure, ResultExt, Snafu}; use snafu::{ensure, ResultExt, Snafu};
use std::{ use std::{
any::Any, any::Any,
convert::{TryFrom, TryInto}, convert::TryInto,
num::NonZeroUsize, num::NonZeroUsize,
sync::{ sync::{
atomic::{AtomicU64, AtomicUsize, Ordering}, atomic::{AtomicUsize, Ordering},
Arc, Arc,
}, },
}; };
@ -53,6 +52,7 @@ pub mod catalog;
mod chunk; mod chunk;
mod lifecycle; mod lifecycle;
pub mod pred; pub mod pred;
mod process_clock;
mod streams; mod streams;
mod system_tables; mod system_tables;
@ -305,7 +305,7 @@ pub struct Db {
/// Process clock used in establishing a partial ordering of operations via a Lamport Clock. /// Process clock used in establishing a partial ordering of operations via a Lamport Clock.
/// ///
/// Value is nanoseconds since the Unix Epoch. /// Value is nanoseconds since the Unix Epoch.
process_clock: AtomicU64, process_clock: process_clock::ProcessClock,
/// Number of iterations of the worker loop for this Db /// Number of iterations of the worker loop for this Db
worker_iterations: AtomicUsize, worker_iterations: AtomicUsize,
@ -469,7 +469,7 @@ impl Db {
SystemSchemaProvider::new(&db_name, Arc::clone(&catalog), Arc::clone(&jobs)); SystemSchemaProvider::new(&db_name, Arc::clone(&catalog), Arc::clone(&jobs));
let system_tables = Arc::new(system_tables); let system_tables = Arc::new(system_tables);
let process_clock = AtomicU64::new(now_nanos()); let process_clock = process_clock::ProcessClock::new();
Self { Self {
rules, rules,
@ -1057,37 +1057,6 @@ impl Db {
tracker tracker
} }
/// Returns the next process clock value, which will be the maximum of the system time in
/// nanoseconds or the previous process clock value plus 1. Every operation that needs a
/// process clock value should be incrementing it as well, so there should never be a read of
/// the process clock without an accompanying increment of at least 1 nanosecond.
pub fn next_process_clock(&self) -> ClockValue {
let next = loop {
if let Ok(next) = self.try_update_process_clock() {
break next;
}
};
ClockValue::try_from(next).expect("process clock should not be 0")
}
fn try_update_process_clock(&self) -> Result<u64, u64> {
let now = now_nanos();
let current_process_clock = self.process_clock.load(Ordering::SeqCst);
let next_candidate = current_process_clock + 1;
let next = now.max(next_candidate);
self.process_clock
.compare_exchange(
current_process_clock,
next,
Ordering::SeqCst,
Ordering::SeqCst,
)
.map(|_| next)
}
/// Return chunk summary information for all chunks in the specified /// Return chunk summary information for all chunks in the specified
/// partition across all storage systems /// partition across all storage systems
pub fn partition_chunk_summaries(&self, partition_key: &str) -> Vec<ChunkSummary> { pub fn partition_chunk_summaries(&self, partition_key: &str) -> Vec<ChunkSummary> {
@ -1158,7 +1127,7 @@ impl Db {
pub fn store_entry(&self, entry: Entry) -> Result<()> { pub fn store_entry(&self, entry: Entry) -> Result<()> {
let sequenced_entry = Arc::new( let sequenced_entry = Arc::new(
OwnedSequencedEntry::new_from_entry_bytes( OwnedSequencedEntry::new_from_entry_bytes(
self.next_process_clock(), self.process_clock.next(),
self.server_id, self.server_id,
entry.data(), entry.data(),
) )
@ -1342,15 +1311,6 @@ impl CatalogProvider for Db {
} }
} }
// Convenience function for getting the current time in a `u64` represented as nanoseconds since
// the epoch
fn now_nanos() -> u64 {
Utc::now()
.timestamp_nanos()
.try_into()
.expect("current time since the epoch should be positive")
}
pub mod test_helpers { pub mod test_helpers {
use super::*; use super::*;
use entry::test_helpers::lp_to_entries; use entry::test_helpers::lp_to_entries;
@ -2912,102 +2872,4 @@ mod tests {
.gt(0.07) .gt(0.07)
.unwrap(); .unwrap();
} }
#[test]
fn process_clock_defaults_to_current_time_in_ns() {
let before = now_nanos();
let db = Arc::new(TestDb::builder().build().db);
let db_process_clock = db.process_clock.load(Ordering::SeqCst);
let after = now_nanos();
assert!(
before < db_process_clock,
"expected {} to be less than {}",
before,
db_process_clock
);
assert!(
db_process_clock < after,
"expected {} to be less than {}",
db_process_clock,
after
);
}
#[test]
fn process_clock_incremented_and_set_on_sequenced_entry() {
let before = now_nanos();
let before = ClockValue::try_from(before).unwrap();
let db = Arc::new(TestDb::builder().write_buffer(true).build().db);
let entry = lp_to_entry("cpu bar=1 10");
db.store_entry(entry).unwrap();
let between = now_nanos();
let between = ClockValue::try_from(between).unwrap();
let entry = lp_to_entry("cpu foo=2 10");
db.store_entry(entry).unwrap();
let after = now_nanos();
let after = ClockValue::try_from(after).unwrap();
let sequenced_entries = db
.write_buffer
.as_ref()
.unwrap()
.lock()
.writes_since(before);
assert_eq!(sequenced_entries.len(), 2);
assert!(
sequenced_entries[0].clock_value() < between,
"expected {:?} to be before {:?}",
sequenced_entries[0].clock_value(),
between
);
assert!(
between < sequenced_entries[1].clock_value(),
"expected {:?} to be before {:?}",
between,
sequenced_entries[1].clock_value(),
);
assert!(
sequenced_entries[1].clock_value() < after,
"expected {:?} to be before {:?}",
sequenced_entries[1].clock_value(),
after
);
}
#[test]
fn next_process_clock_always_increments() {
// Process clock defaults to the current time
let db = Arc::new(TestDb::builder().write_buffer(true).build().db);
// Set the process clock value to a time in the future, so that when compared to the
// current time, the process clock value will be greater
let later: u64 = (Utc::now() + chrono::Duration::weeks(4))
.timestamp_nanos()
.try_into()
.unwrap();
db.process_clock.store(later, Ordering::SeqCst);
// Every call to next_process_clock should increment at least 1, even in this case
// where the system time will be less than the process clock
assert_eq!(
db.next_process_clock(),
ClockValue::try_from(later + 1).unwrap()
);
assert_eq!(
db.next_process_clock(),
ClockValue::try_from(later + 2).unwrap()
);
}
} }

View File

@ -0,0 +1,168 @@
//! Process clock used in establishing a partial ordering of operations via a Lamport Clock.
use chrono::Utc;
use entry::ClockValue;
use std::{
convert::{TryFrom, TryInto},
sync::atomic::{AtomicU64, Ordering},
};
#[derive(Debug)]
pub struct ProcessClock {
inner: AtomicU64,
}
impl ProcessClock {
/// Create a new process clock value initialized to the current system time.
pub fn new() -> Self {
Self {
inner: AtomicU64::new(now_nanos()),
}
}
/// Returns the next process clock value, which will be the maximum of the system time in
/// nanoseconds or the previous process clock value plus 1. Every operation that needs a
/// process clock value should be incrementing it as well, so there should never be a read of
/// the process clock without an accompanying increment of at least 1 nanosecond.
pub fn next(&self) -> ClockValue {
let next = loop {
if let Ok(next) = self.try_update() {
break next;
}
};
ClockValue::try_from(next).expect("process clock should not be 0")
}
fn try_update(&self) -> Result<u64, u64> {
let now = now_nanos();
let current_process_clock = self.inner.load(Ordering::SeqCst);
let next_candidate = current_process_clock + 1;
let next = now.max(next_candidate);
self.inner
.compare_exchange(
current_process_clock,
next,
Ordering::SeqCst,
Ordering::SeqCst,
)
.map(|_| next)
}
}
// Convenience function for getting the current time in a `u64` represented as nanoseconds since
// the epoch
fn now_nanos() -> u64 {
Utc::now()
.timestamp_nanos()
.try_into()
.expect("current time since the epoch should be positive")
}
#[cfg(test)]
mod tests {
use super::*;
use crate::query_tests::utils::TestDb;
use entry::test_helpers::lp_to_entry;
use std::sync::Arc;
#[test]
fn process_clock_defaults_to_current_time_in_ns() {
let before = now_nanos();
let db = Arc::new(TestDb::builder().build().db);
let db_process_clock = db.process_clock.inner.load(Ordering::SeqCst);
let after = now_nanos();
assert!(
before < db_process_clock,
"expected {} to be less than {}",
before,
db_process_clock
);
assert!(
db_process_clock < after,
"expected {} to be less than {}",
db_process_clock,
after
);
}
#[test]
fn process_clock_incremented_and_set_on_sequenced_entry() {
let before = now_nanos();
let before = ClockValue::try_from(before).unwrap();
let db = Arc::new(TestDb::builder().write_buffer(true).build().db);
let entry = lp_to_entry("cpu bar=1 10");
db.store_entry(entry).unwrap();
let between = now_nanos();
let between = ClockValue::try_from(between).unwrap();
let entry = lp_to_entry("cpu foo=2 10");
db.store_entry(entry).unwrap();
let after = now_nanos();
let after = ClockValue::try_from(after).unwrap();
let sequenced_entries = db
.write_buffer
.as_ref()
.unwrap()
.lock()
.writes_since(before);
assert_eq!(sequenced_entries.len(), 2);
assert!(
sequenced_entries[0].clock_value() < between,
"expected {:?} to be before {:?}",
sequenced_entries[0].clock_value(),
between
);
assert!(
between < sequenced_entries[1].clock_value(),
"expected {:?} to be before {:?}",
between,
sequenced_entries[1].clock_value(),
);
assert!(
sequenced_entries[1].clock_value() < after,
"expected {:?} to be before {:?}",
sequenced_entries[1].clock_value(),
after
);
}
#[test]
fn next_process_clock_always_increments() {
// Process clock defaults to the current time
let db = Arc::new(TestDb::builder().write_buffer(true).build().db);
// Set the process clock value to a time in the future, so that when compared to the
// current time, the process clock value will be greater
let later: u64 = (Utc::now() + chrono::Duration::weeks(4))
.timestamp_nanos()
.try_into()
.unwrap();
db.process_clock.inner.store(later, Ordering::SeqCst);
// Every call to next_process_clock should increment at least 1, even in this case
// where the system time will be less than the process clock
assert_eq!(
db.process_clock.next(),
ClockValue::try_from(later + 1).unwrap()
);
assert_eq!(
db.process_clock.next(),
ClockValue::try_from(later + 2).unwrap()
);
}
}