Merge pull request #2168 from influxdata/crepererum/compact_while_replay2

fix: compaction during replay, hard buffer limits during Kafka ingest
pull/24376/head
kodiakhq[bot] 2021-08-04 10:12:00 +00:00 committed by GitHub
commit 2727ccfbe3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 483 additions and 112 deletions

View File

@ -37,20 +37,44 @@ where
/// Background tasks spawned by this `LifecyclePolicy` /// Background tasks spawned by this `LifecyclePolicy`
trackers: Vec<TaskTracker<ChunkLifecycleAction>>, trackers: Vec<TaskTracker<ChunkLifecycleAction>>,
/// Do not allow persistence even when the database rules would allow that.
///
/// This can be helpful during some phases of the database startup process.
suppress_persistence: bool,
} }
impl<M> LifecyclePolicy<M> impl<M> LifecyclePolicy<M>
where where
M: LifecycleDb, M: LifecycleDb,
{ {
/// Create new policy.
///
/// Persistence is allowed if the database rules allow it.
pub fn new(db: M) -> Self { pub fn new(db: M) -> Self {
Self { Self {
db, db,
trackers: vec![], trackers: vec![],
active_compactions: 0, active_compactions: 0,
suppress_persistence: false,
} }
} }
/// Create new policy that suppresses persistence even when the database rules allow it.
pub fn new_suppress_persistence(db: M) -> Self {
Self {
db,
trackers: vec![],
active_compactions: 0,
suppress_persistence: true,
}
}
/// Stop suppressing persistence and allow it if the database rules allow it.
pub fn unsuppress_persistence(&mut self) {
self.suppress_persistence = false;
}
/// Check if database exceeds memory limits and free memory if necessary /// Check if database exceeds memory limits and free memory if necessary
/// ///
/// The policy will first try to unload persisted chunks in order of creation /// The policy will first try to unload persisted chunks in order of creation
@ -485,7 +509,7 @@ where
// if the criteria for persistence have been satisfied, // if the criteria for persistence have been satisfied,
// but persistence cannot proceed because of in-progress // but persistence cannot proceed because of in-progress
// compactions // compactions
let stall_compaction_persisting = if rules.persist { let stall_compaction_persisting = if rules.persist && !self.suppress_persistence {
let persisting = let persisting =
self.maybe_persist_chunks(&db_name, partition, &rules, now_instant); self.maybe_persist_chunks(&db_name, partition, &rules, now_instant);
if persisting { if persisting {
@ -586,7 +610,7 @@ where
impl<M> Debug for LifecyclePolicy<M> impl<M> Debug for LifecyclePolicy<M>
where where
M: LifecycleDb + Copy, M: LifecycleDb,
{ {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "LifecyclePolicy{{..}}") write!(f, "LifecyclePolicy{{..}}")
@ -834,6 +858,12 @@ mod tests {
for chunk in &chunks { for chunk in &chunks {
partition.chunks.remove(&chunk.addr.chunk_id); partition.chunks.remove(&chunk.addr.chunk_id);
new_chunk.row_count += chunk.row_count; new_chunk.row_count += chunk.row_count;
new_chunk.min_timestamp = match (new_chunk.min_timestamp, chunk.min_timestamp) {
(Some(ts1), Some(ts2)) => Some(ts1.min(ts2)),
(Some(ts), None) => Some(ts),
(None, Some(ts)) => Some(ts),
(None, None) => None,
};
} }
partition partition
@ -1585,6 +1615,49 @@ mod tests {
); );
} }
#[test]
fn test_suppress_persistence() {
let rules = LifecycleRules {
persist: true,
persist_row_threshold: NonZeroUsize::new(1_000).unwrap(),
late_arrive_window_seconds: NonZeroU32::new(10).unwrap(),
persist_age_threshold_seconds: NonZeroU32::new(10).unwrap(),
max_active_compactions: MaxActiveCompactions(NonZeroU32::new(10).unwrap()),
..Default::default()
};
let now = Instant::now();
let partitions = vec![
// Sufficient rows => could persist but should be suppressed
TestPartition::new(vec![
TestChunk::new(2, 0, ChunkStorage::ClosedMutableBuffer)
.with_min_timestamp(from_secs(10)),
TestChunk::new(3, 0, ChunkStorage::ReadBuffer).with_min_timestamp(from_secs(5)),
])
.with_persistence(1_000, now, from_secs(20)),
];
let db = TestDb::from_partitions(rules, partitions);
let mut lifecycle = LifecyclePolicy::new_suppress_persistence(&db);
lifecycle.check_for_work(from_secs(0), now);
assert_eq!(*db.events.read(), vec![MoverEvents::Compact(vec![2, 3]),]);
lifecycle.check_for_work(from_secs(0), now);
assert_eq!(*db.events.read(), vec![MoverEvents::Compact(vec![2, 3]),]);
lifecycle.unsuppress_persistence();
lifecycle.check_for_work(from_secs(0), now);
assert_eq!(
*db.events.read(),
vec![
MoverEvents::Compact(vec![2, 3]),
MoverEvents::Persist(vec![4])
]
);
}
#[test] #[test]
fn test_moves_open() { fn test_moves_open() {
let rules = LifecycleRules { let rules = LifecycleRules {

View File

@ -9,6 +9,7 @@ use data_types::{
}; };
use metrics::MetricRegistry; use metrics::MetricRegistry;
use object_store::{path::ObjectStorePath, ObjectStore, ObjectStoreApi}; use object_store::{path::ObjectStorePath, ObjectStore, ObjectStoreApi};
use parking_lot::Mutex;
use parquet_file::catalog::PreservedCatalog; use parquet_file::catalog::PreservedCatalog;
use persistence_windows::checkpoint::ReplayPlan; use persistence_windows::checkpoint::ReplayPlan;
use write_buffer::config::WriteBufferConfig; use write_buffer::config::WriteBufferConfig;
@ -320,20 +321,29 @@ enum DatabaseState {
Replay { Replay {
db: Arc<Db>, db: Arc<Db>,
replay_plan: ReplayPlan, replay_plan: ReplayPlan,
background_worker_join_handle: Mutex<Option<JoinHandle<()>>>,
background_worker_shutdown: CancellationToken,
}, },
/// Fully initialized database. /// Fully initialized database.
Initialized { Initialized {
db: Arc<Db>, db: Arc<Db>,
handle: Option<JoinHandle<()>>, background_worker_join_handle: Mutex<Option<JoinHandle<()>>>,
shutdown: CancellationToken, background_worker_shutdown: CancellationToken,
}, },
} }
impl DatabaseState { impl DatabaseState {
fn join(&mut self) -> Option<JoinHandle<()>> { fn join(&mut self) -> Option<JoinHandle<()>> {
match self { match self {
DatabaseState::Initialized { handle, .. } => handle.take(), DatabaseState::Replay {
background_worker_join_handle: handle,
..
} => handle.lock().take(),
DatabaseState::Initialized {
background_worker_join_handle: handle,
..
} => handle.lock().take(),
_ => None, _ => None,
} }
} }
@ -396,15 +406,22 @@ impl DatabaseState {
impl Drop for DatabaseState { impl Drop for DatabaseState {
fn drop(&mut self) { fn drop(&mut self) {
if let DatabaseState::Initialized { if let DatabaseState::Replay {
handle, shutdown, .. background_worker_join_handle,
background_worker_shutdown,
..
}
| DatabaseState::Initialized {
background_worker_join_handle,
background_worker_shutdown,
..
} = self } = self
{ {
if handle.is_some() { if background_worker_join_handle.lock().is_some() {
// Join should be called on `DatabaseState` prior to dropping, for example, by // Join should be called on `DatabaseState` prior to dropping, for example, by
// calling drain() on the owning `Config` // calling drain() on the owning `Config`
warn!("DatabaseState dropped without waiting for background task to complete"); warn!("DatabaseState dropped without waiting for background task to complete");
shutdown.cancel(); background_worker_shutdown.cancel();
} }
} }
} }
@ -543,12 +560,31 @@ impl<'a> DatabaseHandle<'a> {
write_buffer, write_buffer,
}; };
let db = Arc::new(Db::new( let db = Db::new(database_to_commit, Arc::clone(application.job_registry()));
database_to_commit,
Arc::clone(application.job_registry()),
));
self.state = Some(Arc::new(DatabaseState::Replay { db, replay_plan })); if self.config.shutdown.is_cancelled() {
error!("server is shutting down");
return ServerShuttingDown.fail();
}
let shutdown = self.config.shutdown.child_token();
let shutdown_captured = shutdown.clone();
let db_captured = Arc::clone(&db);
let rules = db.rules();
let handle = Mutex::new(Some(tokio::spawn(async move {
db_captured
.background_worker(shutdown_captured)
.instrument(tracing::info_span!("db_worker", database=%rules.name))
.await
})));
self.state = Some(Arc::new(DatabaseState::Replay {
db,
replay_plan,
background_worker_join_handle: handle,
background_worker_shutdown: shutdown,
}));
Ok(()) Ok(())
} }
@ -563,28 +599,17 @@ impl<'a> DatabaseHandle<'a> {
/// Advance database state to [`Initialized`](DatabaseStateCode::Initialized). /// Advance database state to [`Initialized`](DatabaseStateCode::Initialized).
pub fn advance_init(&mut self) -> Result<()> { pub fn advance_init(&mut self) -> Result<()> {
match self.state().as_ref() { match self.state().as_ref() {
DatabaseState::Replay { db, .. } => { DatabaseState::Replay {
if self.config.shutdown.is_cancelled() { db,
error!("server is shutting down"); background_worker_join_handle,
return ServerShuttingDown.fail(); background_worker_shutdown,
} ..
} => {
let shutdown = self.config.shutdown.child_token(); let handle = background_worker_join_handle.lock().take();
let shutdown_captured = shutdown.clone();
let db_captured = Arc::clone(db);
let rules = db.rules();
let handle = Some(tokio::spawn(async move {
db_captured
.background_worker(shutdown_captured)
.instrument(tracing::info_span!("db_worker", database=%rules.name))
.await
}));
self.state = Some(Arc::new(DatabaseState::Initialized { self.state = Some(Arc::new(DatabaseState::Initialized {
db: Arc::clone(db), db: Arc::clone(db),
handle, background_worker_join_handle: Mutex::new(handle),
shutdown, background_worker_shutdown: background_worker_shutdown.clone(),
})); }));
Ok(()) Ok(())
@ -894,7 +919,10 @@ mod test {
.unwrap() .unwrap()
.as_ref() .as_ref()
{ {
DatabaseState::Initialized { shutdown, .. } => shutdown.clone(), DatabaseState::Initialized {
background_worker_shutdown,
..
} => background_worker_shutdown.clone(),
_ => panic!("wrong state"), _ => panic!("wrong state"),
}; };

View File

@ -11,7 +11,7 @@ use crate::{
table::TableSchemaUpsertHandle, table::TableSchemaUpsertHandle,
Catalog, TableNameFilter, Catalog, TableNameFilter,
}, },
lifecycle::{ArcDb, LockableCatalogChunk, LockableCatalogPartition}, lifecycle::{LockableCatalogChunk, LockableCatalogPartition, WeakDb},
}, },
JobRegistry, JobRegistry,
}; };
@ -45,7 +45,7 @@ use std::{
any::Any, any::Any,
collections::HashMap, collections::HashMap,
sync::{ sync::{
atomic::{AtomicUsize, Ordering}, atomic::{AtomicBool, AtomicUsize, Ordering},
Arc, Arc,
}, },
time::{Duration, Instant}, time::{Duration, Instant},
@ -367,6 +367,20 @@ pub struct Db {
/// The cleanup job needs exclusive access and hence will acquire a write-guard. Creating parquet files and creating /// The cleanup job needs exclusive access and hence will acquire a write-guard. Creating parquet files and creating
/// catalog transaction only needs shared access and hence will acquire a read-guard. /// catalog transaction only needs shared access and hence will acquire a read-guard.
cleanup_lock: Arc<tokio::sync::RwLock<()>>, cleanup_lock: Arc<tokio::sync::RwLock<()>>,
/// Lifecycle policy.
///
/// Optional because it will be created after `Arc<Self>`.
///
/// This is stored here for the following reasons:
/// - to control the persistence suppression via a [`Db::unsuppress_persistence`]
/// - to keep the lifecycle state (e.g. the number of running compactions) around
lifecycle_policy: tokio::sync::Mutex<Option<::lifecycle::LifecyclePolicy<WeakDb>>>,
/// Flag to stop background worker from reading from the write buffer.
///
/// TODO: Move write buffer read loop out of Db.
no_write_buffer_read: AtomicBool,
} }
/// All the information needed to commit a database /// All the information needed to commit a database
@ -382,7 +396,7 @@ pub(crate) struct DatabaseToCommit {
} }
impl Db { impl Db {
pub(crate) fn new(database_to_commit: DatabaseToCommit, jobs: Arc<JobRegistry>) -> Self { pub(crate) fn new(database_to_commit: DatabaseToCommit, jobs: Arc<JobRegistry>) -> Arc<Self> {
let db_name = database_to_commit.rules.name.clone(); let db_name = database_to_commit.rules.name.clone();
let rules = RwLock::new(database_to_commit.rules); let rules = RwLock::new(database_to_commit.rules);
@ -408,7 +422,7 @@ impl Db {
let process_clock = process_clock::ProcessClock::new(); let process_clock = process_clock::ProcessClock::new();
Self { let this = Self {
rules, rules,
server_id, server_id,
store, store,
@ -425,7 +439,30 @@ impl Db {
ingest_metrics, ingest_metrics,
write_buffer: database_to_commit.write_buffer, write_buffer: database_to_commit.write_buffer,
cleanup_lock: Default::default(), cleanup_lock: Default::default(),
lifecycle_policy: tokio::sync::Mutex::new(None),
no_write_buffer_read: AtomicBool::new(true),
};
let this = Arc::new(this);
*this.lifecycle_policy.try_lock().expect("not used yet") = Some(
::lifecycle::LifecyclePolicy::new_suppress_persistence(WeakDb(Arc::downgrade(&this))),
);
this
} }
/// Allow persistence if database rules all it.
pub async fn unsuppress_persistence(&self) {
let mut guard = self.lifecycle_policy.lock().await;
let policy = guard
.as_mut()
.expect("lifecycle policy should be initialized");
policy.unsuppress_persistence();
}
/// Allow continuous reads from the write buffer (if configured).
///
/// TODO: Move write buffer read loop out of Db.
pub fn allow_write_buffer_read(&self) {
self.no_write_buffer_read.store(false, Ordering::SeqCst);
} }
/// Return a handle to the executor used to run queries /// Return a handle to the executor used to run queries
@ -763,13 +800,15 @@ impl Db {
tokio::join!( tokio::join!(
// lifecycle policy loop // lifecycle policy loop
async { async {
let mut policy = ::lifecycle::LifecyclePolicy::new(ArcDb(Arc::clone(self)));
while !shutdown.is_cancelled() { while !shutdown.is_cancelled() {
self.worker_iterations_lifecycle self.worker_iterations_lifecycle
.fetch_add(1, Ordering::Relaxed); .fetch_add(1, Ordering::Relaxed);
tokio::select! { tokio::select! {
_ = policy.check_for_work(Utc::now(), std::time::Instant::now()) => {}, _ = async {
let mut guard = self.lifecycle_policy.lock().await;
let policy = guard.as_mut().expect("lifecycle policy should be initialized");
policy.check_for_work(Utc::now(), std::time::Instant::now()).await
} => {},
_ = shutdown.cancelled() => break, _ = shutdown.cancelled() => break,
} }
} }
@ -801,6 +840,16 @@ impl Db {
// streaming from the write buffer loop // streaming from the write buffer loop
async { async {
if let Some(WriteBufferConfig::Reading(write_buffer)) = &self.write_buffer { if let Some(WriteBufferConfig::Reading(write_buffer)) = &self.write_buffer {
// wait for permission
tokio::select! {
_ = async {
while self.no_write_buffer_read.load(Ordering::SeqCst) {
tokio::time::sleep(Duration::from_millis(10)).await;
}
} => {},
_ = shutdown.cancelled() => return,
}
let mut write_buffer = write_buffer let mut write_buffer = write_buffer
.try_lock() .try_lock()
.expect("no streams should exist at this point"); .expect("no streams should exist at this point");
@ -808,6 +857,7 @@ impl Db {
for (sequencer_id, stream) in write_buffer.streams() { for (sequencer_id, stream) in write_buffer.streams() {
let metrics = self.ingest_metrics.new_sequencer_metrics(sequencer_id); let metrics = self.ingest_metrics.new_sequencer_metrics(sequencer_id);
let fut = self.stream_in_sequenced_entries( let fut = self.stream_in_sequenced_entries(
sequencer_id,
stream.stream, stream.stream,
stream.fetch_high_watermark, stream.fetch_high_watermark,
metrics, metrics,
@ -830,10 +880,12 @@ impl Db {
/// streaming entries from a write buffer. /// streaming entries from a write buffer.
async fn stream_in_sequenced_entries<'a>( async fn stream_in_sequenced_entries<'a>(
&'a self, &'a self,
sequencer_id: u32,
mut stream: BoxStream<'a, Result<SequencedEntry, WriteBufferError>>, mut stream: BoxStream<'a, Result<SequencedEntry, WriteBufferError>>,
f_mark: FetchHighWatermark<'a>, f_mark: FetchHighWatermark<'a>,
mut metrics: SequencerMetrics, mut metrics: SequencerMetrics,
) { ) {
let db_name = self.rules.read().db_name().to_string();
let mut watermark_last_updated: Option<Instant> = None; let mut watermark_last_updated: Option<Instant> = None;
let mut watermark = 0; let mut watermark = 0;
@ -844,7 +896,12 @@ impl Db {
let sequenced_entry = match sequenced_entry_result { let sequenced_entry = match sequenced_entry_result {
Ok(sequenced_entry) => sequenced_entry, Ok(sequenced_entry) => sequenced_entry,
Err(e) => { Err(e) => {
debug!(?e, "Error converting write buffer data to SequencedEntry"); debug!(
%e,
%db_name,
sequencer_id,
"Error converting write buffer data to SequencedEntry",
);
red_observation.client_error(); red_observation.client_error();
continue; continue;
} }
@ -852,16 +909,38 @@ impl Db {
let sequenced_entry = Arc::new(sequenced_entry); let sequenced_entry = Arc::new(sequenced_entry);
// store entry // store entry
let mut logged_hard_limit = false;
loop {
match self.store_sequenced_entry(Arc::clone(&sequenced_entry)) { match self.store_sequenced_entry(Arc::clone(&sequenced_entry)) {
Ok(_) => { Ok(_) => {
red_observation.ok(); red_observation.ok();
break;
}
Err(Error::HardLimitReached {}) => {
// wait a bit and retry
if !logged_hard_limit {
info!(
%db_name,
sequencer_id,
"Hard limit reached while reading from write buffer, waiting for compaction to catch up",
);
logged_hard_limit = true;
}
tokio::time::sleep(Duration::from_millis(100)).await;
continue;
} }
Err(e) => { Err(e) => {
debug!( debug!(
?e, %e,
%db_name,
sequencer_id,
"Error storing SequencedEntry from write buffer in database" "Error storing SequencedEntry from write buffer in database"
); );
red_observation.error(); red_observation.error();
// no retry
break;
}
} }
} }
@ -877,7 +956,12 @@ impl Db {
watermark = w; watermark = w;
} }
Err(e) => { Err(e) => {
debug!(%e, "Error while reading sequencer watermark") debug!(
%e,
%db_name,
sequencer_id,
"Error while reading sequencer watermark",
)
} }
} }
watermark_last_updated = Some(Instant::now()); watermark_last_updated = Some(Instant::now());
@ -1352,7 +1436,7 @@ mod tests {
}, },
utils::{make_db, TestDb}, utils::{make_db, TestDb},
}; };
use ::test_helpers::assert_contains; use ::test_helpers::{assert_contains, tracing::TracingCapture};
use arrow::record_batch::RecordBatch; use arrow::record_batch::RecordBatch;
use arrow_util::{assert_batches_eq, assert_batches_sorted_eq}; use arrow_util::{assert_batches_eq, assert_batches_sorted_eq};
use bytes::Bytes; use bytes::Bytes;
@ -1522,6 +1606,7 @@ mod tests {
let db_captured = Arc::clone(&db); let db_captured = Arc::clone(&db);
let join_handle = let join_handle =
tokio::spawn(async move { db_captured.background_worker(shutdown_captured).await }); tokio::spawn(async move { db_captured.background_worker(shutdown_captured).await });
db.allow_write_buffer_read();
let query = "select * from cpu"; let query = "select * from cpu";
@ -1636,6 +1721,82 @@ mod tests {
assert_batches_eq!(expected, &batches); assert_batches_eq!(expected, &batches);
} }
#[tokio::test]
async fn write_buffer_reads_wait_for_compaction() {
let tracing_capture = TracingCapture::new();
// setup write buffer
// these numbers are handtuned to trigger hard buffer limits w/o making the test too big
let n_entries = 50u64;
let write_buffer_state = MockBufferSharedState::empty_with_n_sequencers(1);
for sequence_number in 0..n_entries {
let lp = format!(
"table_1,tag_partition_by=a foo=\"hello\",bar=1 {}",
sequence_number / 2
);
write_buffer_state.push_entry(SequencedEntry::new_from_sequence(
Sequence::new(0, sequence_number),
Utc::now(),
lp_to_entry(&lp),
));
}
write_buffer_state.push_entry(SequencedEntry::new_from_sequence(
Sequence::new(0, n_entries),
Utc::now(),
lp_to_entry("table_2,partition_by=a foo=1 0"),
));
let write_buffer = MockBufferForReading::new(write_buffer_state);
// create DB
let partition_template = PartitionTemplate {
parts: vec![TemplatePart::Column("tag_partition_by".to_string())],
};
let test_db = TestDb::builder()
.write_buffer(WriteBufferConfig::Reading(Arc::new(
tokio::sync::Mutex::new(Box::new(write_buffer) as _),
)))
.lifecycle_rules(data_types::database_rules::LifecycleRules {
buffer_size_hard: Some(NonZeroUsize::new(10_000).unwrap()),
mub_row_threshold: NonZeroUsize::new(10).unwrap(),
..Default::default()
})
.partition_template(partition_template)
.build()
.await;
let db = test_db.db;
// start background task loop
let shutdown: CancellationToken = Default::default();
let shutdown_captured = shutdown.clone();
let db_captured = Arc::clone(&db);
let join_handle =
tokio::spawn(async move { db_captured.background_worker(shutdown_captured).await });
db.allow_write_buffer_read();
// after a while the table should exist
let t_0 = Instant::now();
loop {
if db.table_schema("table_2").is_some() {
break;
}
assert!(t_0.elapsed() < Duration::from_secs(10));
tokio::time::sleep(Duration::from_millis(100)).await;
}
// do: stop background task loop
shutdown.cancel();
join_handle.await.unwrap();
// no rows should be dropped
let batches = run_query(db, "select sum(bar) as n from table_1").await;
let expected = vec!["+----+", "| n |", "+----+", "| 25 |", "+----+"];
assert_batches_eq!(expected, &batches);
// check that hard buffer limit was actually hit (otherwise this test is pointless/outdated)
assert_contains!(tracing_capture.to_string(), "Hard limit reached while reading from write buffer, waiting for compaction to catch up");
}
#[tokio::test] #[tokio::test]
async fn error_converting_data_from_write_buffer_to_sequenced_entry_is_reported() { async fn error_converting_data_from_write_buffer_to_sequenced_entry_is_reported() {
let write_buffer_state = MockBufferSharedState::empty_with_n_sequencers(1); let write_buffer_state = MockBufferSharedState::empty_with_n_sequencers(1);
@ -1661,6 +1822,7 @@ mod tests {
let db_captured = Arc::clone(&db); let db_captured = Arc::clone(&db);
let join_handle = let join_handle =
tokio::spawn(async move { db_captured.background_worker(shutdown_captured).await }); tokio::spawn(async move { db_captured.background_worker(shutdown_captured).await });
db.allow_write_buffer_read();
// check: after a while the error should be reported in the database's metrics // check: after a while the error should be reported in the database's metrics
let t_0 = Instant::now(); let t_0 = Instant::now();
@ -2770,6 +2932,7 @@ mod tests {
let db_captured = Arc::clone(&db); let db_captured = Arc::clone(&db);
let join_handle = let join_handle =
tokio::spawn(async move { db_captured.background_worker(shutdown_captured).await }); tokio::spawn(async move { db_captured.background_worker(shutdown_captured).await });
db.allow_write_buffer_read();
// check: after a while the persistence windows should have the expected data // check: after a while the persistence windows should have the expected data
let t_0 = Instant::now(); let t_0 = Instant::now();

View File

@ -25,7 +25,12 @@ use lifecycle::{
use observability_deps::tracing::{info, trace}; use observability_deps::tracing::{info, trace};
use persistence_windows::persistence_windows::FlushHandle; use persistence_windows::persistence_windows::FlushHandle;
use query::QueryChunkMeta; use query::QueryChunkMeta;
use std::{fmt::Display, sync::Arc, time::Instant}; use std::{
convert::TryInto,
fmt::Display,
sync::{Arc, Weak},
time::Instant,
};
use tracker::{RwLock, TaskTracker}; use tracker::{RwLock, TaskTracker};
pub(crate) use compact::compact_chunks; pub(crate) use compact::compact_chunks;
@ -43,17 +48,9 @@ mod persist;
mod unload; mod unload;
mod write; mod write;
/// A newtype wrapper around `Arc<Db>` to workaround trait orphan rules /// A newtype wrapper around `Weak<Db>` to workaround trait orphan rules
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct ArcDb(pub(super) Arc<Db>); pub struct WeakDb(pub(super) Weak<Db>);
impl std::ops::Deref for ArcDb {
type Target = Db;
fn deref(&self) -> &Self::Target {
&self.0
}
}
/// ///
/// A `LockableCatalogChunk` combines a `CatalogChunk` with its owning `Db` /// A `LockableCatalogChunk` combines a `CatalogChunk` with its owning `Db`
@ -232,28 +229,42 @@ impl LockablePartition for LockableCatalogPartition {
} }
} }
impl LifecycleDb for ArcDb { impl LifecycleDb for WeakDb {
type Chunk = LockableCatalogChunk; type Chunk = LockableCatalogChunk;
type Partition = LockableCatalogPartition; type Partition = LockableCatalogPartition;
fn buffer_size(&self) -> usize { fn buffer_size(&self) -> usize {
self.catalog.metrics().memory().total() self.0
.upgrade()
.map(|db| db.catalog.metrics().memory().total())
.unwrap_or_default()
} }
fn rules(&self) -> LifecycleRules { fn rules(&self) -> LifecycleRules {
self.rules.read().lifecycle_rules.clone() self.0
.upgrade()
.map(|db| db.rules.read().lifecycle_rules.clone())
.unwrap_or_default()
} }
fn partitions(&self) -> Vec<Self::Partition> { fn partitions(&self) -> Vec<Self::Partition> {
self.catalog self.0
.upgrade()
.map(|db| {
db.catalog
.partitions() .partitions()
.into_iter() .into_iter()
.map(|partition| LockableCatalogPartition::new(Arc::clone(&self.0), partition)) .map(|partition| LockableCatalogPartition::new(Arc::clone(&db), partition))
.collect() .collect()
})
.unwrap_or_default()
} }
fn name(&self) -> DatabaseName<'static> { fn name(&self) -> DatabaseName<'static> {
self.rules.read().name.clone() self.0
.upgrade()
.map(|db| db.rules.read().name.clone())
.unwrap_or_else(|| "gone".to_string().try_into().unwrap())
} }
} }

View File

@ -1,6 +1,7 @@
use std::{ use std::{
collections::{BTreeMap, BTreeSet}, collections::{BTreeMap, BTreeSet},
sync::Arc, sync::Arc,
time::Duration,
}; };
use futures::TryStreamExt; use futures::TryStreamExt;
@ -187,9 +188,35 @@ pub async fn perform_replay(db: &Db, replay_plan: &ReplayPlan) -> Result<()> {
} }
let entry = Arc::new(entry); let entry = Arc::new(entry);
db.store_sequenced_entry(entry) let mut logged_hard_limit = false;
.map_err(Box::new) let n_tries = 600; // 600*100ms = 60s
.context(StoreError { sequencer_id })?; for n_try in 1..=n_tries {
match db.store_sequenced_entry(Arc::clone(&entry)) {
Ok(_) => {
break;
}
Err(crate::db::Error::HardLimitReached {}) if n_try < n_tries => {
if !logged_hard_limit {
info!(
%db_name,
sequencer_id,
n_try,
n_tries,
"Hard limit reached while replaying, waiting for compaction to catch up",
);
logged_hard_limit = true;
}
tokio::time::sleep(Duration::from_millis(100)).await;
continue;
}
Err(e) => {
return Err(Error::StoreError {
sequencer_id,
source: Box::new(e),
});
}
}
}
// done replaying? // done replaying?
if sequence.number == min_max.max() { if sequence.number == min_max.max() {
@ -209,7 +236,7 @@ mod tests {
use std::{ use std::{
convert::TryFrom, convert::TryFrom,
num::NonZeroU32, num::{NonZeroU32, NonZeroUsize},
sync::Arc, sync::Arc,
time::{Duration, Instant}, time::{Duration, Instant},
}; };
@ -230,7 +257,8 @@ mod tests {
min_max_sequence::OptionalMinMaxSequence, min_max_sequence::OptionalMinMaxSequence,
}; };
use query::{exec::ExecutorType, frontend::sql::SqlQueryPlanner, QueryChunk}; use query::{exec::ExecutorType, frontend::sql::SqlQueryPlanner, QueryChunk};
use test_helpers::assert_contains; use test_helpers::{assert_contains, tracing::TracingCapture};
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use write_buffer::{ use write_buffer::{
config::WriteBufferConfig, config::WriteBufferConfig,
@ -286,6 +314,9 @@ mod tests {
Ingest(Vec<TestSequencedEntry>), Ingest(Vec<TestSequencedEntry>),
/// Restart DB /// Restart DB
///
/// Background loop is started as well but neither persistence nor write buffer reads are allowed until
/// [`Await`](Self::Await) is used.
Restart, Restart,
/// Perform replay /// Perform replay
@ -301,7 +332,7 @@ mod tests {
/// Wait that background loop generates desired state (all checks pass). /// Wait that background loop generates desired state (all checks pass).
/// ///
/// Background loop is started before the check and stopped afterwards. /// Persistence and write buffer reads are enabled in preparation to this step.
Await(Vec<Check>), Await(Vec<Check>),
} }
@ -313,17 +344,13 @@ mod tests {
/// What to do in which order. /// What to do in which order.
/// ///
/// # Serialization /// # Serialization
/// The execution of the entire test is purely serial with the exception of [`Await`](Step::Await) (see /// Every step is finished and
/// next section). That means that nothing happens concurrently during each step. Every step is finished and
/// checked for errors before the next is started (e.g. [`Replay`](Step::Replay) is fully executed and /// checked for errors before the next is started (e.g. [`Replay`](Step::Replay) is fully executed and
/// it is ensured that there were no errors before a subsequent [`Assert`](Step::Assert) is evaluated). /// it is ensured that there were no errors before a subsequent [`Assert`](Step::Assert) is evaluated).
/// The database background worker is NOT active during any non-[`Await`](Step::Await)
/// ///
/// # Await /// # Await / Background Worker
/// Sometimes the background worker is needed to perform something, e.g. to consume some data from the write /// The database background worker is started during when the DB is created but persistence and reads from the
/// buffer. In that case [`Await`](Step::Await) can be used. During this check (and only during this /// write buffer are disabled until [`Await`](Step::Await) is used.
/// check) the background worker is active and the checks passed to [`Await`](Step::Await) are
/// evaluated until they succeed. The background worker is stopped before the next test step is evaluated.
steps: Vec<Step>, steps: Vec<Step>,
} }
@ -340,7 +367,7 @@ mod tests {
}; };
let write_buffer_state = let write_buffer_state =
MockBufferSharedState::empty_with_n_sequencers(self.n_sequencers); MockBufferSharedState::empty_with_n_sequencers(self.n_sequencers);
let mut test_db = Self::create_test_db( let (mut test_db, mut shutdown, mut join_handle) = Self::create_test_db(
Arc::clone(&object_store), Arc::clone(&object_store),
write_buffer_state.clone(), write_buffer_state.clone(),
server_id, server_id,
@ -360,11 +387,15 @@ mod tests {
} }
} }
Step::Restart => { Step::Restart => {
// first drop old DB // stop background worker
shutdown.cancel();
join_handle.await.unwrap();
// drop old DB
drop(test_db); drop(test_db);
// then create new one // then create new one
test_db = Self::create_test_db( let (test_db_tmp, shutdown_tmp, join_handle_tmp) = Self::create_test_db(
Arc::clone(&object_store), Arc::clone(&object_store),
write_buffer_state.clone(), write_buffer_state.clone(),
server_id, server_id,
@ -372,11 +403,14 @@ mod tests {
partition_template.clone(), partition_template.clone(),
) )
.await; .await;
test_db = test_db_tmp;
shutdown = shutdown_tmp;
join_handle = join_handle_tmp;
} }
Step::Replay => { Step::Replay => {
test_db let db = &test_db.db;
.db
.perform_replay(&test_db.replay_plan, false) db.perform_replay(&test_db.replay_plan, false)
.await .await
.unwrap(); .unwrap();
} }
@ -413,14 +447,8 @@ mod tests {
} }
Step::Await(checks) => { Step::Await(checks) => {
let db = &test_db.db; let db = &test_db.db;
db.unsuppress_persistence().await;
// start background worker db.allow_write_buffer_read();
let shutdown: CancellationToken = Default::default();
let shutdown_captured = shutdown.clone();
let db_captured = Arc::clone(db);
let join_handle = tokio::spawn(async move {
db_captured.background_worker(shutdown_captured).await
});
// wait until checks pass // wait until checks pass
let t_0 = Instant::now(); let t_0 = Instant::now();
@ -439,10 +467,6 @@ mod tests {
} }
tokio::time::sleep(Duration::from_millis(100)).await; tokio::time::sleep(Duration::from_millis(100)).await;
} }
// stop background worker
shutdown.cancel();
join_handle.await.unwrap();
} }
} }
} }
@ -471,22 +495,33 @@ mod tests {
server_id: ServerId, server_id: ServerId,
db_name: &'static str, db_name: &'static str,
partition_template: PartitionTemplate, partition_template: PartitionTemplate,
) -> TestDb { ) -> (TestDb, CancellationToken, JoinHandle<()>) {
let write_buffer = MockBufferForReading::new(write_buffer_state); let write_buffer = MockBufferForReading::new(write_buffer_state);
TestDb::builder() let test_db = TestDb::builder()
.object_store(object_store) .object_store(object_store)
.server_id(server_id) .server_id(server_id)
.write_buffer(WriteBufferConfig::Reading(Arc::new( .write_buffer(WriteBufferConfig::Reading(Arc::new(
tokio::sync::Mutex::new(Box::new(write_buffer) as _), tokio::sync::Mutex::new(Box::new(write_buffer) as _),
))) )))
.lifecycle_rules(data_types::database_rules::LifecycleRules { .lifecycle_rules(data_types::database_rules::LifecycleRules {
buffer_size_hard: Some(NonZeroUsize::new(10_000).unwrap()),
late_arrive_window_seconds: NonZeroU32::try_from(1).unwrap(), late_arrive_window_seconds: NonZeroU32::try_from(1).unwrap(),
mub_row_threshold: NonZeroUsize::new(10).unwrap(),
..Default::default() ..Default::default()
}) })
.partition_template(partition_template) .partition_template(partition_template)
.db_name(db_name) .db_name(db_name)
.build() .build()
.await .await;
// start background worker
let shutdown: CancellationToken = Default::default();
let shutdown_captured = shutdown.clone();
let db_captured = Arc::clone(&test_db.db);
let join_handle =
tokio::spawn(async move { db_captured.background_worker(shutdown_captured).await });
(test_db, shutdown, join_handle)
} }
/// Evaluates given checks. /// Evaluates given checks.
@ -1228,6 +1263,59 @@ mod tests {
.await; .await;
} }
#[tokio::test]
async fn replay_compacts() {
let tracing_capture = TracingCapture::new();
// these numbers are handtuned to trigger hard buffer limits w/o making the test too big
let n_entries = 50u64;
let sequenced_entries: Vec<_> = (0..n_entries)
.map(|sequence_number| {
let lp = format!(
"table_1,tag_partition_by=a foo=\"hello\",bar=10 {}",
sequence_number / 2
);
let lp: &'static str = Box::leak(Box::new(lp));
TestSequencedEntry {
sequencer_id: 0,
sequence_number,
lp,
}
})
.collect();
ReplayTest {
n_sequencers: 1,
steps: vec![
Step::Ingest(sequenced_entries),
Step::Ingest(vec![TestSequencedEntry {
sequencer_id: 0,
sequence_number: n_entries,
lp: "table_2,tag_partition_by=a bar=11 10",
}]),
Step::Await(vec![Check::Partitions(vec![
("table_1", "tag_partition_by_a"),
("table_2", "tag_partition_by_a"),
])]),
Step::Persist(vec![("table_2", "tag_partition_by_a")]),
Step::Restart,
Step::Assert(vec![Check::Partitions(vec![(
"table_2",
"tag_partition_by_a",
)])]),
Step::Replay,
],
}
.run()
.await;
// check that hard buffer limit was actually hit (otherwise this test is pointless/outdated)
assert_contains!(
tracing_capture.to_string(),
"Hard limit reached while replaying, waiting for compaction to catch up"
);
}
#[tokio::test] #[tokio::test]
async fn replay_fail_sequencers_change() { async fn replay_fail_sequencers_change() {
// create write buffer w/ sequencer 0 and 1 // create write buffer w/ sequencer 0 and 1
@ -1374,6 +1462,7 @@ mod tests {
let db_captured = Arc::clone(db); let db_captured = Arc::clone(db);
let join_handle = let join_handle =
tokio::spawn(async move { db_captured.background_worker(shutdown_captured).await }); tokio::spawn(async move { db_captured.background_worker(shutdown_captured).await });
db.allow_write_buffer_read();
// wait until checks pass // wait until checks pass
let checks = vec![Check::Query( let checks = vec![Check::Query(

View File

@ -353,6 +353,8 @@ async fn try_advance_database_init_process(
.await .await
.map_err(Box::new) .map_err(Box::new)
.context(ReplayError)?; .context(ReplayError)?;
db.unsuppress_persistence().await;
db.allow_write_buffer_read();
handle handle
.advance_init() .advance_init()

View File

@ -593,6 +593,11 @@ where
db_reservation.advance_replay(preserved_catalog, catalog, replay_plan, write_buffer)?; db_reservation.advance_replay(preserved_catalog, catalog, replay_plan, write_buffer)?;
// no actual replay required // no actual replay required
let db = db_reservation
.db_any_state()
.expect("DB should exist at this point");
db.unsuppress_persistence().await;
db.allow_write_buffer_read();
db_reservation.advance_init()?; db_reservation.advance_init()?;
// ready to commit // ready to commit

View File

@ -107,7 +107,7 @@ impl TestDbBuilder {
TestDb { TestDb {
metric_registry: metrics::TestMetricRegistry::new(metrics_registry), metric_registry: metrics::TestMetricRegistry::new(metrics_registry),
db: Arc::new(Db::new(database_to_commit, Arc::new(JobRegistry::new()))), db: Db::new(database_to_commit, Arc::new(JobRegistry::new())),
replay_plan, replay_plan,
} }
} }