From c33e5c22e61da2c1add659a5928c1d80b316b9cf Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> Date: Tue, 14 Sep 2021 17:04:58 +0100 Subject: [PATCH] feat: pull WriteBuffer consumer out of Db and onto Database (#2243) (#2525) * feat: pull WriteBuffer consumer out of Db and onto Database (#2243) * chore: restore WritingOnlyAllowedThroughWriteBuffer error * refactor: remove WriteBufferConfig * chore: fix docs * chore: move WriteBufferConsumer tests out of db.rs * chore: document WriteBufferFactory member functions * chore: fmt Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com> --- server/src/database.rs | 117 +++-- server/src/db.rs | 755 ++--------------------------- server/src/db/replay.rs | 484 +++++++++--------- server/src/lib.rs | 33 +- server/src/utils.rs | 13 +- server/src/write_buffer.rs | 545 +++++++++++++++++++++ server/src/write_buffer/metrics.rs | 167 +++++++ write_buffer/src/config.rs | 217 +++------ write_buffer/src/mock.rs | 7 +- 9 files changed, 1209 insertions(+), 1129 deletions(-) create mode 100644 server/src/write_buffer.rs create mode 100644 server/src/write_buffer/metrics.rs diff --git a/server/src/database.rs b/server/src/database.rs index 1f439cb774..bfa0d8961c 100644 --- a/server/src/database.rs +++ b/server/src/database.rs @@ -1,3 +1,4 @@ +use crate::write_buffer::WriteBufferConsumer; use crate::{ db::{ load::{create_preserved_catalog, load_or_create_preserved_catalog}, @@ -6,6 +7,7 @@ use crate::{ rules::ProvidedDatabaseRules, ApplicationState, Db, }; +use data_types::database_rules::WriteBufferDirection; use data_types::{database_state::DatabaseStateCode, server_id::ServerId, DatabaseName}; use futures::{ future::{BoxFuture, Shared}, @@ -14,11 +16,11 @@ use futures::{ use internal_types::freezable::Freezable; use iox_object_store::IoxObjectStore; use observability_deps::tracing::{error, info, warn}; -use parking_lot::RwLock; +use parking_lot::{MappedRwLockReadGuard, RwLock, RwLockReadGuard}; use parquet_file::catalog::api::PreservedCatalog; use persistence_windows::checkpoint::ReplayPlan; use snafu::{ensure, OptionExt, ResultExt, Snafu}; -use std::{future::Future, ops::DerefMut, sync::Arc, time::Duration}; +use std::{future::Future, sync::Arc, time::Duration}; use tokio::{sync::Notify, task::JoinError}; use tokio_util::sync::CancellationToken; @@ -101,7 +103,7 @@ pub struct Database { } #[derive(Debug, Clone)] -/// Informatation about where a database is located on object store, +/// Information about where a database is located on object store, /// and how to perform startup activities. pub struct DatabaseConfig { pub name: DatabaseName<'static>, @@ -314,8 +316,9 @@ impl Database { // via WriteGuard let mut state = state.unfreeze(handle); - if let DatabaseState::Initialized(DatabaseStateInitialized { db, provided_rules }) = - state.deref_mut() + if let DatabaseState::Initialized(DatabaseStateInitialized { + db, provided_rules, .. 
+ }) = &mut *state { db.update_rules(Arc::clone(new_provided_rules.rules())); *provided_rules = new_provided_rules; @@ -333,13 +336,14 @@ impl Database { self.shared.state.read().iox_object_store() } + pub fn initialized(&self) -> Option> { + RwLockReadGuard::try_map(self.shared.state.read(), |state| state.get_initialized()).ok() + } + /// Gets access to an initialized `Db` pub fn initialized_db(&self) -> Option> { - self.shared - .state - .read() - .get_initialized() - .map(|state| Arc::clone(&state.db)) + let initialized = self.initialized()?; + Some(Arc::clone(initialized.db())) } /// Returns Ok(()) when the Database is initialized, or the error @@ -444,7 +448,7 @@ impl Database { let db_name = &shared.config.name; current_state.replay_plan = Arc::new(None); let current_state = current_state - .advance() + .advance(shared.as_ref()) .await .map_err(Box::new) .context(SkipReplay { db_name })?; @@ -523,6 +527,20 @@ async fn background_worker(shared: Arc) { db.background_worker(shared.shutdown.clone()).await } + let write_buffer_consumer = shared + .state + .read() + .get_initialized() + .and_then(|initialized| initialized.write_buffer_consumer.clone()); + + if let Some(consumer) = write_buffer_consumer { + info!(db_name=%shared.config.name, "shutting down write buffer consumer"); + consumer.shutdown(); + if let Err(e) = consumer.join().await { + error!(db_name=%shared.config.name, %e, "error shutting down write buffer consumer") + } + } + info!(db_name=%shared.config.name, "draining tasks"); // Loop in case tasks are spawned during shutdown @@ -632,7 +650,7 @@ async fn initialize_database(shared: &DatabaseShared) { Ok(state) => DatabaseState::CatalogLoaded(state), Err(e) => DatabaseState::CatalogLoadError(state, Arc::new(e)), }, - DatabaseState::CatalogLoaded(state) => match state.advance().await { + DatabaseState::CatalogLoaded(state) => match state.advance(shared).await { Ok(state) => DatabaseState::Initialized(state), Err(e) => DatabaseState::ReplayError(state, Arc::new(e)), }, @@ -888,24 +906,27 @@ impl DatabaseStateRulesLoaded { .await .context(CatalogLoad)?; - let write_buffer = shared - .application - .write_buffer_factory() - .new_config( - shared.config.server_id, - self.provided_rules.rules().as_ref(), - ) - .await - .context(CreateWriteBuffer)?; + let rules = self.provided_rules.rules(); + let write_buffer_factory = shared.application.write_buffer_factory(); + let producer = match rules.write_buffer_connection.as_ref() { + Some(connection) if matches!(connection.direction, WriteBufferDirection::Write) => { + let producer = write_buffer_factory + .new_config_write(shared.config.name.as_str(), connection) + .await + .context(CreateWriteBuffer)?; + Some(producer) + } + _ => None, + }; let database_to_commit = DatabaseToCommit { server_id: shared.config.server_id, iox_object_store: Arc::clone(&self.iox_object_store), exec: Arc::clone(shared.application.executor()), - rules: Arc::clone(self.provided_rules.rules()), + rules: Arc::clone(rules), preserved_catalog, catalog, - write_buffer, + write_buffer_producer: producer, metrics_registry_v2: Arc::clone(shared.application.metric_registry_v2()), }; @@ -931,30 +952,66 @@ struct DatabaseStateCatalogLoaded { impl DatabaseStateCatalogLoaded { /// Perform replay - async fn advance(&self) -> Result { + async fn advance( + &self, + shared: &DatabaseShared, + ) -> Result { let db = Arc::clone(&self.db); - db.perform_replay(self.replay_plan.as_ref().as_ref()) - .await - .context(Replay)?; - // TODO: Pull write buffer and lifecycle out of Db 
db.unsuppress_persistence().await; - db.allow_write_buffer_read(); + + let rules = self.provided_rules.rules(); + let write_buffer_factory = shared.application.write_buffer_factory(); + let write_buffer_consumer = match rules.write_buffer_connection.as_ref() { + Some(connection) if matches!(connection.direction, WriteBufferDirection::Read) => { + let mut consumer = write_buffer_factory + .new_config_read( + shared.config.server_id, + shared.config.name.as_str(), + connection, + ) + .await + .context(CreateWriteBuffer)?; + + db.perform_replay(self.replay_plan.as_ref().as_ref(), consumer.as_mut()) + .await + .context(Replay)?; + + Some(Arc::new(WriteBufferConsumer::new( + consumer, + Arc::clone(&db), + shared.application.metric_registry().as_ref(), + ))) + } + _ => None, + }; Ok(DatabaseStateInitialized { db, + write_buffer_consumer, provided_rules: Arc::clone(&self.provided_rules), }) } } #[derive(Debug, Clone)] -struct DatabaseStateInitialized { +pub struct DatabaseStateInitialized { db: Arc, + write_buffer_consumer: Option>, provided_rules: Arc, } +impl DatabaseStateInitialized { + pub fn db(&self) -> &Arc { + &self.db + } + + pub fn write_buffer_consumer(&self) -> Option<&Arc> { + self.write_buffer_consumer.as_ref() + } +} + #[cfg(test)] mod tests { use chrono::Utc; diff --git a/server/src/db.rs b/server/src/db.rs index a3a2cba34c..19557725b0 100644 --- a/server/src/db.rs +++ b/server/src/db.rs @@ -1,23 +1,23 @@ //! This module contains the main IOx Database object which has the //! instances of the mutable buffer, read buffer, and object store -use async_trait::async_trait; -use chrono::{TimeZone, Utc}; -use futures::{stream::BoxStream, StreamExt}; -use parking_lot::{Mutex, RwLock}; -use rand_distr::{Distribution, Poisson}; -use snafu::{ensure, OptionExt, ResultExt, Snafu}; use std::{ any::Any, collections::HashMap, num::NonZeroUsize, sync::{ - atomic::{AtomicBool, AtomicUsize, Ordering}, + atomic::{AtomicUsize, Ordering}, Arc, }, time::{Duration, Instant}, }; +use async_trait::async_trait; +use chrono::{TimeZone, Utc}; +use parking_lot::{Mutex, RwLock}; +use rand_distr::{Distribution, Poisson}; +use snafu::{ensure, OptionExt, ResultExt, Snafu}; + use ::lifecycle::{LifecycleChunk, LockableChunk, LockablePartition}; use data_types::{ chunk_metadata::ChunkSummary, @@ -44,10 +44,7 @@ use query::{ QueryDatabase, }; use trace::ctx::SpanContext; -use write_buffer::{ - config::WriteBufferConfig, - core::{FetchHighWatermark, WriteBufferError}, -}; +use write_buffer::core::{WriteBufferReading, WriteBufferWriting}; pub(crate) use crate::db::chunk::DbChunk; use crate::{ @@ -98,11 +95,6 @@ pub enum Error { #[snafu(display("Cannot write to this database: no mutable buffer configured"))] DatabaseNotWriteable {}, - #[snafu(display( - "Cannot write to this database, configured to only read from the write buffer" - ))] - WritingOnlyAllowedThroughWriteBuffer {}, - #[snafu(display("Hard buffer size limit reached"))] HardLimitReached {}, @@ -178,101 +170,6 @@ pub enum Error { pub type Result = std::result::Result; -/// Metrics for data ingest via write buffer. 
-#[derive(Debug)] -struct WriteBufferIngestMetrics { - /// Metrics domain - domain: Arc, -} - -impl WriteBufferIngestMetrics { - fn new(domain: Arc) -> Self { - Self { domain } - } - - fn new_sequencer_metrics(&self, sequencer_id: u32) -> SequencerMetrics { - let attributes = vec![KeyValue::new("sequencer_id", sequencer_id.to_string())]; - - let red = self - .domain - .register_red_metric_with_attributes(Some("ingest"), attributes.clone()); - let bytes_read = self.domain.register_counter_metric_with_attributes( - "read", - Some("bytes"), - "Bytes read from sequencer", - attributes.clone(), - ); - let last_sequence_number = self.domain.register_gauge_metric_with_attributes( - "last_sequence_number", - None, - "Last consumed sequence number (e.g. Kafka offset)", - &attributes, - ); - let sequence_number_lag = self.domain.register_gauge_metric_with_attributes( - "sequence_number_lag", - None, - "The difference between the the last sequence number available (e.g. Kafka offset) and (= minus) last consumed sequence number", - &attributes, - ); - let last_min_ts = self.domain.register_gauge_metric_with_attributes( - "last_min_ts", - None, - "Minimum timestamp of last write as unix timestamp in nanoseconds", - &attributes, - ); - let last_max_ts = self.domain.register_gauge_metric_with_attributes( - "last_max_ts", - None, - "Maximum timestamp of last write as unix timestamp in nanoseconds", - &attributes, - ); - let last_ingest_ts = self.domain.register_gauge_metric_with_attributes( - "last_ingest_ts", - None, - "Last seen ingest timestamp as unix timestamp in nanoseconds", - &attributes, - ); - - SequencerMetrics { - red, - bytes_read, - last_sequence_number, - sequence_number_lag, - last_min_ts, - last_max_ts, - last_ingest_ts, - } - } -} - -/// Metrics for a single sequencer. -#[derive(Debug)] -struct SequencerMetrics { - /// Metrics for tracking ingest. - red: metrics::RedMetric, - - /// Bytes read from sequencer. - /// - /// This metrics is independent of the success / error state of the entries. - bytes_read: metrics::Counter, - - /// Last consumed sequence number (e.g. Kafka offset). - last_sequence_number: metrics::Gauge, - - // The difference between the the last sequence number available (e.g. Kafka offset) and (= minus) last consumed - // sequence number. - sequence_number_lag: metrics::Gauge, - - /// Minimum timestamp of last write as unix timestamp in nanoseconds. - last_min_ts: metrics::Gauge, - - /// Maximum timestamp of last write as unix timestamp in nanoseconds. - last_max_ts: metrics::Gauge, - - /// Last seen ingest timestamp as unix timestamp in nanoseconds. - last_ingest_ts: metrics::Gauge, -} - /// `Db` is an instance-local, queryable, possibly persisted, and possibly mutable data store /// /// It is responsible for: @@ -378,11 +275,9 @@ pub struct Db { /// Metric attributes metric_attributes: Vec, - /// Ingest metrics - ingest_metrics: WriteBufferIngestMetrics, - - /// Optionally connect to a write buffer for either buffering writes or reading buffered writes - write_buffer: Option, + /// Optional write buffer producer + /// TODO: Move onto Database + write_buffer_producer: Option>, /// Lock that prevents the cleanup job from deleting files that are written but not yet added to the preserved /// catalog. @@ -400,11 +295,6 @@ pub struct Db { /// - to keep the lifecycle state (e.g. the number of running compactions) around lifecycle_policy: tokio::sync::Mutex>>, - /// Flag to stop background worker from reading from the write buffer. 
- /// - /// TODO: Move write buffer read loop out of Db. - no_write_buffer_read: AtomicBool, - /// TESTING ONLY: Mocked `Instant::now()` for the background worker background_worker_now_override: Mutex>, } @@ -418,7 +308,10 @@ pub(crate) struct DatabaseToCommit { pub(crate) preserved_catalog: PreservedCatalog, pub(crate) catalog: Catalog, pub(crate) rules: Arc, - pub(crate) write_buffer: Option, + + /// TODO: Move onto Database + pub(crate) write_buffer_producer: Option>, + pub(crate) metrics_registry_v2: Arc, } @@ -432,10 +325,6 @@ impl Db { let metrics_registry = Arc::clone(&database_to_commit.catalog.metrics_registry); let metric_attributes = database_to_commit.catalog.metric_attributes.clone(); - let ingest_domain = metrics_registry - .register_domain_with_attributes("write_buffer", metric_attributes.clone()); - let ingest_metrics = WriteBufferIngestMetrics::new(Arc::new(ingest_domain)); - let catalog = Arc::new(database_to_commit.catalog); let catalog_access = QueryCatalogAccess::new( @@ -460,11 +349,9 @@ impl Db { worker_iterations_lifecycle: AtomicUsize::new(0), worker_iterations_cleanup: AtomicUsize::new(0), metric_attributes, - ingest_metrics, - write_buffer: database_to_commit.write_buffer, + write_buffer_producer: database_to_commit.write_buffer_producer, cleanup_lock: Default::default(), lifecycle_policy: tokio::sync::Mutex::new(None), - no_write_buffer_read: AtomicBool::new(true), background_worker_now_override: Default::default(), }; let this = Arc::new(this); @@ -483,13 +370,6 @@ impl Db { policy.unsuppress_persistence(); } - /// Allow continuous reads from the write buffer (if configured). - /// - /// TODO: Move write buffer read loop out of Db. - pub fn allow_write_buffer_read(&self) { - self.no_write_buffer_read.store(false, Ordering::SeqCst); - } - /// Return a handle to the executor used to run queries pub fn executor(&self) -> Arc { Arc::clone(&self.exec) @@ -565,7 +445,7 @@ impl Db { } } - fn partition( + pub fn partition( &self, table_name: &str, partition_key: &str, @@ -574,7 +454,7 @@ impl Db { Ok(Arc::clone(&partition)) } - fn chunk( + pub fn chunk( &self, table_name: &str, partition_key: &str, @@ -863,12 +743,18 @@ impl Db { /// /// When `replay_plan` is `None` then no real replay will be performed. Instead the write buffer streams will be set /// to the current high watermark and normal playback will continue from there. 
- pub async fn perform_replay(&self, replay_plan: Option<&ReplayPlan>) -> Result<()> { + pub async fn perform_replay( + &self, + replay_plan: Option<&ReplayPlan>, + consumer: &mut dyn WriteBufferReading, + ) -> Result<()> { use crate::db::replay::{perform_replay, seek_to_end}; if let Some(replay_plan) = replay_plan { - perform_replay(self, replay_plan).await.context(ReplayError) + perform_replay(self, replay_plan, consumer) + .await + .context(ReplayError) } else { - seek_to_end(self).await.context(ReplayError) + seek_to_end(self, consumer).await.context(ReplayError) } } @@ -942,210 +828,15 @@ impl Db { } }; - // streaming from the write buffer loop - let write_buffer = async { - if let Some(WriteBufferConfig::Reading(write_buffer)) = &self.write_buffer { - // wait for permission - while self.no_write_buffer_read.load(Ordering::SeqCst) { - tokio::time::sleep(Duration::from_millis(10)).await; - } - - let mut write_buffer = write_buffer - .try_lock() - .expect("no streams should exist at this point"); - let mut futures = vec![]; - for (sequencer_id, stream) in write_buffer.streams() { - let metrics = self.ingest_metrics.new_sequencer_metrics(sequencer_id); - let fut = self.stream_in_sequenced_entries( - sequencer_id, - stream.stream, - stream.fetch_high_watermark, - metrics, - ); - futures.push(fut); - } - - futures::future::join_all(futures).await; - } else { - futures::future::pending::<()>().await; - } - }; - // None of the futures need to perform drain logic on shutdown. // When the first one finishes, all of them are dropped tokio::select! { - _ = lifecycle_loop => error!("lifecycle loop exited - database worker bailing out"), - _ = write_buffer => error!("write buffer loop exited - database worker bailing out"), - _ = object_store_cleanup_loop => error!("object store cleanup exited - database worker bailing out"), - _ = shutdown.cancelled() => info!("database worker shutting down"), + _ = lifecycle_loop => error!("lifecycle loop exited - db worker bailing out"), + _ = object_store_cleanup_loop => error!("object store cleanup exited - db worker bailing out"), + _ = shutdown.cancelled() => info!("db worker shutting down"), } - info!("finished background worker"); - } - - /// This is used to take entries from a `Stream` and put them in the mutable buffer, such as - /// streaming entries from a write buffer. 
- async fn stream_in_sequenced_entries<'a>( - &'a self, - sequencer_id: u32, - mut stream: BoxStream<'a, Result>, - f_mark: FetchHighWatermark<'a>, - mut metrics: SequencerMetrics, - ) { - let db_name = self.rules.read().db_name().to_string(); - let mut watermark_last_updated: Option = None; - let mut watermark = 0; - - while let Some(sequenced_entry_result) = stream.next().await { - let red_observation = metrics.red.observation(); - - // get entry from sequencer - let sequenced_entry = match sequenced_entry_result { - Ok(sequenced_entry) => sequenced_entry, - Err(e) => { - debug!( - %e, - %db_name, - sequencer_id, - "Error converting write buffer data to SequencedEntry", - ); - red_observation.client_error(); - continue; - } - }; - let sequenced_entry = Arc::new(sequenced_entry); - - // store entry - let mut logged_hard_limit = false; - loop { - match self.store_sequenced_entry( - Arc::clone(&sequenced_entry), - filter_table_batch_keep_all, - ) { - Ok(_) => { - red_observation.ok(); - break; - } - Err(Error::HardLimitReached {}) => { - // wait a bit and retry - if !logged_hard_limit { - info!( - %db_name, - sequencer_id, - "Hard limit reached while reading from write buffer, waiting for compaction to catch up", - ); - logged_hard_limit = true; - } - tokio::time::sleep(Duration::from_millis(100)).await; - continue; - } - Err(e) => { - debug!( - %e, - %db_name, - sequencer_id, - "Error storing SequencedEntry from write buffer in database" - ); - red_observation.error(); - - // no retry - break; - } - } - } - - // maybe update sequencer watermark - // We are not updating this watermark every round because asking the sequencer for that watermark can be - // quite expensive. - let now = self.background_worker_now(); - if watermark_last_updated - .map(|ts| (now - ts) > Duration::from_secs(10)) - .unwrap_or(true) - { - match f_mark().await { - Ok(w) => { - watermark = w; - } - Err(e) => { - debug!( - %e, - %db_name, - sequencer_id, - "Error while reading sequencer watermark", - ) - } - } - watermark_last_updated = Some(now); - } - - // update: - // - bytes read - // - last sequence number - // - lag - // - min ts - // - max ts - // - ingest ts - let sequence = sequenced_entry - .sequence() - .expect("entry from write buffer must be sequenced"); - let producer_wallclock_timestamp = sequenced_entry - .producer_wallclock_timestamp() - .expect("entry from write buffer must have a producer wallclock time"); - let entry = sequenced_entry.entry(); - metrics.bytes_read.add(entry.data().len() as u64); - metrics - .last_sequence_number - .set(sequence.number as usize, &[]); - metrics.sequence_number_lag.set( - watermark.saturating_sub(sequence.number).saturating_sub(1) as usize, - &[], - ); - if let Some(min_ts) = entry - .partition_writes() - .map(|partition_writes| { - partition_writes - .iter() - .filter_map(|partition_write| { - partition_write - .table_batches() - .iter() - .filter_map(|table_batch| table_batch.min_max_time().ok()) - .map(|(min, _max)| min) - .max() - }) - .min() - }) - .flatten() - { - metrics - .last_min_ts - .set(min_ts.timestamp_nanos() as usize, &[]); - } - if let Some(max_ts) = entry - .partition_writes() - .map(|partition_writes| { - partition_writes - .iter() - .filter_map(|partition_write| { - partition_write - .table_batches() - .iter() - .filter_map(|table_batch| table_batch.min_max_time().ok()) - .map(|(_min, max)| max) - .max() - }) - .max() - }) - .flatten() - { - metrics - .last_max_ts - .set(max_ts.timestamp_nanos() as usize, &[]); - } - metrics - .last_ingest_ts - 
.set(producer_wallclock_timestamp.timestamp_nanos() as usize, &[]); - } + info!("finished db background worker"); } /// `Instant::now()` that is used by the background worker. Can be mocked for testing. @@ -1176,10 +867,10 @@ impl Db { let rules = self.rules.read(); rules.lifecycle_rules.immutable }; - debug!(%immutable, has_write_buffer=self.write_buffer.is_some(), "storing entry"); + debug!(%immutable, has_write_buffer_producer=self.write_buffer_producer.is_some(), "storing entry"); - match (self.write_buffer.as_ref(), immutable) { - (Some(WriteBufferConfig::Writing(write_buffer)), true) => { + match (self.write_buffer_producer.as_ref(), immutable) { + (Some(write_buffer), true) => { // If only the write buffer is configured, this is passing the data through to // the write buffer, and it's not an error. We ignore the returned metadata; it // will get picked up when data is read from the write buffer. @@ -1191,7 +882,7 @@ impl Db { .context(WriteBufferWritingError)?; Ok(()) } - (Some(WriteBufferConfig::Writing(write_buffer)), false) => { + (Some(write_buffer), false) => { // If using both write buffer and mutable buffer, we want to wait for the write // buffer to return success before adding the entry to the mutable buffer. @@ -1214,10 +905,6 @@ impl Db { // `SequencedEntry`. DatabaseNotWriteable {}.fail() } - (Some(WriteBufferConfig::Reading(_)), false) => { - // If configured to read entries from the write buffer, we shouldn't be here - WritingOnlyAllowedThroughWriteBuffer {}.fail() - } (None, false) => { // If no write buffer is configured, nothing is // sequencing entries so skip doing so here @@ -1436,7 +1123,7 @@ impl Db { } } -fn filter_table_batch_keep_all( +pub fn filter_table_batch_keep_all( _sequence: Option<&Sequence>, _partition_key: &str, _batch: &TableBatch<'_>, @@ -1587,7 +1274,6 @@ pub mod test_helpers { #[cfg(test)] mod tests { use std::{ - collections::BTreeMap, convert::TryFrom, iter::Iterator, num::{NonZeroU32, NonZeroU64, NonZeroUsize}, @@ -1602,7 +1288,7 @@ mod tests { use futures::{stream, StreamExt, TryStreamExt}; use tokio_util::sync::CancellationToken; - use ::test_helpers::{assert_contains, tracing::TracingCapture}; + use ::test_helpers::assert_contains; use arrow_util::{assert_batches_eq, assert_batches_sorted_eq}; use data_types::{ chunk_metadata::{ChunkAddr, ChunkStorage}, @@ -1610,7 +1296,7 @@ mod tests { partition_metadata::{ColumnSummary, InfluxDbType, StatValues, Statistics, TableSummary}, write_summary::TimestampSummary, }; - use entry::{test_helpers::lp_to_entry, Sequence}; + use entry::test_helpers::lp_to_entry; use internal_types::{schema::Schema, selection::Selection}; use iox_object_store::ParquetFilePath; use metric::{Attributes, CumulativeGauge, Metric, Observation}; @@ -1621,10 +1307,9 @@ mod tests { test_utils::{load_parquet_from_store_for_path, read_data_from_parquet_data}, }; use persistence_windows::min_max_sequence::MinMaxSequence; - use query::{frontend::sql::SqlQueryPlanner, QueryChunk, QueryDatabase}; + use query::{QueryChunk, QueryDatabase}; use write_buffer::mock::{ - MockBufferForReading, MockBufferForWriting, MockBufferForWritingThatAlwaysErrors, - MockBufferSharedState, + MockBufferForWriting, MockBufferForWritingThatAlwaysErrors, MockBufferSharedState, }; use crate::{ @@ -1674,7 +1359,7 @@ mod tests { let write_buffer = Arc::new(MockBufferForWriting::new(write_buffer_state.clone(), None).unwrap()); let test_db = TestDb::builder() - .write_buffer(WriteBufferConfig::Writing(Arc::clone(&write_buffer) as _)) + 
.write_buffer_producer(write_buffer) .lifecycle_rules(LifecycleRules { immutable: true, ..Default::default() @@ -1698,7 +1383,7 @@ mod tests { let write_buffer = Arc::new(MockBufferForWriting::new(write_buffer_state.clone(), None).unwrap()); let db = TestDb::builder() - .write_buffer(WriteBufferConfig::Writing(Arc::clone(&write_buffer) as _)) + .write_buffer_producer(write_buffer) .build() .await .db; @@ -1725,7 +1410,7 @@ mod tests { let write_buffer = Arc::new(MockBufferForWritingThatAlwaysErrors {}); let db = TestDb::builder() - .write_buffer(WriteBufferConfig::Writing(Arc::clone(&write_buffer) as _)) + .write_buffer_producer(write_buffer) .build() .await .db; @@ -1741,285 +1426,6 @@ mod tests { ); } - #[tokio::test] - async fn read_from_write_buffer_write_to_mutable_buffer() { - let write_buffer_state = - MockBufferSharedState::empty_with_n_sequencers(NonZeroU32::try_from(1).unwrap()); - let ingest_ts1 = Utc.timestamp_millis(42); - let ingest_ts2 = Utc.timestamp_millis(1337); - write_buffer_state.push_entry(SequencedEntry::new_from_sequence( - Sequence::new(0, 0), - ingest_ts1, - lp_to_entry("mem foo=1 10"), - )); - write_buffer_state.push_entry(SequencedEntry::new_from_sequence( - Sequence::new(0, 7), - ingest_ts2, - lp_to_entry("cpu bar=2 20\ncpu bar=3 30"), - )); - let write_buffer = MockBufferForReading::new(write_buffer_state, None).unwrap(); - - let test_db = TestDb::builder() - .write_buffer(WriteBufferConfig::Reading(Arc::new( - tokio::sync::Mutex::new(Box::new(write_buffer) as _), - ))) - .build() - .await; - let db = test_db.db; - - // do: start background task loop - let shutdown: CancellationToken = Default::default(); - let shutdown_captured = shutdown.clone(); - let db_captured = Arc::clone(&db); - let join_handle = - tokio::spawn(async move { db_captured.background_worker(shutdown_captured).await }); - db.allow_write_buffer_read(); - - let query = "select * from cpu"; - - // check: after a while the table should exist and a query plan should succeed - let t_0 = Instant::now(); - loop { - let planner = SqlQueryPlanner::default(); - let ctx = db.new_query_context(None); - let physical_plan = planner.query(query, &ctx); - - if physical_plan.is_ok() { - break; - } - - assert!(t_0.elapsed() < Duration::from_secs(10)); - tokio::time::sleep(Duration::from_millis(100)).await; - } - - // check: metrics - // We need to do that BEFORE shutting down the background loop because gauges would be dropped and resetted otherwise - let metrics = test_db.metric_registry; - metrics - .has_metric_family("write_buffer_ingest_requests_total") - .with_attributes(&[ - ("db_name", "placeholder"), - ("svr_id", "1"), - ("sequencer_id", "0"), - ("status", "ok"), - ]) - .counter() - .eq(2.0) - .unwrap(); - metrics - .has_metric_family("write_buffer_read_bytes_total") - .with_attributes(&[ - ("db_name", "placeholder"), - ("svr_id", "1"), - ("sequencer_id", "0"), - ]) - .counter() - .eq(528.0) - .unwrap(); - metrics - .has_metric_family("write_buffer_last_sequence_number") - .with_attributes(&[ - ("db_name", "placeholder"), - ("svr_id", "1"), - ("sequencer_id", "0"), - ]) - .gauge() - .eq(7.0) - .unwrap(); - metrics - .has_metric_family("write_buffer_sequence_number_lag") - .with_attributes(&[ - ("db_name", "placeholder"), - ("svr_id", "1"), - ("sequencer_id", "0"), - ]) - .gauge() - .eq(0.0) - .unwrap(); - metrics - .has_metric_family("write_buffer_last_min_ts") - .with_attributes(&[ - ("db_name", "placeholder"), - ("svr_id", "1"), - ("sequencer_id", "0"), - ]) - .gauge() - .eq(20.0) - .unwrap(); - 
metrics - .has_metric_family("write_buffer_last_max_ts") - .with_attributes(&[ - ("db_name", "placeholder"), - ("svr_id", "1"), - ("sequencer_id", "0"), - ]) - .gauge() - .eq(30.0) - .unwrap(); - metrics - .has_metric_family("write_buffer_last_ingest_ts") - .with_attributes(&[ - ("db_name", "placeholder"), - ("svr_id", "1"), - ("sequencer_id", "0"), - ]) - .gauge() - .eq(ingest_ts2.timestamp_nanos() as f64) - .unwrap(); - - // do: stop background task loop - shutdown.cancel(); - join_handle.await.unwrap(); - - // check: the expected results should be there - let batches = run_query(db, "select * from cpu order by time").await; - - let expected = vec![ - "+-----+--------------------------------+", - "| bar | time |", - "+-----+--------------------------------+", - "| 2 | 1970-01-01T00:00:00.000000020Z |", - "| 3 | 1970-01-01T00:00:00.000000030Z |", - "+-----+--------------------------------+", - ]; - assert_batches_eq!(expected, &batches); - } - - #[tokio::test] - async fn write_buffer_reads_wait_for_compaction() { - let tracing_capture = TracingCapture::new(); - - // setup write buffer - // these numbers are handtuned to trigger hard buffer limits w/o making the test too big - let n_entries = 50u64; - let write_buffer_state = - MockBufferSharedState::empty_with_n_sequencers(NonZeroU32::try_from(1).unwrap()); - for sequence_number in 0..n_entries { - let lp = format!( - "table_1,tag_partition_by=a foo=\"hello\",bar=1 {}", - sequence_number / 2 - ); - write_buffer_state.push_entry(SequencedEntry::new_from_sequence( - Sequence::new(0, sequence_number), - Utc::now(), - lp_to_entry(&lp), - )); - } - write_buffer_state.push_entry(SequencedEntry::new_from_sequence( - Sequence::new(0, n_entries), - Utc::now(), - lp_to_entry("table_2,partition_by=a foo=1 0"), - )); - let write_buffer = MockBufferForReading::new(write_buffer_state, None).unwrap(); - - // create DB - let partition_template = PartitionTemplate { - parts: vec![TemplatePart::Column("tag_partition_by".to_string())], - }; - let test_db = TestDb::builder() - .write_buffer(WriteBufferConfig::Reading(Arc::new( - tokio::sync::Mutex::new(Box::new(write_buffer) as _), - ))) - .lifecycle_rules(data_types::database_rules::LifecycleRules { - buffer_size_hard: Some(NonZeroUsize::new(10_000).unwrap()), - mub_row_threshold: NonZeroUsize::new(10).unwrap(), - ..Default::default() - }) - .partition_template(partition_template) - .build() - .await; - let db = test_db.db; - - // start background task loop - let shutdown: CancellationToken = Default::default(); - let shutdown_captured = shutdown.clone(); - let db_captured = Arc::clone(&db); - let join_handle = - tokio::spawn(async move { db_captured.background_worker(shutdown_captured).await }); - db.allow_write_buffer_read(); - - // after a while the table should exist - let t_0 = Instant::now(); - loop { - if db.table_schema("table_2").is_some() { - break; - } - - assert!(t_0.elapsed() < Duration::from_secs(10)); - tokio::time::sleep(Duration::from_millis(100)).await; - } - - // do: stop background task loop - shutdown.cancel(); - join_handle.await.unwrap(); - - // no rows should be dropped - let batches = run_query(db, "select sum(bar) as n from table_1").await; - let expected = vec!["+----+", "| n |", "+----+", "| 25 |", "+----+"]; - assert_batches_eq!(expected, &batches); - - // check that hard buffer limit was actually hit (otherwise this test is pointless/outdated) - assert_contains!(tracing_capture.to_string(), "Hard limit reached while reading from write buffer, waiting for compaction to catch 
up"); - } - - #[tokio::test] - async fn error_converting_data_from_write_buffer_to_sequenced_entry_is_reported() { - let write_buffer_state = - MockBufferSharedState::empty_with_n_sequencers(NonZeroU32::try_from(1).unwrap()); - write_buffer_state.push_error( - String::from("Something bad happened on the way to creating a SequencedEntry").into(), - 0, - ); - let write_buffer = MockBufferForReading::new(write_buffer_state, None).unwrap(); - - let test_db = TestDb::builder() - .write_buffer(WriteBufferConfig::Reading(Arc::new( - tokio::sync::Mutex::new(Box::new(write_buffer) as _), - ))) - .build() - .await; - - let db = Arc::new(test_db.db); - let metrics = test_db.metric_registry; - - // do: start background task loop - let shutdown: CancellationToken = Default::default(); - let shutdown_captured = shutdown.clone(); - let db_captured = Arc::clone(&db); - let join_handle = - tokio::spawn(async move { db_captured.background_worker(shutdown_captured).await }); - db.allow_write_buffer_read(); - - // check: after a while the error should be reported in the database's metrics - let t_0 = Instant::now(); - loop { - let family = metrics.try_has_metric_family("write_buffer_ingest_requests_total"); - - if let Ok(metric) = family { - if metric - .with_attributes(&[ - ("db_name", "placeholder"), - ("svr_id", "1"), - ("sequencer_id", "0"), - ("status", "client_error"), - ]) - .counter() - .eq(1.0) - .is_ok() - { - break; - } - } - - assert!(t_0.elapsed() < Duration::from_secs(10)); - tokio::time::sleep(Duration::from_millis(100)).await; - } - - // do: stop background task loop - shutdown.cancel(); - join_handle.await.unwrap(); - } - #[tokio::test] async fn cant_write_when_reading_from_write_buffer() { // Validate that writes are rejected if this database is reading from the write buffer @@ -2890,7 +2296,7 @@ mod tests { MockBufferSharedState::empty_with_n_sequencers(NonZeroU32::try_from(1).unwrap()); let write_buffer = Arc::new(MockBufferForWriting::new(write_buffer_state, None).unwrap()); let db = TestDb::builder() - .write_buffer(WriteBufferConfig::Writing(Arc::clone(&write_buffer) as _)) + .write_buffer_producer(write_buffer) .build() .await .db; @@ -2930,79 +2336,6 @@ mod tests { assert_eq!(open_max.timestamp_nanos(), 20); } - #[tokio::test] - async fn read_from_write_buffer_updates_persistence_windows() { - let entry = lp_to_entry("cpu bar=1 10"); - let partition_key = "1970-01-01T00"; - - let write_buffer_state = - MockBufferSharedState::empty_with_n_sequencers(NonZeroU32::try_from(2).unwrap()); - write_buffer_state.push_entry(SequencedEntry::new_from_sequence( - Sequence::new(0, 0), - Utc::now(), - entry.clone(), - )); - write_buffer_state.push_entry(SequencedEntry::new_from_sequence( - Sequence::new(1, 0), - Utc::now(), - entry.clone(), - )); - write_buffer_state.push_entry(SequencedEntry::new_from_sequence( - Sequence::new(1, 2), - Utc::now(), - entry.clone(), - )); - write_buffer_state.push_entry(SequencedEntry::new_from_sequence( - Sequence::new(0, 1), - Utc::now(), - entry, - )); - let write_buffer = MockBufferForReading::new(write_buffer_state, None).unwrap(); - - let db = TestDb::builder() - .write_buffer(WriteBufferConfig::Reading(Arc::new( - tokio::sync::Mutex::new(Box::new(write_buffer) as _), - ))) - .build() - .await - .db; - - // do: start background task loop - let shutdown: CancellationToken = Default::default(); - let shutdown_captured = shutdown.clone(); - let db_captured = Arc::clone(&db); - let join_handle = - tokio::spawn(async move { 
db_captured.background_worker(shutdown_captured).await }); - db.allow_write_buffer_read(); - - // check: after a while the persistence windows should have the expected data - let t_0 = Instant::now(); - let min_unpersisted = loop { - if let Ok(partition) = db.catalog.partition("cpu", partition_key) { - let partition = partition.write(); - let windows = partition.persistence_windows().unwrap(); - let min_unpersisted = windows.minimum_unpersisted_sequence(); - - if let Some(min_unpersisted) = min_unpersisted { - break min_unpersisted; - } - } - - assert!(t_0.elapsed() < Duration::from_secs(10)); - tokio::time::sleep(Duration::from_millis(100)).await; - }; - - // do: stop background task loop - shutdown.cancel(); - join_handle.await.unwrap(); - - let mut expected_unpersisted = BTreeMap::new(); - expected_unpersisted.insert(0, MinMaxSequence::new(0, 1)); - expected_unpersisted.insert(1, MinMaxSequence::new(0, 2)); - - assert_eq!(min_unpersisted, expected_unpersisted); - } - #[tokio::test] async fn test_chunk_timestamps() { let start = Utc::now(); diff --git a/server/src/db/replay.rs b/server/src/db/replay.rs index 757880ccd2..d285e5758c 100644 --- a/server/src/db/replay.rs +++ b/server/src/db/replay.rs @@ -14,7 +14,7 @@ use persistence_windows::{ persistence_windows::PersistenceWindows, }; use snafu::{ResultExt, Snafu}; -use write_buffer::config::WriteBufferConfig; +use write_buffer::core::WriteBufferReading; use crate::Db; @@ -81,64 +81,59 @@ pub type Result = std::result::Result; /// operation fails. In that case some of the sequencers in the write buffers might already be seeked and others not. /// The caller must NOT use the write buffer in that case without ensuring that it is put into some proper state, e.g. /// by retrying this function. -pub async fn seek_to_end(db: &Db) -> Result<()> { - if let Some(WriteBufferConfig::Reading(write_buffer)) = &db.write_buffer { - let mut write_buffer = write_buffer - .try_lock() - .expect("no streams should exist at this point"); +pub async fn seek_to_end(db: &Db, write_buffer: &mut dyn WriteBufferReading) -> Result<()> { + let mut watermarks = vec![]; + for (sequencer_id, stream) in write_buffer.streams() { + let watermark = (stream.fetch_high_watermark)() + .await + .context(SeekError { sequencer_id })?; + watermarks.push((sequencer_id, watermark)); + } - let mut watermarks = vec![]; - for (sequencer_id, stream) in write_buffer.streams() { - let watermark = (stream.fetch_high_watermark)() - .await - .context(SeekError { sequencer_id })?; - watermarks.push((sequencer_id, watermark)); - } + for (sequencer_id, watermark) in &watermarks { + write_buffer + .seek(*sequencer_id, *watermark) + .await + .context(SeekError { + sequencer_id: *sequencer_id, + })?; + } - for (sequencer_id, watermark) in &watermarks { - write_buffer - .seek(*sequencer_id, *watermark) - .await - .context(SeekError { - sequencer_id: *sequencer_id, - })?; - } + // remember max seen sequence numbers + let late_arrival_window = db.rules().lifecycle_rules.late_arrive_window(); + let sequencer_numbers: BTreeMap<_, _> = watermarks + .into_iter() + .filter(|(_sequencer_id, watermark)| *watermark > 0) + .map(|(sequencer_id, watermark)| { + ( + sequencer_id, + OptionalMinMaxSequence::new(None, watermark - 1), + ) + }) + .collect(); - // remember max seen sequence numbers - let late_arrival_window = db.rules().lifecycle_rules.late_arrive_window(); - let sequencer_numbers: BTreeMap<_, _> = watermarks - .into_iter() - .filter(|(_sequencer_id, watermark)| *watermark > 0) - .map(|(sequencer_id, 
watermark)| { - ( - sequencer_id, - OptionalMinMaxSequence::new(None, watermark - 1), - ) - }) - .collect(); - for partition in db.catalog.partitions() { - let mut partition = partition.write(); + for partition in db.catalog.partitions() { + let mut partition = partition.write(); - let dummy_checkpoint = PartitionCheckpoint::new( - Arc::from(partition.table_name()), - Arc::from(partition.key()), - sequencer_numbers.clone(), - Utc::now(), - ); + let dummy_checkpoint = PartitionCheckpoint::new( + Arc::from(partition.table_name()), + Arc::from(partition.key()), + sequencer_numbers.clone(), + Utc::now(), + ); - match partition.persistence_windows_mut() { - Some(windows) => { - windows.mark_seen_and_persisted(&dummy_checkpoint); - } - None => { - let mut windows = PersistenceWindows::new( - partition.addr().clone(), - late_arrival_window, - db.background_worker_now(), - ); - windows.mark_seen_and_persisted(&dummy_checkpoint); - partition.set_persistence_windows(windows); - } + match partition.persistence_windows_mut() { + Some(windows) => { + windows.mark_seen_and_persisted(&dummy_checkpoint); + } + None => { + let mut windows = PersistenceWindows::new( + partition.addr().clone(), + late_arrival_window, + db.background_worker_now(), + ); + windows.mark_seen_and_persisted(&dummy_checkpoint); + partition.set_persistence_windows(windows); } } } @@ -147,158 +142,156 @@ pub async fn seek_to_end(db: &Db) -> Result<()> { } /// Perform sequencer-driven replay for this DB. -pub async fn perform_replay(db: &Db, replay_plan: &ReplayPlan) -> Result<()> { - if let Some(WriteBufferConfig::Reading(write_buffer)) = &db.write_buffer { - let db_name = db.rules.read().db_name().to_string(); - info!(%db_name, "starting replay"); +pub async fn perform_replay( + db: &Db, + replay_plan: &ReplayPlan, + write_buffer: &mut dyn WriteBufferReading, +) -> Result<()> { + let db_name = db.rules.read().db_name().to_string(); + info!(%db_name, "starting replay"); - let mut write_buffer = write_buffer - .try_lock() - .expect("no streams should exist at this point"); - - // check if write buffer and replay plan agree on the set of sequencer ids - let sequencer_ids: BTreeSet<_> = write_buffer - .streams() - .into_iter() - .map(|(sequencer_id, _stream)| sequencer_id) - .collect(); - for sequencer_id in replay_plan.sequencer_ids() { - if !sequencer_ids.contains(&sequencer_id) { - return Err(Error::UnknownSequencer { - sequencer_id, - sequencer_ids: sequencer_ids.iter().copied().collect(), - }); - } + // check if write buffer and replay plan agree on the set of sequencer ids + let sequencer_ids: BTreeSet<_> = write_buffer + .streams() + .into_iter() + .map(|(sequencer_id, _stream)| sequencer_id) + .collect(); + for sequencer_id in replay_plan.sequencer_ids() { + if !sequencer_ids.contains(&sequencer_id) { + return Err(Error::UnknownSequencer { + sequencer_id, + sequencer_ids: sequencer_ids.iter().copied().collect(), + }); } + } - // determine replay ranges based on the plan - let replay_ranges: BTreeMap<_, _> = sequencer_ids - .into_iter() - .filter_map(|sequencer_id| { - replay_plan - .replay_range(sequencer_id) - .map(|min_max| (sequencer_id, min_max)) - }) - .collect(); + // determine replay ranges based on the plan + let replay_ranges: BTreeMap<_, _> = sequencer_ids + .into_iter() + .filter_map(|sequencer_id| { + replay_plan + .replay_range(sequencer_id) + .map(|min_max| (sequencer_id, min_max)) + }) + .collect(); - // seek write buffer according to the plan - for (sequencer_id, min_max) in &replay_ranges { - if let Some(min) = 
min_max.min() {
-                info!(%db_name, sequencer_id, sequence_number=min, "seek sequencer in preparation for replay");
-                write_buffer
-                    .seek(*sequencer_id, min)
-                    .await
-                    .context(SeekError {
-                        sequencer_id: *sequencer_id,
-                    })?;
-            } else {
-                let sequence_number = min_max.max() + 1;
-                info!(%db_name, sequencer_id, sequence_number, "seek sequencer that did not require replay");
-                write_buffer
-                    .seek(*sequencer_id, sequence_number)
-                    .await
-                    .context(SeekError {
-                        sequencer_id: *sequencer_id,
-                    })?;
-            }
+    // seek write buffer according to the plan
+    for (sequencer_id, min_max) in &replay_ranges {
+        if let Some(min) = min_max.min() {
+            info!(%db_name, sequencer_id, sequence_number=min, "seek sequencer in preparation for replay");
+            write_buffer
+                .seek(*sequencer_id, min)
+                .await
+                .context(SeekError {
+                    sequencer_id: *sequencer_id,
+                })?;
+        } else {
+            let sequence_number = min_max.max() + 1;
+            info!(%db_name, sequencer_id, sequence_number, "seek sequencer that did not require replay");
+            write_buffer
+                .seek(*sequencer_id, sequence_number)
+                .await
+                .context(SeekError {
+                    sequencer_id: *sequencer_id,
+                })?;
+        }
     }
-        // replay ranges
-        for (sequencer_id, mut stream) in write_buffer.streams() {
-            if let Some(min_max) = replay_ranges.get(&sequencer_id) {
-                if min_max.min().is_none() {
-                    // no replay required
-                    continue;
+    // replay ranges
+    for (sequencer_id, mut stream) in write_buffer.streams() {
+        if let Some(min_max) = replay_ranges.get(&sequencer_id) {
+            if min_max.min().is_none() {
+                // no replay required
+                continue;
+            }
+            info!(
+                %db_name,
+                sequencer_id,
+                sequence_number_min=min_max.min().expect("checked above"),
+                sequence_number_max=min_max.max(),
+                "replay sequencer",
+            );
+
+            while let Some(entry) = stream
+                .stream
+                .try_next()
+                .await
+                .context(EntryError { sequencer_id })?
+            {
+                let sequence = *entry.sequence().expect("entry must be sequenced");
+                if sequence.number > min_max.max() {
+                    return Err(Error::EntryLostError {
+                        sequencer_id,
+                        actual_sequence_number: sequence.number,
+                        expected_sequence_number: min_max.max(),
+                    });
                 }
-                info!(
-                    %db_name,
-                    sequencer_id,
-                    sequence_number_min=min_max.min().expect("checked above"),
-                    sequence_number_max=min_max.max(),
-                    "replay sequencer",
-                );
-                while let Some(entry) = stream
-                    .stream
-                    .try_next()
-                    .await
-                    .context(EntryError { sequencer_id })?
- { - let sequence = *entry.sequence().expect("entry must be sequenced"); - if sequence.number > min_max.max() { - return Err(Error::EntryLostError { - sequencer_id, - actual_sequence_number: sequence.number, - expected_sequence_number: min_max.max(), - }); - } - - let entry = Arc::new(entry); - let mut logged_hard_limit = false; - let n_tries = 600; // 600*100ms = 60s - for n_try in 1..=n_tries { - match db.store_sequenced_entry( - Arc::clone(&entry), - |sequence, partition_key, table_batch| { - filter_entry(sequence, partition_key, table_batch, replay_plan) - }, - ) { - Ok(_) => { - break; - } - Err(crate::db::Error::HardLimitReached {}) if n_try < n_tries => { - if !logged_hard_limit { - info!( - %db_name, - sequencer_id, - n_try, - n_tries, - "Hard limit reached while replaying, waiting for compaction to catch up", - ); - logged_hard_limit = true; - } - tokio::time::sleep(Duration::from_millis(100)).await; - continue; - } - Err(e) => { - return Err(Error::StoreError { + let entry = Arc::new(entry); + let mut logged_hard_limit = false; + let n_tries = 600; // 600*100ms = 60s + for n_try in 1..=n_tries { + match db.store_sequenced_entry( + Arc::clone(&entry), + |sequence, partition_key, table_batch| { + filter_entry(sequence, partition_key, table_batch, replay_plan) + }, + ) { + Ok(_) => { + break; + } + Err(crate::db::Error::HardLimitReached {}) if n_try < n_tries => { + if !logged_hard_limit { + info!( + %db_name, sequencer_id, - source: Box::new(e), - }); + n_try, + n_tries, + "Hard limit reached while replaying, waiting for compaction to catch up", + ); + logged_hard_limit = true; } + tokio::time::sleep(Duration::from_millis(100)).await; + continue; + } + Err(e) => { + return Err(Error::StoreError { + sequencer_id, + source: Box::new(e), + }); } } + } - // done replaying? - if sequence.number == min_max.max() { - break; - } + // done replaying? 
+ if sequence.number == min_max.max() { + break; } } } + } - // remember max seen sequence numbers even for partitions that were not touched during replay - let late_arrival_window = db.rules().lifecycle_rules.late_arrive_window(); - for (table_name, partition_key) in replay_plan.partitions() { - if let Ok(partition) = db.partition(&table_name, &partition_key) { - let mut partition = partition.write(); - let partition_checkpoint = replay_plan - .last_partition_checkpoint(&table_name, &partition_key) - .expect("replay plan inconsistent"); + // remember max seen sequence numbers even for partitions that were not touched during replay + let late_arrival_window = db.rules().lifecycle_rules.late_arrive_window(); + for (table_name, partition_key) in replay_plan.partitions() { + if let Ok(partition) = db.partition(&table_name, &partition_key) { + let mut partition = partition.write(); + let partition_checkpoint = replay_plan + .last_partition_checkpoint(&table_name, &partition_key) + .expect("replay plan inconsistent"); - match partition.persistence_windows_mut() { - Some(windows) => { - windows.mark_seen_and_persisted(partition_checkpoint); - } - None => { - let mut windows = PersistenceWindows::new( - partition.addr().clone(), - late_arrival_window, - db.background_worker_now(), - ); - windows.mark_seen_and_persisted(partition_checkpoint); - partition.set_persistence_windows(windows); - } + match partition.persistence_windows_mut() { + Some(windows) => { + windows.mark_seen_and_persisted(partition_checkpoint); + } + None => { + let mut windows = PersistenceWindows::new( + partition.addr().clone(), + late_arrival_window, + db.background_worker_now(), + ); + windows.mark_seen_and_persisted(partition_checkpoint); + partition.set_persistence_windows(windows); } } } @@ -442,12 +435,10 @@ mod tests { use test_helpers::{assert_contains, assert_not_contains, tracing::TracingCapture}; use tokio::task::JoinHandle; use tokio_util::sync::CancellationToken; - use write_buffer::{ - config::WriteBufferConfig, - mock::{MockBufferForReading, MockBufferSharedState}, - }; + use write_buffer::mock::{MockBufferForReading, MockBufferSharedState}; use crate::utils::TestDb; + use crate::write_buffer::WriteBufferConsumer; #[derive(Debug)] struct TestSequencedEntry { @@ -573,11 +564,13 @@ mod tests { let partition_template = PartitionTemplate { parts: vec![TemplatePart::Column("tag_partition_by".to_string())], }; + + let registry = metrics::MetricRegistry::new(); let write_buffer_state = MockBufferSharedState::empty_with_n_sequencers(self.n_sequencers); + let (mut test_db, mut shutdown, mut join_handle) = Self::create_test_db( Arc::clone(&object_store), - write_buffer_state.clone(), server_id, db_name, partition_template.clone(), @@ -586,6 +579,15 @@ mod tests { ) .await; + let mut maybe_consumer = Some(WriteBufferConsumer::new( + Box::new(MockBufferForReading::new(write_buffer_state.clone(), None).unwrap()), + Arc::clone(&test_db.db), + ®istry, + )); + + // This is used to carry the write buffer from replay to await + let mut maybe_write_buffer = None; + // ==================== do: main loop ==================== for (step, action_or_check) in self.steps.into_iter().enumerate() { println!("===== step {} =====\n{:?}", step + 1, action_or_check); @@ -597,6 +599,11 @@ mod tests { } } Step::Restart => { + if let Some(consumer) = maybe_consumer.take() { + consumer.shutdown(); + consumer.join().await.unwrap(); + } + // stop background worker shutdown.cancel(); join_handle.await.unwrap(); @@ -610,7 +617,6 @@ mod tests { // 
then create new one let (test_db_tmp, shutdown_tmp, join_handle_tmp) = Self::create_test_db( Arc::clone(&object_store), - write_buffer_state.clone(), server_id, db_name, partition_template.clone(), @@ -622,15 +628,26 @@ mod tests { shutdown = shutdown_tmp; join_handle = join_handle_tmp; } - Step::Replay => { - let db = &test_db.db; + Step::Replay | Step::SkipReplay => { + assert!(maybe_consumer.is_none()); + assert!(maybe_write_buffer.is_none()); - db.perform_replay(Some(&test_db.replay_plan)).await.unwrap(); - } - Step::SkipReplay => { - let db = &test_db.db; + let replay_plan = match action_or_check { + Step::Replay => Some(&test_db.replay_plan), + Step::SkipReplay => None, + _ => unreachable!(), + }; - db.perform_replay(None).await.unwrap(); + let mut write_buffer = + MockBufferForReading::new(write_buffer_state.clone(), None).unwrap(); + + test_db + .db + .perform_replay(replay_plan, &mut write_buffer) + .await + .unwrap(); + + maybe_write_buffer = Some(write_buffer); } Step::Persist(partitions) => { let db = &test_db.db; @@ -689,9 +706,21 @@ mod tests { Self::eval_checks(&checks, true, &test_db).await; } Step::Await(checks) => { + if maybe_consumer.is_none() { + let write_buffer = match maybe_write_buffer.take() { + Some(write_buffer) => write_buffer, + None => MockBufferForReading::new(write_buffer_state.clone(), None) + .unwrap(), + }; + maybe_consumer = Some(WriteBufferConsumer::new( + Box::new(write_buffer), + Arc::clone(&test_db.db), + ®istry, + )); + } + let db = &test_db.db; db.unsuppress_persistence().await; - db.allow_write_buffer_read(); // wait until checks pass let t_0 = Instant::now(); @@ -734,20 +763,15 @@ mod tests { async fn create_test_db( object_store: Arc, - write_buffer_state: MockBufferSharedState, server_id: ServerId, db_name: &'static str, partition_template: PartitionTemplate, catalog_transactions_until_checkpoint: NonZeroU64, now: Instant, ) -> (TestDb, CancellationToken, JoinHandle<()>) { - let write_buffer = MockBufferForReading::new(write_buffer_state, None).unwrap(); let test_db = TestDb::builder() .object_store(object_store) .server_id(server_id) - .write_buffer(WriteBufferConfig::Reading(Arc::new( - tokio::sync::Mutex::new(Box::new(write_buffer) as _), - ))) .lifecycle_rules(data_types::database_rules::LifecycleRules { buffer_size_hard: Some(NonZeroUsize::new(12_000).unwrap()), late_arrive_window_seconds: NonZeroU32::try_from(1).unwrap(), @@ -2588,16 +2612,10 @@ mod tests { // create write buffer w/ sequencer 0 and 1 let write_buffer_state = MockBufferSharedState::empty_with_n_sequencers(NonZeroU32::try_from(2).unwrap()); - let write_buffer = MockBufferForReading::new(write_buffer_state, None).unwrap(); + let mut write_buffer = MockBufferForReading::new(write_buffer_state, None).unwrap(); // create DB - let db = TestDb::builder() - .write_buffer(WriteBufferConfig::Reading(Arc::new( - tokio::sync::Mutex::new(Box::new(write_buffer) as _), - ))) - .build() - .await - .db; + let db = TestDb::builder().build().await.db; // construct replay plan for sequencers 0 and 2 let mut sequencer_numbers = BTreeMap::new(); @@ -2618,7 +2636,9 @@ mod tests { let replay_plan = replay_planner.build().unwrap(); // replay fails - let res = db.perform_replay(Some(&replay_plan)).await; + let res = db + .perform_replay(Some(&replay_plan), &mut write_buffer) + .await; assert_contains!( res.unwrap_err().to_string(), "Replay plan references unknown sequencer" @@ -2640,16 +2660,10 @@ mod tests { Utc::now(), lp_to_entry("cpu bar=1 10"), )); - let write_buffer = 
MockBufferForReading::new(write_buffer_state, None).unwrap(); + let mut write_buffer = MockBufferForReading::new(write_buffer_state, None).unwrap(); // create DB - let db = TestDb::builder() - .write_buffer(WriteBufferConfig::Reading(Arc::new( - tokio::sync::Mutex::new(Box::new(write_buffer) as _), - ))) - .build() - .await - .db; + let db = TestDb::builder().build().await.db; // construct replay plan to replay sequence numbers 0 and 1 let mut sequencer_numbers = BTreeMap::new(); @@ -2669,7 +2683,9 @@ mod tests { let replay_plan = replay_planner.build().unwrap(); // replay fails - let res = db.perform_replay(Some(&replay_plan)).await; + let res = db + .perform_replay(Some(&replay_plan), &mut write_buffer) + .await; assert_contains!( res.unwrap_err().to_string(), "Cannot replay: For sequencer 0 expected to find sequence 1 but replay jumped to 2" @@ -2699,19 +2715,14 @@ mod tests { Utc::now(), lp_to_entry("cpu bar=11 11"), )); - let write_buffer = MockBufferForReading::new(write_buffer_state.clone(), None).unwrap(); + let mut write_buffer = MockBufferForReading::new(write_buffer_state.clone(), None).unwrap(); // create DB - let test_db = TestDb::builder() - .write_buffer(WriteBufferConfig::Reading(Arc::new( - tokio::sync::Mutex::new(Box::new(write_buffer) as _), - ))) - .build() - .await; + let test_db = TestDb::builder().build().await; let db = &test_db.db; // seek - db.perform_replay(None).await.unwrap(); + db.perform_replay(None, &mut write_buffer).await.unwrap(); // add more data write_buffer_state.push_entry(SequencedEntry::new_from_sequence( @@ -2736,7 +2747,9 @@ mod tests { let db_captured = Arc::clone(db); let join_handle = tokio::spawn(async move { db_captured.background_worker(shutdown_captured).await }); - db.allow_write_buffer_read(); + + let consumer = + WriteBufferConsumer::new(Box::new(write_buffer), Arc::clone(db), &Default::default()); // wait until checks pass let checks = vec![Check::Query( @@ -2771,6 +2784,9 @@ mod tests { // stop background worker shutdown.cancel(); join_handle.await.unwrap(); + + consumer.shutdown(); + consumer.join().await.unwrap(); } #[test] diff --git a/server/src/lib.rs b/server/src/lib.rs index ac5071af1f..0c7c7480d7 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -114,6 +114,8 @@ pub mod rules; /// Utility modules used by benchmarks and tests pub mod utils; +mod write_buffer; + type DatabaseError = Box; #[derive(Debug, Snafu)] @@ -646,11 +648,16 @@ where Ok(Arc::clone(db)) } + /// Returns an active `Database` by name + pub fn active_database(&self, db_name: &DatabaseName<'_>) -> Result> { + let database = self.database(db_name)?; + ensure!(database.is_active(), DatabaseNotFound { db_name }); + Ok(database) + } + /// Returns an initialized `Db` by name pub fn db(&self, db_name: &DatabaseName<'_>) -> Result> { - let database = self.database(db_name)?; - - ensure!(database.is_active(), DatabaseNotFound { db_name }); + let database = self.active_database(db_name)?; database .initialized_db() @@ -930,7 +937,18 @@ where /// /// TODO: Push this out of `Server` into `Database` pub async fn write_entry_local(&self, db_name: &DatabaseName<'_>, entry: Entry) -> Result<()> { - let db = self.db(db_name)?; + let database = self.active_database(db_name)?; + let db = { + let initialized = database + .initialized() + .context(DatabaseNotInitialized { db_name })?; + + if initialized.write_buffer_consumer().is_some() { + return WritingOnlyAllowedThroughWriteBuffer { db_name }.fail(); + } + Arc::clone(initialized.db()) + }; + let bytes = entry.data().len() 
         db.store_entry(entry).await.map_err(|e| {
             self.metrics.ingest_entries_bytes_total.add_with_attributes(
@@ -942,11 +960,6 @@
             );
             match e {
                 db::Error::HardLimitReached {} => Error::HardLimitReached {},
-                db::Error::WritingOnlyAllowedThroughWriteBuffer {} => {
-                    Error::WritingOnlyAllowedThroughWriteBuffer {
-                        db_name: db_name.into(),
-                    }
-                }
                 db::Error::WriteBufferWritingError { .. } => {
                     Error::WriteBuffer { source: e, bytes }
                 }
@@ -1243,6 +1256,7 @@ where
 #[cfg(test)]
 mod tests {
     use super::*;
+    use ::write_buffer::config::WriteBufferConfigFactory;
     use arrow::record_batch::RecordBatch;
     use arrow_util::assert_batches_eq;
     use bytes::Bytes;
@@ -1274,7 +1288,6 @@ mod tests {
         time::{Duration, Instant},
     };
     use test_helpers::assert_contains;
-    use write_buffer::config::WriteBufferConfigFactory;
 
     const ARBITRARY_DEFAULT_TIME: i64 = 456;
 
diff --git a/server/src/utils.rs b/server/src/utils.rs
index b1005a749d..28ea86ed19 100644
--- a/server/src/utils.rs
+++ b/server/src/utils.rs
@@ -14,7 +14,7 @@ use persistence_windows::checkpoint::ReplayPlan;
 use query::exec::ExecutorConfig;
 use query::{exec::Executor, QueryDatabase};
 use std::{borrow::Cow, convert::TryFrom, num::NonZeroU32, sync::Arc, time::Duration};
-use write_buffer::config::WriteBufferConfig;
+use write_buffer::core::WriteBufferWriting;
 
 // A wrapper around a Db and a metrics registry allowing for isolated testing
 // of a Db and its metrics.
@@ -38,7 +38,7 @@ pub struct TestDbBuilder {
     object_store: Option<Arc<ObjectStore>>,
     db_name: Option<DatabaseName<'static>>,
     worker_cleanup_avg_sleep: Option<Duration>,
-    write_buffer: Option<WriteBufferConfig>,
+    write_buffer_producer: Option<Arc<dyn WriteBufferWriting>>,
     lifecycle_rules: Option<LifecycleRules>,
     partition_template: Option<PartitionTemplate>,
 }
@@ -124,7 +124,7 @@ impl TestDbBuilder {
             iox_object_store,
             preserved_catalog,
             catalog,
-            write_buffer: self.write_buffer,
+            write_buffer_producer: self.write_buffer_producer,
             exec,
             metrics_registry_v2: Arc::clone(&metrics_registry_v2),
         };
@@ -160,8 +160,11 @@ impl TestDbBuilder {
         self
     }
 
-    pub fn write_buffer(mut self, write_buffer: WriteBufferConfig) -> Self {
-        self.write_buffer = Some(write_buffer);
+    pub fn write_buffer_producer(
+        mut self,
+        write_buffer_producer: Arc<dyn WriteBufferWriting>,
+    ) -> Self {
+        self.write_buffer_producer = Some(write_buffer_producer);
         self
     }
 
diff --git a/server/src/write_buffer.rs b/server/src/write_buffer.rs
new file mode 100644
index 0000000000..4b4213e551
--- /dev/null
+++ b/server/src/write_buffer.rs
@@ -0,0 +1,545 @@
+use std::future::Future;
+use std::sync::Arc;
+use std::time::{Duration, Instant};
+
+use futures::future::{BoxFuture, Shared};
+use futures::stream::{BoxStream, FuturesUnordered};
+use futures::{FutureExt, StreamExt, TryFutureExt};
+use tokio::task::JoinError;
+use tokio_util::sync::CancellationToken;
+
+use ::metrics::{KeyValue, MetricRegistry};
+use entry::SequencedEntry;
+use observability_deps::tracing::{debug, error, info};
+use write_buffer::core::{FetchHighWatermark, WriteBufferError, WriteBufferReading};
+
+use crate::Db;
+
+use self::metrics::SequencerMetrics;
+
+mod metrics;
+
+/// A `WriteBufferConsumer` is created from a `Db` and a `WriteBufferReading` and
+/// sinks records from the inbound streams into the `Db`
+#[derive(Debug)]
+pub struct WriteBufferConsumer {
+    /// Future that resolves when the background worker exits
+    join: Shared<BoxFuture<'static, Result<(), Arc<JoinError>>>>,
+
+    /// A token that is used to trigger shutdown of the background worker
+    shutdown: CancellationToken,
+}
+
+impl WriteBufferConsumer {
+    pub fn new(
+        mut write_buffer: Box<dyn WriteBufferReading>,
+        db: Arc<Db>,
+        registry: &MetricRegistry,
+    ) -> Self {
+        let shutdown = CancellationToken::new();
+
+        let ingest_domain = registry.register_domain_with_attributes(
+            "write_buffer",
+            vec![KeyValue::new("db_name", db.rules().name.to_string())],
+        );
+
+        let ingest_metrics = metrics::WriteBufferIngestMetrics::new(Arc::new(ingest_domain));
+
+        let shutdown_captured = shutdown.clone();
+        let join = tokio::spawn(async move {
+            let mut futures: FuturesUnordered<_> = write_buffer
+                .streams()
+                .into_iter()
+                .map(|(sequencer_id, stream)| {
+                    let metrics = ingest_metrics.new_sequencer_metrics(sequencer_id);
+                    stream_in_sequenced_entries(
+                        Arc::clone(&db),
+                        sequencer_id,
+                        stream.stream,
+                        stream.fetch_high_watermark,
+                        metrics,
+                    )
+                })
+                .collect();
+
+            tokio::select! {
+                _ = shutdown_captured.cancelled() => info!("write buffer shut down triggered"),
+                _ = futures.next() => error!("unexpected shutdown of write buffer consumer"),
+            }
+        })
+        .map_err(Arc::new)
+        .boxed()
+        .shared();
+
+        Self { join, shutdown }
+    }
+
+    /// Triggers shutdown of this `WriteBufferConsumer`
+    pub fn shutdown(&self) {
+        self.shutdown.cancel()
+    }
+
+    /// Waits for the background worker of this `WriteBufferConsumer` to exit
+    pub fn join(&self) -> impl Future<Output = Result<(), Arc<JoinError>>> {
+        self.join.clone()
+    }
+}
+
+/// This is used to take entries from a `Stream` and put them in the mutable buffer, such as
+/// streaming entries from a write buffer.
+async fn stream_in_sequenced_entries<'a>(
+    db: Arc<Db>,
+    sequencer_id: u32,
+    mut stream: BoxStream<'a, Result<SequencedEntry, WriteBufferError>>,
+    f_mark: FetchHighWatermark<'a>,
+    mut metrics: SequencerMetrics,
+) {
+    let db_name = db.rules().name.to_string();
+    let mut watermark_last_updated: Option<Instant> = None;
+    let mut watermark = 0_u64;
+
+    while let Some(sequenced_entry_result) = stream.next().await {
+        let red_observation = metrics.red.observation();
+
+        // get entry from sequencer
+        let sequenced_entry = match sequenced_entry_result {
+            Ok(sequenced_entry) => sequenced_entry,
+            Err(e) => {
+                debug!(
+                    %e,
+                    %db_name,
+                    sequencer_id,
+                    "Error converting write buffer data to SequencedEntry",
+                );
+                red_observation.client_error();
+                continue;
+            }
+        };
+        let sequenced_entry = Arc::new(sequenced_entry);
+
+        // store entry
+        let mut logged_hard_limit = false;
+        loop {
+            match db.store_sequenced_entry(
+                Arc::clone(&sequenced_entry),
+                crate::db::filter_table_batch_keep_all,
+            ) {
+                Ok(_) => {
+                    red_observation.ok();
+                    break;
+                }
+                Err(crate::db::Error::HardLimitReached {}) => {
+                    // wait a bit and retry
+                    if !logged_hard_limit {
+                        info!(
+                            %db_name,
+                            sequencer_id,
+                            "Hard limit reached while reading from write buffer, waiting for compaction to catch up",
+                        );
+                        logged_hard_limit = true;
+                    }
+                    tokio::time::sleep(Duration::from_millis(100)).await;
+                    continue;
+                }
+                Err(e) => {
+                    debug!(
+                        %e,
+                        %db_name,
+                        sequencer_id,
+                        "Error storing SequencedEntry from write buffer in database"
+                    );
+                    red_observation.error();
+
+                    // no retry
+                    break;
+                }
+            }
+        }
+
+        // maybe update sequencer watermark
+        // We are not updating this watermark every round because asking the sequencer for the
+        // watermark can be quite expensive.
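+        // (Sketch of the throttle below: remember the `Instant` at which the
+        // watermark was last fetched and call `f_mark()` again only once that
+        // timestamp is older than 10 seconds; in between, the cached value is
+        // reused for the lag metric.)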
+        let now = Instant::now();
+        if watermark_last_updated
+            .map(|ts| now.duration_since(ts) > Duration::from_secs(10))
+            .unwrap_or(true)
+        {
+            match f_mark().await {
+                Ok(w) => {
+                    watermark = w;
+                }
+                Err(e) => {
+                    debug!(
+                        %e,
+                        %db_name,
+                        sequencer_id,
+                        "Error while reading sequencer watermark",
+                    )
+                }
+            }
+            watermark_last_updated = Some(now);
+        }
+
+        std::mem::drop(red_observation);
+
+        metrics.record_write(&sequenced_entry, watermark)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::collections::BTreeMap;
+    use std::convert::TryFrom;
+    use std::num::{NonZeroU32, NonZeroUsize};
+
+    use chrono::{TimeZone, Utc};
+
+    use ::test_helpers::assert_contains;
+    use arrow_util::assert_batches_eq;
+    use data_types::database_rules::{PartitionTemplate, TemplatePart};
+    use entry::test_helpers::lp_to_entry;
+    use entry::Sequence;
+    use persistence_windows::min_max_sequence::MinMaxSequence;
+    use query::exec::ExecutionContextProvider;
+    use query::frontend::sql::SqlQueryPlanner;
+    use query::QueryDatabase;
+    use test_helpers::tracing::TracingCapture;
+    use write_buffer::mock::{MockBufferForReading, MockBufferSharedState};
+
+    use crate::db::test_helpers::run_query;
+    use crate::utils::TestDb;
+
+    use super::*;
+
+    #[tokio::test]
+    async fn read_from_write_buffer_updates_persistence_windows() {
+        let entry = lp_to_entry("cpu bar=1 10");
+        let partition_key = "1970-01-01T00";
+
+        let write_buffer_state =
+            MockBufferSharedState::empty_with_n_sequencers(NonZeroU32::try_from(2).unwrap());
+        write_buffer_state.push_entry(SequencedEntry::new_from_sequence(
+            Sequence::new(0, 0),
+            Utc::now(),
+            entry.clone(),
+        ));
+        write_buffer_state.push_entry(SequencedEntry::new_from_sequence(
+            Sequence::new(1, 0),
+            Utc::now(),
+            entry.clone(),
+        ));
+        write_buffer_state.push_entry(SequencedEntry::new_from_sequence(
+            Sequence::new(1, 2),
+            Utc::now(),
+            entry.clone(),
+        ));
+        write_buffer_state.push_entry(SequencedEntry::new_from_sequence(
+            Sequence::new(0, 1),
+            Utc::now(),
+            entry,
+        ));
+        let db = TestDb::builder().build().await.db;
+
+        // do: start background task loop
+        let shutdown: CancellationToken = Default::default();
+        let shutdown_captured = shutdown.clone();
+        let db_captured = Arc::clone(&db);
+        let join_handle =
+            tokio::spawn(async move { db_captured.background_worker(shutdown_captured).await });
+
+        let consumer = WriteBufferConsumer::new(
+            Box::new(MockBufferForReading::new(write_buffer_state, None).unwrap()),
+            Arc::clone(&db),
+            &Default::default(),
+        );
+
+        // check: after a while the persistence windows should have the expected data
+        let t_0 = Instant::now();
+        let min_unpersisted = loop {
+            if let Ok(partition) = db.partition("cpu", partition_key) {
+                let partition = partition.write();
+                let windows = partition.persistence_windows().unwrap();
+                let min_unpersisted = windows.minimum_unpersisted_sequence();
+
+                if let Some(min_unpersisted) = min_unpersisted {
+                    break min_unpersisted;
+                }
+            }
+
+            assert!(t_0.elapsed() < Duration::from_secs(10));
+            tokio::time::sleep(Duration::from_millis(100)).await;
+        };
+
+        // do: stop background task loop
+        shutdown.cancel();
+        join_handle.await.unwrap();
+
+        consumer.shutdown();
+        consumer.join().await.unwrap();
+
+        let mut expected_unpersisted = BTreeMap::new();
+        expected_unpersisted.insert(0, MinMaxSequence::new(0, 1));
+        expected_unpersisted.insert(1, MinMaxSequence::new(0, 2));
+
+        assert_eq!(min_unpersisted, expected_unpersisted);
+    }
+
+    #[tokio::test]
+    async fn read_from_write_buffer_write_to_mutable_buffer() {
+        let write_buffer_state =
+            MockBufferSharedState::empty_with_n_sequencers(NonZeroU32::try_from(1).unwrap());
+        let ingest_ts1 = Utc.timestamp_millis(42);
+        let ingest_ts2 = Utc.timestamp_millis(1337);
+        write_buffer_state.push_entry(SequencedEntry::new_from_sequence(
+            Sequence::new(0, 0),
+            ingest_ts1,
+            lp_to_entry("mem foo=1 10"),
+        ));
+        write_buffer_state.push_entry(SequencedEntry::new_from_sequence(
+            Sequence::new(0, 7),
+            ingest_ts2,
+            lp_to_entry("cpu bar=2 20\ncpu bar=3 30"),
+        ));
+        let test_db = TestDb::builder().build().await;
+        let db = test_db.db;
+
+        // do: start background task loop
+        let shutdown: CancellationToken = Default::default();
+        let shutdown_captured = shutdown.clone();
+        let db_captured = Arc::clone(&db);
+        let join_handle =
+            tokio::spawn(async move { db_captured.background_worker(shutdown_captured).await });
+
+        let consumer = WriteBufferConsumer::new(
+            Box::new(MockBufferForReading::new(write_buffer_state, None).unwrap()),
+            Arc::clone(&db),
+            test_db.metric_registry.registry().as_ref(),
+        );
+
+        let query = "select * from cpu";
+
+        // check: after a while the table should exist and a query plan should succeed
+        let t_0 = Instant::now();
+        loop {
+            let planner = SqlQueryPlanner::default();
+            let ctx = db.new_query_context(None);
+            let physical_plan = planner.query(query, &ctx);
+
+            if physical_plan.is_ok() {
+                break;
+            }
+
+            assert!(t_0.elapsed() < Duration::from_secs(10));
+            tokio::time::sleep(Duration::from_millis(100)).await;
+        }
+
+        // check: metrics
+        // We need to do this BEFORE shutting down the background loop because gauges would
+        // otherwise be dropped and reset
+        let metrics = test_db.metric_registry;
+        metrics
+            .has_metric_family("write_buffer_ingest_requests_total")
+            .with_attributes(&[
+                ("db_name", "placeholder"),
+                ("sequencer_id", "0"),
+                ("status", "ok"),
+            ])
+            .counter()
+            .eq(2.0)
+            .unwrap();
+        metrics
+            .has_metric_family("write_buffer_read_bytes_total")
+            .with_attributes(&[("db_name", "placeholder"), ("sequencer_id", "0")])
+            .counter()
+            .eq(528.0)
+            .unwrap();
+        metrics
+            .has_metric_family("write_buffer_last_sequence_number")
+            .with_attributes(&[("db_name", "placeholder"), ("sequencer_id", "0")])
+            .gauge()
+            .eq(7.0)
+            .unwrap();
+        metrics
+            .has_metric_family("write_buffer_sequence_number_lag")
+            .with_attributes(&[("db_name", "placeholder"), ("sequencer_id", "0")])
+            .gauge()
+            .eq(0.0)
+            .unwrap();
+        metrics
+            .has_metric_family("write_buffer_last_min_ts")
+            .with_attributes(&[("db_name", "placeholder"), ("sequencer_id", "0")])
+            .gauge()
+            .eq(20.0)
+            .unwrap();
+        metrics
+            .has_metric_family("write_buffer_last_max_ts")
+            .with_attributes(&[("db_name", "placeholder"), ("sequencer_id", "0")])
+            .gauge()
+            .eq(30.0)
+            .unwrap();
+        metrics
+            .has_metric_family("write_buffer_last_ingest_ts")
+            .with_attributes(&[("db_name", "placeholder"), ("sequencer_id", "0")])
+            .gauge()
+            .eq(ingest_ts2.timestamp_nanos() as f64)
+            .unwrap();
+
+        // do: stop background task loop
+        shutdown.cancel();
+        join_handle.await.unwrap();
+
+        consumer.shutdown();
+        consumer.join().await.unwrap();
+
+        // check: the expected results should be there
+        let batches = run_query(db, "select * from cpu order by time").await;
+
+        let expected = vec![
+            "+-----+--------------------------------+",
+            "| bar | time                           |",
+            "+-----+--------------------------------+",
+            "| 2   | 1970-01-01T00:00:00.000000020Z |",
+            "| 3   | 1970-01-01T00:00:00.000000030Z |",
+            "+-----+--------------------------------+",
+        ];
+        assert_batches_eq!(expected, &batches);
+    }
+
+    #[tokio::test]
+    async fn write_buffer_reads_wait_for_compaction() {
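+        // Scenario: push more data than the deliberately tiny hard buffer
+        // limit can hold, so ingest hits `HardLimitReached` and must wait for
+        // compaction; the assertions below verify that no rows were dropped
+        // while the consumer retried.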
+        let tracing_capture = TracingCapture::new();
+
+        // setup write buffer
+        // these numbers are hand-tuned to trigger hard buffer limits without making the test too big
+        let n_entries = 50u64;
+        let write_buffer_state =
+            MockBufferSharedState::empty_with_n_sequencers(NonZeroU32::try_from(1).unwrap());
+        for sequence_number in 0..n_entries {
+            let lp = format!(
+                "table_1,tag_partition_by=a foo=\"hello\",bar=1 {}",
+                sequence_number / 2
+            );
+            write_buffer_state.push_entry(SequencedEntry::new_from_sequence(
+                Sequence::new(0, sequence_number),
+                Utc::now(),
+                lp_to_entry(&lp),
+            ));
+        }
+        write_buffer_state.push_entry(SequencedEntry::new_from_sequence(
+            Sequence::new(0, n_entries),
+            Utc::now(),
+            lp_to_entry("table_2,partition_by=a foo=1 0"),
+        ));
+
+        // create DB
+        let partition_template = PartitionTemplate {
+            parts: vec![TemplatePart::Column("tag_partition_by".to_string())],
+        };
+        let test_db = TestDb::builder()
+            .lifecycle_rules(data_types::database_rules::LifecycleRules {
+                buffer_size_hard: Some(NonZeroUsize::new(10_000).unwrap()),
+                mub_row_threshold: NonZeroUsize::new(10).unwrap(),
+                ..Default::default()
+            })
+            .partition_template(partition_template)
+            .build()
+            .await;
+        let db = test_db.db;
+
+        // start background task loop
+        let shutdown: CancellationToken = Default::default();
+        let shutdown_captured = shutdown.clone();
+        let db_captured = Arc::clone(&db);
+        let join_handle =
+            tokio::spawn(async move { db_captured.background_worker(shutdown_captured).await });
+
+        let consumer = WriteBufferConsumer::new(
+            Box::new(MockBufferForReading::new(write_buffer_state, None).unwrap()),
+            Arc::clone(&db),
+            test_db.metric_registry.registry().as_ref(),
+        );
+
+        // after a while the table should exist
+        let t_0 = Instant::now();
+        loop {
+            if db.table_schema("table_2").is_some() {
+                break;
+            }
+
+            assert!(t_0.elapsed() < Duration::from_secs(10));
+            tokio::time::sleep(Duration::from_millis(100)).await;
+        }
+
+        // do: stop background task loop
+        shutdown.cancel();
+        join_handle.await.unwrap();
+
+        consumer.shutdown();
+        consumer.join().await.unwrap();
+
+        // no rows should be dropped
+        let batches = run_query(db, "select sum(bar) as n from table_1").await;
+        let expected = vec!["+----+", "| n  |", "+----+", "| 25 |", "+----+"];
+        assert_batches_eq!(expected, &batches);
+
+        // check that the hard buffer limit was actually hit (otherwise this test is pointless/outdated)
+        assert_contains!(tracing_capture.to_string(), "Hard limit reached while reading from write buffer, waiting for compaction to catch up");
+    }
+
+    #[tokio::test]
+    async fn error_converting_data_from_write_buffer_to_sequenced_entry_is_reported() {
+        let write_buffer_state =
+            MockBufferSharedState::empty_with_n_sequencers(NonZeroU32::try_from(1).unwrap());
+        write_buffer_state.push_error(
+            String::from("Something bad happened on the way to creating a SequencedEntry").into(),
+            0,
+        );
+        let test_db = TestDb::builder().build().await;
+
+        let db = Arc::new(test_db.db);
+        let metrics = test_db.metric_registry;
+
+        // do: start background task loop
+        let shutdown: CancellationToken = Default::default();
+        let shutdown_captured = shutdown.clone();
+        let db_captured = Arc::clone(&db);
+        let join_handle =
+            tokio::spawn(async move { db_captured.background_worker(shutdown_captured).await });
+
+        let consumer = WriteBufferConsumer::new(
+            Box::new(MockBufferForReading::new(write_buffer_state, None).unwrap()),
+            Arc::clone(&db),
+            metrics.registry().as_ref(),
+        );
+
+        // check: after a while the error should be reported in the database's metrics
+        let t_0 = Instant::now();
+        loop {
+            let family = metrics.try_has_metric_family("write_buffer_ingest_requests_total");
+
+            if let Ok(metric) = family {
+                if metric
+                    .with_attributes(&[
+                        ("db_name", "placeholder"),
+                        ("sequencer_id", "0"),
+                        ("status", "client_error"),
+                    ])
+                    .counter()
+                    .eq(1.0)
+                    .is_ok()
+                {
+                    break;
+                }
+            }
+
+            assert!(t_0.elapsed() < Duration::from_secs(10));
+            tokio::time::sleep(Duration::from_millis(100)).await;
+        }
+
+        // do: stop background task loop
+        shutdown.cancel();
+        join_handle.await.unwrap();
+
+        consumer.shutdown();
+        consumer.join().await.unwrap();
+    }
+}
diff --git a/server/src/write_buffer/metrics.rs b/server/src/write_buffer/metrics.rs
new file mode 100644
index 0000000000..4d4b81be24
--- /dev/null
+++ b/server/src/write_buffer/metrics.rs
@@ -0,0 +1,167 @@
+use metrics::KeyValue;
+use std::sync::Arc;
+
+/// Metrics for data ingest via write buffer.
+#[derive(Debug)]
+pub struct WriteBufferIngestMetrics {
+    /// Metrics domain
+    domain: Arc<metrics::Domain>,
+}
+
+impl WriteBufferIngestMetrics {
+    pub fn new(domain: Arc<metrics::Domain>) -> Self {
+        Self { domain }
+    }
+
+    pub fn new_sequencer_metrics(&self, sequencer_id: u32) -> SequencerMetrics {
+        let attributes = vec![KeyValue::new("sequencer_id", sequencer_id.to_string())];
+
+        let red = self
+            .domain
+            .register_red_metric_with_attributes(Some("ingest"), attributes.clone());
+        let bytes_read = self.domain.register_counter_metric_with_attributes(
+            "read",
+            Some("bytes"),
+            "Bytes read from sequencer",
+            attributes.clone(),
+        );
+        let last_sequence_number = self.domain.register_gauge_metric_with_attributes(
+            "last_sequence_number",
+            None,
+            "Last consumed sequence number (e.g. Kafka offset)",
+            &attributes,
+        );
+        let sequence_number_lag = self.domain.register_gauge_metric_with_attributes(
+            "sequence_number_lag",
+            None,
+            "The difference between the last sequence number available (e.g. Kafka offset) and (= minus) the last consumed sequence number",
+            &attributes,
+        );
+        let last_min_ts = self.domain.register_gauge_metric_with_attributes(
+            "last_min_ts",
+            None,
+            "Minimum timestamp of last write as unix timestamp in nanoseconds",
+            &attributes,
+        );
+        let last_max_ts = self.domain.register_gauge_metric_with_attributes(
+            "last_max_ts",
+            None,
+            "Maximum timestamp of last write as unix timestamp in nanoseconds",
+            &attributes,
+        );
+        let last_ingest_ts = self.domain.register_gauge_metric_with_attributes(
+            "last_ingest_ts",
+            None,
+            "Last seen ingest timestamp as unix timestamp in nanoseconds",
+            &attributes,
+        );
+
+        SequencerMetrics {
+            red,
+            bytes_read,
+            last_sequence_number,
+            sequence_number_lag,
+            last_min_ts,
+            last_max_ts,
+            last_ingest_ts,
+        }
+    }
+}
+
+/// Metrics for a single sequencer.
+#[derive(Debug)]
+pub struct SequencerMetrics {
+    /// Metrics for tracking ingest.
+    pub(super) red: metrics::RedMetric,
+
+    /// Bytes read from sequencer.
+    ///
+    /// This metric is independent of the success / error state of the entries.
+    bytes_read: metrics::Counter,
+
+    /// Last consumed sequence number (e.g. Kafka offset).
+    last_sequence_number: metrics::Gauge,
+
+    /// The difference between the last sequence number available (e.g. Kafka offset) and
+    /// (= minus) the last consumed sequence number.
+    sequence_number_lag: metrics::Gauge,
+
+    /// Minimum timestamp of last write as unix timestamp in nanoseconds.
+    last_min_ts: metrics::Gauge,
+
+    /// Maximum timestamp of last write as unix timestamp in nanoseconds.
+    last_max_ts: metrics::Gauge,
+
+    /// Last seen ingest timestamp as unix timestamp in nanoseconds.
+    last_ingest_ts: metrics::Gauge,
+}
+
+impl SequencerMetrics {
+    /// Record a successful write
+    ///
+    /// Updates:
+    ///
+    /// - bytes read
+    /// - last sequence number
+    /// - lag
+    /// - min ts
+    /// - max ts
+    /// - ingest ts
+    pub fn record_write(&mut self, sequenced_entry: &entry::SequencedEntry, watermark: u64) {
+        let entry = sequenced_entry.entry();
+        let producer_wallclock_timestamp = sequenced_entry
+            .producer_wallclock_timestamp()
+            .expect("entry from write buffer must have a producer wallclock time");
+
+        let sequence = sequenced_entry
+            .sequence()
+            .expect("entry from write buffer must be sequenced");
+
+        self.bytes_read.add(entry.data().len() as u64);
+        self.last_sequence_number.set(sequence.number as usize, &[]);
+        self.sequence_number_lag.set(
+            watermark.saturating_sub(sequence.number).saturating_sub(1) as usize,
+            &[],
+        );
+        if let Some(min_ts) = entry
+            .partition_writes()
+            .map(|partition_writes| {
+                partition_writes
+                    .iter()
+                    .filter_map(|partition_write| {
+                        partition_write
+                            .table_batches()
+                            .iter()
+                            .filter_map(|table_batch| table_batch.min_max_time().ok())
+                            .map(|(min, _max)| min)
+                            .max()
+                    })
+                    .min()
+            })
+            .flatten()
+        {
+            self.last_min_ts.set(min_ts.timestamp_nanos() as usize, &[]);
+        }
+        if let Some(max_ts) = entry
+            .partition_writes()
+            .map(|partition_writes| {
+                partition_writes
+                    .iter()
+                    .filter_map(|partition_write| {
+                        partition_write
+                            .table_batches()
+                            .iter()
+                            .filter_map(|table_batch| table_batch.min_max_time().ok())
+                            .map(|(_min, max)| max)
+                            .max()
+                    })
+                    .max()
+            })
+            .flatten()
+        {
+            self.last_max_ts.set(max_ts.timestamp_nanos() as usize, &[]);
+        }
+        self.last_ingest_ts
+            .set(producer_wallclock_timestamp.timestamp_nanos() as usize, &[]);
+    }
+}
diff --git a/write_buffer/src/config.rs b/write_buffer/src/config.rs
index 8475b66ffe..4c39832054 100644
--- a/write_buffer/src/config.rs
+++ b/write_buffer/src/config.rs
@@ -4,7 +4,7 @@ use std::{
 };
 
 use data_types::{
-    database_rules::{DatabaseRules, WriteBufferConnection, WriteBufferDirection},
+    database_rules::{WriteBufferConnection, WriteBufferDirection},
     server_id::ServerId,
 };
 
@@ -29,7 +29,8 @@ enum Mock {
     AlwaysFailing,
 }
 
-/// Factory that creates [`WriteBufferConfig`] from [`DatabaseRules`].
+/// Factory that creates [`WriteBufferReading`] and [`WriteBufferWriting`]
+/// from [`WriteBufferConnection`].
 #[derive(Debug)]
 pub struct WriteBufferConfigFactory {
     mocks: BTreeMap<String, Mock>,
@@ -77,30 +78,12 @@ impl WriteBufferConfigFactory {
             .ok_or_else::<WriteBufferError, _>(|| format!("Unknown mock ID: {}", name).into())
     }
 
-    /// Create new config.
-    pub async fn new_config(
-        &self,
-        server_id: ServerId,
-        rules: &DatabaseRules,
-    ) -> Result<Option<WriteBufferConfig>, WriteBufferError> {
-        let db_name = rules.db_name();
-
-        let cfg = match rules.write_buffer_connection.as_ref() {
-            Some(cfg) => match cfg.direction {
-                WriteBufferDirection::Write => Some(WriteBufferConfig::Writing(
-                    self.new_config_write(db_name, cfg).await?,
-                )),
-                WriteBufferDirection::Read => Some(WriteBufferConfig::Reading(Arc::new(
-                    tokio::sync::Mutex::new(self.new_config_read(server_id, db_name, cfg).await?),
-                ))),
-            },
-            None => None,
-        };
-
-        Ok(cfg)
-    }
-
-    async fn new_config_write(
+    /// Returns a new [`WriteBufferWriting`] for the provided [`WriteBufferConnection`]
+    ///
+    /// # Panics
+    /// When the provided connection is not [`WriteBufferDirection::Write`]
+    ///
+    pub async fn new_config_write(
         &self,
         db_name: &str,
         cfg: &WriteBufferConnection,
@@ -137,7 +120,11 @@ impl WriteBufferConfigFactory {
         Ok(writer)
     }
 
-    async fn new_config_read(
+    /// Returns a new [`WriteBufferReading`] for the provided [`WriteBufferConnection`]
+    ///
+    /// # Panics
+    /// When the provided connection is not [`WriteBufferDirection::Read`]
+    pub async fn new_config_read(
         &self,
         server_id: ServerId,
         db_name: &str,
@@ -196,47 +183,24 @@ mod tests {
 
     use super::*;
 
-    #[tokio::test]
-    async fn test_none() {
-        let factory = WriteBufferConfigFactory::new();
-
-        let server_id = ServerId::try_from(1).unwrap();
-
-        let mut rules = DatabaseRules::new(DatabaseName::new("foo").unwrap());
-        rules.write_buffer_connection = None;
-
-        assert!(factory
-            .new_config(server_id, &rules)
-            .await
-            .unwrap()
-            .is_none());
-    }
-
     #[tokio::test]
     async fn test_writing_kafka() {
         let conn = maybe_skip_kafka_integration!();
         let factory = WriteBufferConfigFactory::new();
-        let server_id = ServerId::try_from(1).unwrap();
-
-        let mut rules = DatabaseRules::new(DatabaseName::try_from(random_kafka_topic()).unwrap());
-        rules.write_buffer_connection = Some(WriteBufferConnection {
+        let db_name = DatabaseName::try_from(random_kafka_topic()).unwrap();
+        let cfg = WriteBufferConnection {
             direction: WriteBufferDirection::Write,
             type_: "kafka".to_string(),
             connection: conn,
             creation_config: Some(WriteBufferCreationConfig::default()),
             ..Default::default()
-        });
+        };
 
-        if let WriteBufferConfig::Writing(conn) = factory
-            .new_config(server_id, &rules)
+        let conn = factory
+            .new_config_write(db_name.as_str(), &cfg)
             .await
-            .unwrap()
-            .unwrap()
-        {
-            assert_eq!(conn.type_name(), "kafka");
-        } else {
-            panic!("not a writing connection");
-        }
+            .unwrap();
+        assert_eq!(conn.type_name(), "kafka");
     }
 
     #[tokio::test]
@@ -245,26 +209,20 @@
         let factory = WriteBufferConfigFactory::new();
         let server_id = ServerId::try_from(1).unwrap();
 
-        let mut rules = DatabaseRules::new(DatabaseName::try_from(random_kafka_topic()).unwrap());
-        rules.write_buffer_connection = Some(WriteBufferConnection {
+        let db_name = DatabaseName::try_from(random_kafka_topic()).unwrap();
+        let cfg = WriteBufferConnection {
             direction: WriteBufferDirection::Read,
             type_: "kafka".to_string(),
             connection: conn,
             creation_config: Some(WriteBufferCreationConfig::default()),
             ..Default::default()
-        });
+        };
 
-        if let WriteBufferConfig::Reading(conn) = factory
-            .new_config(server_id, &rules)
+        let conn = factory
+            .new_config_read(server_id, db_name.as_str(), &cfg)
             .await
-            .unwrap()
-            .unwrap()
-        {
-            let conn = conn.lock().await;
-            assert_eq!(conn.type_name(), "kafka");
-        } else {
-            panic!("not a reading connection");
-        }
+            .unwrap();
+        assert_eq!(conn.type_name(), "kafka");
     }
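 
+    // The remaining tests exercise the mock registry rather than a real Kafka
+    // connection; resolving an unregistered mock name is expected to fail with
+    // an "Unknown mock ID" error.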
     #[tokio::test]
@@ -276,35 +234,31 @@
         let mock_name = "some_mock";
         factory.register_mock(mock_name.to_string(), state);
 
-        let server_id = ServerId::try_from(1).unwrap();
-
-        let mut rules = DatabaseRules::new(DatabaseName::new("foo").unwrap());
-        rules.write_buffer_connection = Some(WriteBufferConnection {
+        let db_name = DatabaseName::try_from(random_kafka_topic()).unwrap();
+        let cfg = WriteBufferConnection {
             direction: WriteBufferDirection::Write,
             type_: "mock".to_string(),
             connection: mock_name.to_string(),
             ..Default::default()
-        });
+        };
 
-        if let WriteBufferConfig::Writing(conn) = factory
-            .new_config(server_id, &rules)
+        let conn = factory
+            .new_config_write(db_name.as_str(), &cfg)
             .await
-            .unwrap()
-            .unwrap()
-        {
-            assert_eq!(conn.type_name(), "mock");
-        } else {
-            panic!("not a writing connection");
-        }
+            .unwrap();
+        assert_eq!(conn.type_name(), "mock");
 
         // will error when state is unknown
-        rules.write_buffer_connection = Some(WriteBufferConnection {
+        let cfg = WriteBufferConnection {
             direction: WriteBufferDirection::Write,
             type_: "mock".to_string(),
             connection: "bar".to_string(),
             ..Default::default()
-        });
-        let err = factory.new_config(server_id, &rules).await.unwrap_err();
+        };
+        let err = factory
+            .new_config_write(db_name.as_str(), &cfg)
+            .await
+            .unwrap_err();
         assert!(err.to_string().starts_with("Unknown mock ID:"));
     }
 
@@ -318,35 +272,31 @@
         factory.register_mock(mock_name.to_string(), state);
 
         let server_id = ServerId::try_from(1).unwrap();
-
-        let mut rules = DatabaseRules::new(DatabaseName::new("foo").unwrap());
-        rules.write_buffer_connection = Some(WriteBufferConnection {
+        let db_name = DatabaseName::try_from(random_kafka_topic()).unwrap();
+        let cfg = WriteBufferConnection {
             direction: WriteBufferDirection::Read,
             type_: "mock".to_string(),
             connection: mock_name.to_string(),
             ..Default::default()
-        });
+        };
 
-        if let WriteBufferConfig::Reading(conn) = factory
-            .new_config(server_id, &rules)
+        let conn = factory
+            .new_config_read(server_id, db_name.as_str(), &cfg)
            .await
-            .unwrap()
-            .unwrap()
-        {
-            let conn = conn.lock().await;
-            assert_eq!(conn.type_name(), "mock");
-        } else {
-            panic!("not a reading connection");
-        }
+            .unwrap();
+        assert_eq!(conn.type_name(), "mock");
 
         // will error when state is unknown
-        rules.write_buffer_connection = Some(WriteBufferConnection {
+        let cfg = WriteBufferConnection {
             direction: WriteBufferDirection::Read,
             type_: "mock".to_string(),
             connection: "bar".to_string(),
             ..Default::default()
-        });
-        let err = factory.new_config(server_id, &rules).await.unwrap_err();
+        };
+        let err = factory
+            .new_config_read(server_id, db_name.as_str(), &cfg)
+            .await
+            .unwrap_err();
         assert!(err.to_string().starts_with("Unknown mock ID:"));
     }
 
@@ -357,35 +307,31 @@
         let mock_name = "some_mock";
         factory.register_always_fail_mock(mock_name.to_string());
 
-        let server_id = ServerId::try_from(1).unwrap();
-
-        let mut rules = DatabaseRules::new(DatabaseName::new("foo").unwrap());
-        rules.write_buffer_connection = Some(WriteBufferConnection {
+        let db_name = DatabaseName::try_from(random_kafka_topic()).unwrap();
+        let cfg = WriteBufferConnection {
             direction: WriteBufferDirection::Write,
             type_: "mock".to_string(),
             connection: mock_name.to_string(),
             ..Default::default()
-        });
+        };
 
-        if let WriteBufferConfig::Writing(conn) = factory
-            .new_config(server_id, &rules)
+        let conn = factory
+            .new_config_write(db_name.as_str(), &cfg)
             .await
-            .unwrap()
-            .unwrap()
-        {
-            assert_eq!(conn.type_name(), "mock_failing");
-        } else {
panic!("not a writing connection"); - } + .unwrap(); + assert_eq!(conn.type_name(), "mock_failing"); // will error when state is unknown - rules.write_buffer_connection = Some(WriteBufferConnection { + let cfg = WriteBufferConnection { direction: WriteBufferDirection::Write, type_: "mock".to_string(), connection: "bar".to_string(), ..Default::default() - }); - let err = factory.new_config(server_id, &rules).await.unwrap_err(); + }; + let err = factory + .new_config_write(db_name.as_str(), &cfg) + .await + .unwrap_err(); assert!(err.to_string().starts_with("Unknown mock ID:")); } @@ -398,34 +344,31 @@ mod tests { let server_id = ServerId::try_from(1).unwrap(); - let mut rules = DatabaseRules::new(DatabaseName::new("foo").unwrap()); - rules.write_buffer_connection = Some(WriteBufferConnection { + let db_name = DatabaseName::new("foo").unwrap(); + let cfg = WriteBufferConnection { direction: WriteBufferDirection::Read, type_: "mock".to_string(), connection: mock_name.to_string(), ..Default::default() - }); + }; - if let WriteBufferConfig::Reading(conn) = factory - .new_config(server_id, &rules) + let conn = factory + .new_config_read(server_id, db_name.as_str(), &cfg) .await - .unwrap() - .unwrap() - { - let conn = conn.lock().await; - assert_eq!(conn.type_name(), "mock_failing"); - } else { - panic!("not a reading connection"); - } + .unwrap(); + assert_eq!(conn.type_name(), "mock_failing"); // will error when state is unknown - rules.write_buffer_connection = Some(WriteBufferConnection { + let cfg = WriteBufferConnection { direction: WriteBufferDirection::Read, type_: "mock".to_string(), connection: "bar".to_string(), ..Default::default() - }); - let err = factory.new_config(server_id, &rules).await.unwrap_err(); + }; + let err = factory + .new_config_read(server_id, db_name.as_str(), &cfg) + .await + .unwrap_err(); assert!(err.to_string().starts_with("Unknown mock ID:")); } diff --git a/write_buffer/src/mock.rs b/write_buffer/src/mock.rs index de227b84e4..a54130898f 100644 --- a/write_buffer/src/mock.rs +++ b/write_buffer/src/mock.rs @@ -27,6 +27,9 @@ struct EntryResVec { entries: Vec>, /// A list of Waker waiting for a new entry to be pushed + /// + /// Note: this is a list because it is possible to create + /// two streams consuming from the same shared state wait_list: Vec, } @@ -49,8 +52,8 @@ impl EntryResVec { /// Register a waker to be notified when a new entry is pushed pub fn register_waker(&mut self, waker: &Waker) { - for waker in &self.wait_list { - if waker.will_wake(waker) { + for wait_waker in &self.wait_list { + if wait_waker.will_wake(waker) { return; } }