Merge pull request #1444 from influxdata/cn/wb-clock

feat: Add a process clock to Db and use it for Sequenced Entries
pull/24376/head
kodiakhq[bot] 2021-05-14 13:04:26 +00:00 committed by GitHub
commit d98d66e718
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 239 additions and 29 deletions

View File

@ -24,7 +24,7 @@ use datafusion::{
catalog::{catalog::CatalogProvider, schema::SchemaProvider},
physical_plan::SendableRecordBatchStream,
};
use entry::{ClockValue, ClockValueError, Entry, OwnedSequencedEntry, SequencedEntry};
use entry::{Entry, OwnedSequencedEntry, SequencedEntry};
use internal_types::{arrow::sort::sort_record_batch, selection::Selection};
use lifecycle::LifecycleManager;
use metrics::{KeyValue, MetricObserver, MetricObserverBuilder, MetricRegistry};
@ -38,10 +38,10 @@ use read_buffer::Chunk as ReadBufferChunk;
use snafu::{ensure, ResultExt, Snafu};
use std::{
any::Any,
convert::{TryFrom, TryInto},
convert::TryInto,
num::NonZeroUsize,
sync::{
atomic::{AtomicU64, AtomicUsize, Ordering},
atomic::{AtomicUsize, Ordering},
Arc,
},
};
@ -52,6 +52,7 @@ pub mod catalog;
mod chunk;
mod lifecycle;
pub mod pred;
mod process_clock;
mod streams;
mod system_tables;
@ -198,16 +199,11 @@ pub enum Error {
source: internal_types::schema::Error,
},
#[snafu(display("Invalid Clock Value: {}", source))]
InvalidClockValue { source: ClockValueError },
#[snafu(display("Error sending Sequenced Entry to Write Buffer: {}", source))]
WriteBufferError { source: buffer::Error },
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
const STARTING_SEQUENCE: u64 = 1;
/// This is the main IOx Database object. It is the root object of any
/// specific InfluxDB IOx instance
///
@ -306,8 +302,10 @@ pub struct Db {
/// The system schema provider
system_tables: Arc<SystemSchemaProvider>,
/// Used to allocated sequence numbers for writes
sequence: AtomicU64,
/// Process clock used in establishing a partial ordering of operations via a Lamport Clock.
///
/// Value is nanoseconds since the Unix Epoch.
process_clock: process_clock::ProcessClock,
/// Number of iterations of the worker loop for this Db
worker_iterations: AtomicUsize,
@ -471,6 +469,8 @@ impl Db {
SystemSchemaProvider::new(&db_name, Arc::clone(&catalog), Arc::clone(&jobs));
let system_tables = Arc::new(system_tables);
let process_clock = process_clock::ProcessClock::new();
Self {
rules,
server_id,
@ -483,7 +483,7 @@ impl Db {
metrics_registry: metrics,
system_tables,
memory_registries,
sequence: AtomicU64::new(STARTING_SEQUENCE),
process_clock,
worker_iterations: AtomicUsize::new(0),
}
}
@ -1057,11 +1057,6 @@ impl Db {
tracker
}
/// Returns the next write sequence number
pub fn next_sequence(&self) -> u64 {
self.sequence.fetch_add(1, Ordering::SeqCst)
}
/// Return chunk summary information for all chunks in the specified
/// partition across all storage systems
pub fn partition_chunk_summaries(&self, partition_key: &str) -> Vec<ChunkSummary> {
@ -1132,7 +1127,7 @@ impl Db {
pub fn store_entry(&self, entry: Entry) -> Result<()> {
let sequenced_entry = Arc::new(
OwnedSequencedEntry::new_from_entry_bytes(
ClockValue::try_from(self.next_sequence()).context(InvalidClockValue)?,
self.process_clock.next(),
self.server_id,
entry.data(),
)
@ -1381,8 +1376,8 @@ mod tests {
type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
type Result<T, E = Error> = std::result::Result<T, E>;
#[tokio::test]
async fn write_no_mutable_buffer() {
#[test]
fn write_no_mutable_buffer() {
// Validate that writes are rejected if there is no mutable buffer
let db = make_db().db;
db.rules.write().lifecycle_rules.immutable = true;
@ -2040,8 +2035,8 @@ mod tests {
assert_batches_eq!(expected, &record_batches);
}
#[tokio::test]
async fn write_updates_last_write_at() {
#[test]
fn write_updates_last_write_at() {
let db = Arc::new(make_db().db);
let before_create = Utc::now();
@ -2105,8 +2100,8 @@ mod tests {
assert!(chunk.time_closed().unwrap() < after_rollover);
}
#[tokio::test]
async fn test_chunk_closing() {
#[test]
fn test_chunk_closing() {
let db = Arc::new(make_db().db);
db.rules.write().lifecycle_rules.mutable_size_threshold =
Some(NonZeroUsize::new(2).unwrap());
@ -2126,8 +2121,8 @@ mod tests {
assert!(matches!(chunks[1].read().state(), ChunkState::Closed(_)));
}
#[tokio::test]
async fn chunks_sorted_by_times() {
#[test]
fn chunks_sorted_by_times() {
let db = Arc::new(make_db().db);
write_lp(&db, "cpu val=1 1");
write_lp(&db, "mem val=2 400000000000001");
@ -2648,8 +2643,8 @@ mod tests {
assert_eq!(read_parquet_file_chunk_ids(&db, partition_key), vec![0]);
}
#[tokio::test]
async fn write_hard_limit() {
#[test]
fn write_hard_limit() {
let db = Arc::new(make_db().db);
db.rules.write().lifecycle_rules.buffer_size_hard = Some(NonZeroUsize::new(10).unwrap());
@ -2663,8 +2658,8 @@ mod tests {
));
}
#[tokio::test]
async fn write_goes_to_write_buffer_if_configured() {
#[test]
fn write_goes_to_write_buffer_if_configured() {
let db = Arc::new(TestDb::builder().write_buffer(true).build().db);
assert_eq!(db.write_buffer.as_ref().unwrap().lock().size(), 0);

View File

@ -0,0 +1,215 @@
//! Process clock used in establishing a partial ordering of operations via a Lamport Clock.
use chrono::Utc;
use entry::ClockValue;
use std::{
convert::{TryFrom, TryInto},
sync::atomic::{AtomicU64, Ordering},
};
#[derive(Debug)]
pub struct ProcessClock {
inner: AtomicU64,
}
impl ProcessClock {
/// Create a new process clock value initialized to the current system time.
pub fn new() -> Self {
Self {
inner: AtomicU64::new(system_clock_now()),
}
}
/// Returns the next process clock value, which will be the maximum of the system time in
/// nanoseconds or the previous process clock value plus 1. Every operation that needs a
/// process clock value should be incrementing it as well, so there should never be a read of
/// the process clock without an accompanying increment of at least 1 nanosecond.
///
/// We expect that updates to the process clock are not so frequent and the system is slow
/// enough that the returned value will be incremented by at least 1.
pub fn next(&self) -> ClockValue {
let next = loop {
if let Ok(next) = self.try_update() {
break next;
}
};
ClockValue::try_from(next).expect("process clock should not be 0")
}
fn try_update(&self) -> Result<u64, u64> {
let now = system_clock_now();
let current_process_clock = self.inner.load(Ordering::SeqCst);
let next_candidate = current_process_clock + 1;
let next = now.max(next_candidate);
self.inner
.compare_exchange(
current_process_clock,
next,
Ordering::SeqCst,
Ordering::SeqCst,
)
.map(|_| next)
}
}
// Convenience function for getting the current time in a `u64` represented as nanoseconds since
// the epoch
//
// While this might jump backwards, the logic above that takes the maximum of the current process
// clock and the value returned from this function should ensure that the process clock is
// strictly increasing.
fn system_clock_now() -> u64 {
Utc::now()
.timestamp_nanos()
.try_into()
.expect("current time since the epoch should be positive")
}
#[cfg(test)]
mod tests {
use super::*;
use crate::query_tests::utils::TestDb;
use entry::test_helpers::lp_to_entry;
use std::{sync::Arc, thread, time::Duration};
#[test]
fn process_clock_defaults_to_current_time_in_ns() {
let before = system_clock_now();
let db = Arc::new(TestDb::builder().build().db);
let db_process_clock = db.process_clock.inner.load(Ordering::SeqCst);
let after = system_clock_now();
assert!(
before < db_process_clock,
"expected {} to be less than {}",
before,
db_process_clock
);
assert!(
db_process_clock < after,
"expected {} to be less than {}",
db_process_clock,
after
);
}
#[test]
fn process_clock_incremented_and_set_on_sequenced_entry() {
let before = system_clock_now();
let before = ClockValue::try_from(before).unwrap();
let db = Arc::new(TestDb::builder().write_buffer(true).build().db);
let entry = lp_to_entry("cpu bar=1 10");
db.store_entry(entry).unwrap();
let between = system_clock_now();
let between = ClockValue::try_from(between).unwrap();
let entry = lp_to_entry("cpu foo=2 10");
db.store_entry(entry).unwrap();
let after = system_clock_now();
let after = ClockValue::try_from(after).unwrap();
let sequenced_entries = db
.write_buffer
.as_ref()
.unwrap()
.lock()
.writes_since(before);
assert_eq!(sequenced_entries.len(), 2);
assert!(
sequenced_entries[0].clock_value() < between,
"expected {:?} to be before {:?}",
sequenced_entries[0].clock_value(),
between
);
assert!(
between < sequenced_entries[1].clock_value(),
"expected {:?} to be before {:?}",
between,
sequenced_entries[1].clock_value(),
);
assert!(
sequenced_entries[1].clock_value() < after,
"expected {:?} to be before {:?}",
sequenced_entries[1].clock_value(),
after
);
}
#[test]
fn next_process_clock_always_increments() {
// Process clock defaults to the current time
let db = Arc::new(TestDb::builder().write_buffer(true).build().db);
// Set the process clock value to a time in the future, so that when compared to the
// current time, the process clock value will be greater
let later: u64 = (Utc::now() + chrono::Duration::weeks(4))
.timestamp_nanos()
.try_into()
.unwrap();
db.process_clock.inner.store(later, Ordering::SeqCst);
// Every call to next_process_clock should increment at least 1, even in this case
// where the system time will be less than the process clock
assert_eq!(
db.process_clock.next(),
ClockValue::try_from(later + 1).unwrap()
);
assert_eq!(
db.process_clock.next(),
ClockValue::try_from(later + 2).unwrap()
);
}
#[test]
fn process_clock_multithreaded_access_always_increments() {
let pc = Arc::new(ProcessClock::new());
let handles: Vec<_> = (0..10)
.map(|thread_num| {
let pc = Arc::clone(&pc);
thread::spawn(move || {
let mut pc_val_before = pc.next();
for iteration in 0..10 {
let pc_val_after = pc.next();
// This might be useful for debugging if this test fails
println!(
"thread {} in iteration {} testing {:?} < {:?}",
thread_num, iteration, pc_val_before, pc_val_after
);
// Process clock should always increase
assert!(
pc_val_before < pc_val_after,
"expected {:?} to be less than {:?}",
pc_val_before,
pc_val_after
);
pc_val_before = pc_val_after;
// encourage yielding
thread::sleep(Duration::from_millis(1));
}
})
})
.collect();
for h in handles {
h.join().unwrap();
}
}
}