diff --git a/server/src/db.rs b/server/src/db.rs index c503ed4fd4..2d261ee80e 100644 --- a/server/src/db.rs +++ b/server/src/db.rs @@ -24,7 +24,7 @@ use datafusion::{ catalog::{catalog::CatalogProvider, schema::SchemaProvider}, physical_plan::SendableRecordBatchStream, }; -use entry::{ClockValue, ClockValueError, Entry, OwnedSequencedEntry, SequencedEntry}; +use entry::{Entry, OwnedSequencedEntry, SequencedEntry}; use internal_types::{arrow::sort::sort_record_batch, selection::Selection}; use lifecycle::LifecycleManager; use metrics::{KeyValue, MetricObserver, MetricObserverBuilder, MetricRegistry}; @@ -38,10 +38,10 @@ use read_buffer::Chunk as ReadBufferChunk; use snafu::{ensure, ResultExt, Snafu}; use std::{ any::Any, - convert::{TryFrom, TryInto}, + convert::TryInto, num::NonZeroUsize, sync::{ - atomic::{AtomicU64, AtomicUsize, Ordering}, + atomic::{AtomicUsize, Ordering}, Arc, }, }; @@ -52,6 +52,7 @@ pub mod catalog; mod chunk; mod lifecycle; pub mod pred; +mod process_clock; mod streams; mod system_tables; @@ -198,16 +199,11 @@ pub enum Error { source: internal_types::schema::Error, }, - #[snafu(display("Invalid Clock Value: {}", source))] - InvalidClockValue { source: ClockValueError }, - #[snafu(display("Error sending Sequenced Entry to Write Buffer: {}", source))] WriteBufferError { source: buffer::Error }, } pub type Result = std::result::Result; -const STARTING_SEQUENCE: u64 = 1; - /// This is the main IOx Database object. It is the root object of any /// specific InfluxDB IOx instance /// @@ -306,8 +302,10 @@ pub struct Db { /// The system schema provider system_tables: Arc, - /// Used to allocated sequence numbers for writes - sequence: AtomicU64, + /// Process clock used in establishing a partial ordering of operations via a Lamport Clock. + /// + /// Value is nanoseconds since the Unix Epoch. + process_clock: process_clock::ProcessClock, /// Number of iterations of the worker loop for this Db worker_iterations: AtomicUsize, @@ -471,6 +469,8 @@ impl Db { SystemSchemaProvider::new(&db_name, Arc::clone(&catalog), Arc::clone(&jobs)); let system_tables = Arc::new(system_tables); + let process_clock = process_clock::ProcessClock::new(); + Self { rules, server_id, @@ -483,7 +483,7 @@ impl Db { metrics_registry: metrics, system_tables, memory_registries, - sequence: AtomicU64::new(STARTING_SEQUENCE), + process_clock, worker_iterations: AtomicUsize::new(0), } } @@ -1057,11 +1057,6 @@ impl Db { tracker } - /// Returns the next write sequence number - pub fn next_sequence(&self) -> u64 { - self.sequence.fetch_add(1, Ordering::SeqCst) - } - /// Return chunk summary information for all chunks in the specified /// partition across all storage systems pub fn partition_chunk_summaries(&self, partition_key: &str) -> Vec { @@ -1132,7 +1127,7 @@ impl Db { pub fn store_entry(&self, entry: Entry) -> Result<()> { let sequenced_entry = Arc::new( OwnedSequencedEntry::new_from_entry_bytes( - ClockValue::try_from(self.next_sequence()).context(InvalidClockValue)?, + self.process_clock.next(), self.server_id, entry.data(), ) @@ -1381,8 +1376,8 @@ mod tests { type Error = Box; type Result = std::result::Result; - #[tokio::test] - async fn write_no_mutable_buffer() { + #[test] + fn write_no_mutable_buffer() { // Validate that writes are rejected if there is no mutable buffer let db = make_db().db; db.rules.write().lifecycle_rules.immutable = true; @@ -2040,8 +2035,8 @@ mod tests { assert_batches_eq!(expected, &record_batches); } - #[tokio::test] - async fn write_updates_last_write_at() { + #[test] + fn write_updates_last_write_at() { let db = Arc::new(make_db().db); let before_create = Utc::now(); @@ -2105,8 +2100,8 @@ mod tests { assert!(chunk.time_closed().unwrap() < after_rollover); } - #[tokio::test] - async fn test_chunk_closing() { + #[test] + fn test_chunk_closing() { let db = Arc::new(make_db().db); db.rules.write().lifecycle_rules.mutable_size_threshold = Some(NonZeroUsize::new(2).unwrap()); @@ -2126,8 +2121,8 @@ mod tests { assert!(matches!(chunks[1].read().state(), ChunkState::Closed(_))); } - #[tokio::test] - async fn chunks_sorted_by_times() { + #[test] + fn chunks_sorted_by_times() { let db = Arc::new(make_db().db); write_lp(&db, "cpu val=1 1"); write_lp(&db, "mem val=2 400000000000001"); @@ -2648,8 +2643,8 @@ mod tests { assert_eq!(read_parquet_file_chunk_ids(&db, partition_key), vec![0]); } - #[tokio::test] - async fn write_hard_limit() { + #[test] + fn write_hard_limit() { let db = Arc::new(make_db().db); db.rules.write().lifecycle_rules.buffer_size_hard = Some(NonZeroUsize::new(10).unwrap()); @@ -2663,8 +2658,8 @@ mod tests { )); } - #[tokio::test] - async fn write_goes_to_write_buffer_if_configured() { + #[test] + fn write_goes_to_write_buffer_if_configured() { let db = Arc::new(TestDb::builder().write_buffer(true).build().db); assert_eq!(db.write_buffer.as_ref().unwrap().lock().size(), 0); diff --git a/server/src/db/process_clock.rs b/server/src/db/process_clock.rs new file mode 100644 index 0000000000..3e173cfbb1 --- /dev/null +++ b/server/src/db/process_clock.rs @@ -0,0 +1,215 @@ +//! Process clock used in establishing a partial ordering of operations via a Lamport Clock. + +use chrono::Utc; +use entry::ClockValue; +use std::{ + convert::{TryFrom, TryInto}, + sync::atomic::{AtomicU64, Ordering}, +}; + +#[derive(Debug)] +pub struct ProcessClock { + inner: AtomicU64, +} + +impl ProcessClock { + /// Create a new process clock value initialized to the current system time. + pub fn new() -> Self { + Self { + inner: AtomicU64::new(system_clock_now()), + } + } + + /// Returns the next process clock value, which will be the maximum of the system time in + /// nanoseconds or the previous process clock value plus 1. Every operation that needs a + /// process clock value should be incrementing it as well, so there should never be a read of + /// the process clock without an accompanying increment of at least 1 nanosecond. + /// + /// We expect that updates to the process clock are not so frequent and the system is slow + /// enough that the returned value will be incremented by at least 1. + pub fn next(&self) -> ClockValue { + let next = loop { + if let Ok(next) = self.try_update() { + break next; + } + }; + + ClockValue::try_from(next).expect("process clock should not be 0") + } + + fn try_update(&self) -> Result { + let now = system_clock_now(); + let current_process_clock = self.inner.load(Ordering::SeqCst); + let next_candidate = current_process_clock + 1; + + let next = now.max(next_candidate); + + self.inner + .compare_exchange( + current_process_clock, + next, + Ordering::SeqCst, + Ordering::SeqCst, + ) + .map(|_| next) + } +} + +// Convenience function for getting the current time in a `u64` represented as nanoseconds since +// the epoch +// +// While this might jump backwards, the logic above that takes the maximum of the current process +// clock and the value returned from this function should ensure that the process clock is +// strictly increasing. +fn system_clock_now() -> u64 { + Utc::now() + .timestamp_nanos() + .try_into() + .expect("current time since the epoch should be positive") +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::query_tests::utils::TestDb; + use entry::test_helpers::lp_to_entry; + use std::{sync::Arc, thread, time::Duration}; + + #[test] + fn process_clock_defaults_to_current_time_in_ns() { + let before = system_clock_now(); + + let db = Arc::new(TestDb::builder().build().db); + let db_process_clock = db.process_clock.inner.load(Ordering::SeqCst); + + let after = system_clock_now(); + + assert!( + before < db_process_clock, + "expected {} to be less than {}", + before, + db_process_clock + ); + assert!( + db_process_clock < after, + "expected {} to be less than {}", + db_process_clock, + after + ); + } + + #[test] + fn process_clock_incremented_and_set_on_sequenced_entry() { + let before = system_clock_now(); + let before = ClockValue::try_from(before).unwrap(); + + let db = Arc::new(TestDb::builder().write_buffer(true).build().db); + + let entry = lp_to_entry("cpu bar=1 10"); + db.store_entry(entry).unwrap(); + + let between = system_clock_now(); + let between = ClockValue::try_from(between).unwrap(); + + let entry = lp_to_entry("cpu foo=2 10"); + db.store_entry(entry).unwrap(); + + let after = system_clock_now(); + let after = ClockValue::try_from(after).unwrap(); + + let sequenced_entries = db + .write_buffer + .as_ref() + .unwrap() + .lock() + .writes_since(before); + assert_eq!(sequenced_entries.len(), 2); + + assert!( + sequenced_entries[0].clock_value() < between, + "expected {:?} to be before {:?}", + sequenced_entries[0].clock_value(), + between + ); + + assert!( + between < sequenced_entries[1].clock_value(), + "expected {:?} to be before {:?}", + between, + sequenced_entries[1].clock_value(), + ); + + assert!( + sequenced_entries[1].clock_value() < after, + "expected {:?} to be before {:?}", + sequenced_entries[1].clock_value(), + after + ); + } + + #[test] + fn next_process_clock_always_increments() { + // Process clock defaults to the current time + let db = Arc::new(TestDb::builder().write_buffer(true).build().db); + + // Set the process clock value to a time in the future, so that when compared to the + // current time, the process clock value will be greater + let later: u64 = (Utc::now() + chrono::Duration::weeks(4)) + .timestamp_nanos() + .try_into() + .unwrap(); + + db.process_clock.inner.store(later, Ordering::SeqCst); + + // Every call to next_process_clock should increment at least 1, even in this case + // where the system time will be less than the process clock + assert_eq!( + db.process_clock.next(), + ClockValue::try_from(later + 1).unwrap() + ); + assert_eq!( + db.process_clock.next(), + ClockValue::try_from(later + 2).unwrap() + ); + } + + #[test] + fn process_clock_multithreaded_access_always_increments() { + let pc = Arc::new(ProcessClock::new()); + + let handles: Vec<_> = (0..10) + .map(|thread_num| { + let pc = Arc::clone(&pc); + thread::spawn(move || { + let mut pc_val_before = pc.next(); + for iteration in 0..10 { + let pc_val_after = pc.next(); + + // This might be useful for debugging if this test fails + println!( + "thread {} in iteration {} testing {:?} < {:?}", + thread_num, iteration, pc_val_before, pc_val_after + ); + + // Process clock should always increase + assert!( + pc_val_before < pc_val_after, + "expected {:?} to be less than {:?}", + pc_val_before, + pc_val_after + ); + + pc_val_before = pc_val_after; + + // encourage yielding + thread::sleep(Duration::from_millis(1)); + } + }) + }) + .collect(); + + for h in handles { + h.join().unwrap(); + } + } +}