feat: pull WriteBuffer consumer out of Db and onto Database (#2243) (#2525)

* feat: pull WriteBuffer consumer out of Db and onto Database (#2243)

* chore: restore WritingOnlyAllowedThroughWriteBuffer error

* refactor: remove WriteBufferConfig

* chore: fix docs

* chore: move WriteBufferConsumer tests out of db.rs

* chore: document WriteBufferFactory member functions

* chore: fmt

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Raphael Taylor-Davies 2021-09-14 17:04:58 +01:00 committed by GitHub
parent 9514a566f7
commit c33e5c22e6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 1209 additions and 1129 deletions

View File

@ -1,3 +1,4 @@
use crate::write_buffer::WriteBufferConsumer;
use crate::{
db::{
load::{create_preserved_catalog, load_or_create_preserved_catalog},
@ -6,6 +7,7 @@ use crate::{
rules::ProvidedDatabaseRules,
ApplicationState, Db,
};
use data_types::database_rules::WriteBufferDirection;
use data_types::{database_state::DatabaseStateCode, server_id::ServerId, DatabaseName};
use futures::{
future::{BoxFuture, Shared},
@ -14,11 +16,11 @@ use futures::{
use internal_types::freezable::Freezable;
use iox_object_store::IoxObjectStore;
use observability_deps::tracing::{error, info, warn};
use parking_lot::RwLock;
use parking_lot::{MappedRwLockReadGuard, RwLock, RwLockReadGuard};
use parquet_file::catalog::api::PreservedCatalog;
use persistence_windows::checkpoint::ReplayPlan;
use snafu::{ensure, OptionExt, ResultExt, Snafu};
use std::{future::Future, ops::DerefMut, sync::Arc, time::Duration};
use std::{future::Future, sync::Arc, time::Duration};
use tokio::{sync::Notify, task::JoinError};
use tokio_util::sync::CancellationToken;
@ -101,7 +103,7 @@ pub struct Database {
}
#[derive(Debug, Clone)]
/// Informatation about where a database is located on object store,
/// Information about where a database is located on object store,
/// and how to perform startup activities.
pub struct DatabaseConfig {
pub name: DatabaseName<'static>,
@ -314,8 +316,9 @@ impl Database {
// via WriteGuard
let mut state = state.unfreeze(handle);
if let DatabaseState::Initialized(DatabaseStateInitialized { db, provided_rules }) =
state.deref_mut()
if let DatabaseState::Initialized(DatabaseStateInitialized {
db, provided_rules, ..
}) = &mut *state
{
db.update_rules(Arc::clone(new_provided_rules.rules()));
*provided_rules = new_provided_rules;
@ -333,13 +336,14 @@ impl Database {
self.shared.state.read().iox_object_store()
}
pub fn initialized(&self) -> Option<MappedRwLockReadGuard<'_, DatabaseStateInitialized>> {
RwLockReadGuard::try_map(self.shared.state.read(), |state| state.get_initialized()).ok()
}
/// Gets access to an initialized `Db`
pub fn initialized_db(&self) -> Option<Arc<Db>> {
self.shared
.state
.read()
.get_initialized()
.map(|state| Arc::clone(&state.db))
let initialized = self.initialized()?;
Some(Arc::clone(initialized.db()))
}
/// Returns Ok(()) when the Database is initialized, or the error
@ -444,7 +448,7 @@ impl Database {
let db_name = &shared.config.name;
current_state.replay_plan = Arc::new(None);
let current_state = current_state
.advance()
.advance(shared.as_ref())
.await
.map_err(Box::new)
.context(SkipReplay { db_name })?;
@ -523,6 +527,20 @@ async fn background_worker(shared: Arc<DatabaseShared>) {
db.background_worker(shared.shutdown.clone()).await
}
let write_buffer_consumer = shared
.state
.read()
.get_initialized()
.and_then(|initialized| initialized.write_buffer_consumer.clone());
if let Some(consumer) = write_buffer_consumer {
info!(db_name=%shared.config.name, "shutting down write buffer consumer");
consumer.shutdown();
if let Err(e) = consumer.join().await {
error!(db_name=%shared.config.name, %e, "error shutting down write buffer consumer")
}
}
info!(db_name=%shared.config.name, "draining tasks");
// Loop in case tasks are spawned during shutdown
@ -632,7 +650,7 @@ async fn initialize_database(shared: &DatabaseShared) {
Ok(state) => DatabaseState::CatalogLoaded(state),
Err(e) => DatabaseState::CatalogLoadError(state, Arc::new(e)),
},
DatabaseState::CatalogLoaded(state) => match state.advance().await {
DatabaseState::CatalogLoaded(state) => match state.advance(shared).await {
Ok(state) => DatabaseState::Initialized(state),
Err(e) => DatabaseState::ReplayError(state, Arc::new(e)),
},
@ -888,24 +906,27 @@ impl DatabaseStateRulesLoaded {
.await
.context(CatalogLoad)?;
let write_buffer = shared
.application
.write_buffer_factory()
.new_config(
shared.config.server_id,
self.provided_rules.rules().as_ref(),
)
.await
.context(CreateWriteBuffer)?;
let rules = self.provided_rules.rules();
let write_buffer_factory = shared.application.write_buffer_factory();
let producer = match rules.write_buffer_connection.as_ref() {
Some(connection) if matches!(connection.direction, WriteBufferDirection::Write) => {
let producer = write_buffer_factory
.new_config_write(shared.config.name.as_str(), connection)
.await
.context(CreateWriteBuffer)?;
Some(producer)
}
_ => None,
};
let database_to_commit = DatabaseToCommit {
server_id: shared.config.server_id,
iox_object_store: Arc::clone(&self.iox_object_store),
exec: Arc::clone(shared.application.executor()),
rules: Arc::clone(self.provided_rules.rules()),
rules: Arc::clone(rules),
preserved_catalog,
catalog,
write_buffer,
write_buffer_producer: producer,
metrics_registry_v2: Arc::clone(shared.application.metric_registry_v2()),
};
@ -931,30 +952,66 @@ struct DatabaseStateCatalogLoaded {
impl DatabaseStateCatalogLoaded {
/// Perform replay
async fn advance(&self) -> Result<DatabaseStateInitialized, InitError> {
async fn advance(
&self,
shared: &DatabaseShared,
) -> Result<DatabaseStateInitialized, InitError> {
let db = Arc::clone(&self.db);
db.perform_replay(self.replay_plan.as_ref().as_ref())
.await
.context(Replay)?;
// TODO: Pull write buffer and lifecycle out of Db
db.unsuppress_persistence().await;
db.allow_write_buffer_read();
let rules = self.provided_rules.rules();
let write_buffer_factory = shared.application.write_buffer_factory();
let write_buffer_consumer = match rules.write_buffer_connection.as_ref() {
Some(connection) if matches!(connection.direction, WriteBufferDirection::Read) => {
let mut consumer = write_buffer_factory
.new_config_read(
shared.config.server_id,
shared.config.name.as_str(),
connection,
)
.await
.context(CreateWriteBuffer)?;
db.perform_replay(self.replay_plan.as_ref().as_ref(), consumer.as_mut())
.await
.context(Replay)?;
Some(Arc::new(WriteBufferConsumer::new(
consumer,
Arc::clone(&db),
shared.application.metric_registry().as_ref(),
)))
}
_ => None,
};
Ok(DatabaseStateInitialized {
db,
write_buffer_consumer,
provided_rules: Arc::clone(&self.provided_rules),
})
}
}
#[derive(Debug, Clone)]
struct DatabaseStateInitialized {
pub struct DatabaseStateInitialized {
db: Arc<Db>,
write_buffer_consumer: Option<Arc<WriteBufferConsumer>>,
provided_rules: Arc<ProvidedDatabaseRules>,
}
impl DatabaseStateInitialized {
pub fn db(&self) -> &Arc<Db> {
&self.db
}
pub fn write_buffer_consumer(&self) -> Option<&Arc<WriteBufferConsumer>> {
self.write_buffer_consumer.as_ref()
}
}
#[cfg(test)]
mod tests {
use chrono::Utc;

View File

@ -1,23 +1,23 @@
//! This module contains the main IOx Database object which has the
//! instances of the mutable buffer, read buffer, and object store
use async_trait::async_trait;
use chrono::{TimeZone, Utc};
use futures::{stream::BoxStream, StreamExt};
use parking_lot::{Mutex, RwLock};
use rand_distr::{Distribution, Poisson};
use snafu::{ensure, OptionExt, ResultExt, Snafu};
use std::{
any::Any,
collections::HashMap,
num::NonZeroUsize,
sync::{
atomic::{AtomicBool, AtomicUsize, Ordering},
atomic::{AtomicUsize, Ordering},
Arc,
},
time::{Duration, Instant},
};
use async_trait::async_trait;
use chrono::{TimeZone, Utc};
use parking_lot::{Mutex, RwLock};
use rand_distr::{Distribution, Poisson};
use snafu::{ensure, OptionExt, ResultExt, Snafu};
use ::lifecycle::{LifecycleChunk, LockableChunk, LockablePartition};
use data_types::{
chunk_metadata::ChunkSummary,
@ -44,10 +44,7 @@ use query::{
QueryDatabase,
};
use trace::ctx::SpanContext;
use write_buffer::{
config::WriteBufferConfig,
core::{FetchHighWatermark, WriteBufferError},
};
use write_buffer::core::{WriteBufferReading, WriteBufferWriting};
pub(crate) use crate::db::chunk::DbChunk;
use crate::{
@ -98,11 +95,6 @@ pub enum Error {
#[snafu(display("Cannot write to this database: no mutable buffer configured"))]
DatabaseNotWriteable {},
#[snafu(display(
"Cannot write to this database, configured to only read from the write buffer"
))]
WritingOnlyAllowedThroughWriteBuffer {},
#[snafu(display("Hard buffer size limit reached"))]
HardLimitReached {},
@ -178,101 +170,6 @@ pub enum Error {
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Metrics for data ingest via write buffer.
#[derive(Debug)]
struct WriteBufferIngestMetrics {
/// Metrics domain
domain: Arc<metrics::Domain>,
}
impl WriteBufferIngestMetrics {
fn new(domain: Arc<metrics::Domain>) -> Self {
Self { domain }
}
fn new_sequencer_metrics(&self, sequencer_id: u32) -> SequencerMetrics {
let attributes = vec![KeyValue::new("sequencer_id", sequencer_id.to_string())];
let red = self
.domain
.register_red_metric_with_attributes(Some("ingest"), attributes.clone());
let bytes_read = self.domain.register_counter_metric_with_attributes(
"read",
Some("bytes"),
"Bytes read from sequencer",
attributes.clone(),
);
let last_sequence_number = self.domain.register_gauge_metric_with_attributes(
"last_sequence_number",
None,
"Last consumed sequence number (e.g. Kafka offset)",
&attributes,
);
let sequence_number_lag = self.domain.register_gauge_metric_with_attributes(
"sequence_number_lag",
None,
"The difference between the the last sequence number available (e.g. Kafka offset) and (= minus) last consumed sequence number",
&attributes,
);
let last_min_ts = self.domain.register_gauge_metric_with_attributes(
"last_min_ts",
None,
"Minimum timestamp of last write as unix timestamp in nanoseconds",
&attributes,
);
let last_max_ts = self.domain.register_gauge_metric_with_attributes(
"last_max_ts",
None,
"Maximum timestamp of last write as unix timestamp in nanoseconds",
&attributes,
);
let last_ingest_ts = self.domain.register_gauge_metric_with_attributes(
"last_ingest_ts",
None,
"Last seen ingest timestamp as unix timestamp in nanoseconds",
&attributes,
);
SequencerMetrics {
red,
bytes_read,
last_sequence_number,
sequence_number_lag,
last_min_ts,
last_max_ts,
last_ingest_ts,
}
}
}
/// Metrics for a single sequencer.
#[derive(Debug)]
struct SequencerMetrics {
/// Metrics for tracking ingest.
red: metrics::RedMetric,
/// Bytes read from sequencer.
///
/// This metrics is independent of the success / error state of the entries.
bytes_read: metrics::Counter,
/// Last consumed sequence number (e.g. Kafka offset).
last_sequence_number: metrics::Gauge,
// The difference between the the last sequence number available (e.g. Kafka offset) and (= minus) last consumed
// sequence number.
sequence_number_lag: metrics::Gauge,
/// Minimum timestamp of last write as unix timestamp in nanoseconds.
last_min_ts: metrics::Gauge,
/// Maximum timestamp of last write as unix timestamp in nanoseconds.
last_max_ts: metrics::Gauge,
/// Last seen ingest timestamp as unix timestamp in nanoseconds.
last_ingest_ts: metrics::Gauge,
}
/// `Db` is an instance-local, queryable, possibly persisted, and possibly mutable data store
///
/// It is responsible for:
@ -378,11 +275,9 @@ pub struct Db {
/// Metric attributes
metric_attributes: Vec<KeyValue>,
/// Ingest metrics
ingest_metrics: WriteBufferIngestMetrics,
/// Optionally connect to a write buffer for either buffering writes or reading buffered writes
write_buffer: Option<WriteBufferConfig>,
/// Optional write buffer producer
/// TODO: Move onto Database
write_buffer_producer: Option<Arc<dyn WriteBufferWriting>>,
/// Lock that prevents the cleanup job from deleting files that are written but not yet added to the preserved
/// catalog.
@ -400,11 +295,6 @@ pub struct Db {
/// - to keep the lifecycle state (e.g. the number of running compactions) around
lifecycle_policy: tokio::sync::Mutex<Option<::lifecycle::LifecyclePolicy<WeakDb>>>,
/// Flag to stop background worker from reading from the write buffer.
///
/// TODO: Move write buffer read loop out of Db.
no_write_buffer_read: AtomicBool,
/// TESTING ONLY: Mocked `Instant::now()` for the background worker
background_worker_now_override: Mutex<Option<Instant>>,
}
@ -418,7 +308,10 @@ pub(crate) struct DatabaseToCommit {
pub(crate) preserved_catalog: PreservedCatalog,
pub(crate) catalog: Catalog,
pub(crate) rules: Arc<DatabaseRules>,
pub(crate) write_buffer: Option<WriteBufferConfig>,
/// TODO: Move onto Database
pub(crate) write_buffer_producer: Option<Arc<dyn WriteBufferWriting>>,
pub(crate) metrics_registry_v2: Arc<metric::Registry>,
}
@ -432,10 +325,6 @@ impl Db {
let metrics_registry = Arc::clone(&database_to_commit.catalog.metrics_registry);
let metric_attributes = database_to_commit.catalog.metric_attributes.clone();
let ingest_domain = metrics_registry
.register_domain_with_attributes("write_buffer", metric_attributes.clone());
let ingest_metrics = WriteBufferIngestMetrics::new(Arc::new(ingest_domain));
let catalog = Arc::new(database_to_commit.catalog);
let catalog_access = QueryCatalogAccess::new(
@ -460,11 +349,9 @@ impl Db {
worker_iterations_lifecycle: AtomicUsize::new(0),
worker_iterations_cleanup: AtomicUsize::new(0),
metric_attributes,
ingest_metrics,
write_buffer: database_to_commit.write_buffer,
write_buffer_producer: database_to_commit.write_buffer_producer,
cleanup_lock: Default::default(),
lifecycle_policy: tokio::sync::Mutex::new(None),
no_write_buffer_read: AtomicBool::new(true),
background_worker_now_override: Default::default(),
};
let this = Arc::new(this);
@ -483,13 +370,6 @@ impl Db {
policy.unsuppress_persistence();
}
/// Allow continuous reads from the write buffer (if configured).
///
/// TODO: Move write buffer read loop out of Db.
pub fn allow_write_buffer_read(&self) {
self.no_write_buffer_read.store(false, Ordering::SeqCst);
}
/// Return a handle to the executor used to run queries
pub fn executor(&self) -> Arc<Executor> {
Arc::clone(&self.exec)
@ -565,7 +445,7 @@ impl Db {
}
}
fn partition(
pub fn partition(
&self,
table_name: &str,
partition_key: &str,
@ -574,7 +454,7 @@ impl Db {
Ok(Arc::clone(&partition))
}
fn chunk(
pub fn chunk(
&self,
table_name: &str,
partition_key: &str,
@ -863,12 +743,18 @@ impl Db {
///
/// When `replay_plan` is `None` then no real replay will be performed. Instead the write buffer streams will be set
/// to the current high watermark and normal playback will continue from there.
pub async fn perform_replay(&self, replay_plan: Option<&ReplayPlan>) -> Result<()> {
pub async fn perform_replay(
&self,
replay_plan: Option<&ReplayPlan>,
consumer: &mut dyn WriteBufferReading,
) -> Result<()> {
use crate::db::replay::{perform_replay, seek_to_end};
if let Some(replay_plan) = replay_plan {
perform_replay(self, replay_plan).await.context(ReplayError)
perform_replay(self, replay_plan, consumer)
.await
.context(ReplayError)
} else {
seek_to_end(self).await.context(ReplayError)
seek_to_end(self, consumer).await.context(ReplayError)
}
}
@ -942,210 +828,15 @@ impl Db {
}
};
// streaming from the write buffer loop
let write_buffer = async {
if let Some(WriteBufferConfig::Reading(write_buffer)) = &self.write_buffer {
// wait for permission
while self.no_write_buffer_read.load(Ordering::SeqCst) {
tokio::time::sleep(Duration::from_millis(10)).await;
}
let mut write_buffer = write_buffer
.try_lock()
.expect("no streams should exist at this point");
let mut futures = vec![];
for (sequencer_id, stream) in write_buffer.streams() {
let metrics = self.ingest_metrics.new_sequencer_metrics(sequencer_id);
let fut = self.stream_in_sequenced_entries(
sequencer_id,
stream.stream,
stream.fetch_high_watermark,
metrics,
);
futures.push(fut);
}
futures::future::join_all(futures).await;
} else {
futures::future::pending::<()>().await;
}
};
// None of the futures need to perform drain logic on shutdown.
// When the first one finishes, all of them are dropped
tokio::select! {
_ = lifecycle_loop => error!("lifecycle loop exited - database worker bailing out"),
_ = write_buffer => error!("write buffer loop exited - database worker bailing out"),
_ = object_store_cleanup_loop => error!("object store cleanup exited - database worker bailing out"),
_ = shutdown.cancelled() => info!("database worker shutting down"),
_ = lifecycle_loop => error!("lifecycle loop exited - db worker bailing out"),
_ = object_store_cleanup_loop => error!("object store cleanup exited - db worker bailing out"),
_ = shutdown.cancelled() => info!("db worker shutting down"),
}
info!("finished background worker");
}
/// This is used to take entries from a `Stream` and put them in the mutable buffer, such as
/// streaming entries from a write buffer.
async fn stream_in_sequenced_entries<'a>(
&'a self,
sequencer_id: u32,
mut stream: BoxStream<'a, Result<SequencedEntry, WriteBufferError>>,
f_mark: FetchHighWatermark<'a>,
mut metrics: SequencerMetrics,
) {
let db_name = self.rules.read().db_name().to_string();
let mut watermark_last_updated: Option<Instant> = None;
let mut watermark = 0;
while let Some(sequenced_entry_result) = stream.next().await {
let red_observation = metrics.red.observation();
// get entry from sequencer
let sequenced_entry = match sequenced_entry_result {
Ok(sequenced_entry) => sequenced_entry,
Err(e) => {
debug!(
%e,
%db_name,
sequencer_id,
"Error converting write buffer data to SequencedEntry",
);
red_observation.client_error();
continue;
}
};
let sequenced_entry = Arc::new(sequenced_entry);
// store entry
let mut logged_hard_limit = false;
loop {
match self.store_sequenced_entry(
Arc::clone(&sequenced_entry),
filter_table_batch_keep_all,
) {
Ok(_) => {
red_observation.ok();
break;
}
Err(Error::HardLimitReached {}) => {
// wait a bit and retry
if !logged_hard_limit {
info!(
%db_name,
sequencer_id,
"Hard limit reached while reading from write buffer, waiting for compaction to catch up",
);
logged_hard_limit = true;
}
tokio::time::sleep(Duration::from_millis(100)).await;
continue;
}
Err(e) => {
debug!(
%e,
%db_name,
sequencer_id,
"Error storing SequencedEntry from write buffer in database"
);
red_observation.error();
// no retry
break;
}
}
}
// maybe update sequencer watermark
// We are not updating this watermark every round because asking the sequencer for that watermark can be
// quite expensive.
let now = self.background_worker_now();
if watermark_last_updated
.map(|ts| (now - ts) > Duration::from_secs(10))
.unwrap_or(true)
{
match f_mark().await {
Ok(w) => {
watermark = w;
}
Err(e) => {
debug!(
%e,
%db_name,
sequencer_id,
"Error while reading sequencer watermark",
)
}
}
watermark_last_updated = Some(now);
}
// update:
// - bytes read
// - last sequence number
// - lag
// - min ts
// - max ts
// - ingest ts
let sequence = sequenced_entry
.sequence()
.expect("entry from write buffer must be sequenced");
let producer_wallclock_timestamp = sequenced_entry
.producer_wallclock_timestamp()
.expect("entry from write buffer must have a producer wallclock time");
let entry = sequenced_entry.entry();
metrics.bytes_read.add(entry.data().len() as u64);
metrics
.last_sequence_number
.set(sequence.number as usize, &[]);
metrics.sequence_number_lag.set(
watermark.saturating_sub(sequence.number).saturating_sub(1) as usize,
&[],
);
if let Some(min_ts) = entry
.partition_writes()
.map(|partition_writes| {
partition_writes
.iter()
.filter_map(|partition_write| {
partition_write
.table_batches()
.iter()
.filter_map(|table_batch| table_batch.min_max_time().ok())
.map(|(min, _max)| min)
.max()
})
.min()
})
.flatten()
{
metrics
.last_min_ts
.set(min_ts.timestamp_nanos() as usize, &[]);
}
if let Some(max_ts) = entry
.partition_writes()
.map(|partition_writes| {
partition_writes
.iter()
.filter_map(|partition_write| {
partition_write
.table_batches()
.iter()
.filter_map(|table_batch| table_batch.min_max_time().ok())
.map(|(_min, max)| max)
.max()
})
.max()
})
.flatten()
{
metrics
.last_max_ts
.set(max_ts.timestamp_nanos() as usize, &[]);
}
metrics
.last_ingest_ts
.set(producer_wallclock_timestamp.timestamp_nanos() as usize, &[]);
}
info!("finished db background worker");
}
/// `Instant::now()` that is used by the background worker. Can be mocked for testing.
@ -1176,10 +867,10 @@ impl Db {
let rules = self.rules.read();
rules.lifecycle_rules.immutable
};
debug!(%immutable, has_write_buffer=self.write_buffer.is_some(), "storing entry");
debug!(%immutable, has_write_buffer_producer=self.write_buffer_producer.is_some(), "storing entry");
match (self.write_buffer.as_ref(), immutable) {
(Some(WriteBufferConfig::Writing(write_buffer)), true) => {
match (self.write_buffer_producer.as_ref(), immutable) {
(Some(write_buffer), true) => {
// If only the write buffer is configured, this is passing the data through to
// the write buffer, and it's not an error. We ignore the returned metadata; it
// will get picked up when data is read from the write buffer.
@ -1191,7 +882,7 @@ impl Db {
.context(WriteBufferWritingError)?;
Ok(())
}
(Some(WriteBufferConfig::Writing(write_buffer)), false) => {
(Some(write_buffer), false) => {
// If using both write buffer and mutable buffer, we want to wait for the write
// buffer to return success before adding the entry to the mutable buffer.
@ -1214,10 +905,6 @@ impl Db {
// `SequencedEntry`.
DatabaseNotWriteable {}.fail()
}
(Some(WriteBufferConfig::Reading(_)), false) => {
// If configured to read entries from the write buffer, we shouldn't be here
WritingOnlyAllowedThroughWriteBuffer {}.fail()
}
(None, false) => {
// If no write buffer is configured, nothing is
// sequencing entries so skip doing so here
@ -1436,7 +1123,7 @@ impl Db {
}
}
fn filter_table_batch_keep_all(
pub fn filter_table_batch_keep_all(
_sequence: Option<&Sequence>,
_partition_key: &str,
_batch: &TableBatch<'_>,
@ -1587,7 +1274,6 @@ pub mod test_helpers {
#[cfg(test)]
mod tests {
use std::{
collections::BTreeMap,
convert::TryFrom,
iter::Iterator,
num::{NonZeroU32, NonZeroU64, NonZeroUsize},
@ -1602,7 +1288,7 @@ mod tests {
use futures::{stream, StreamExt, TryStreamExt};
use tokio_util::sync::CancellationToken;
use ::test_helpers::{assert_contains, tracing::TracingCapture};
use ::test_helpers::assert_contains;
use arrow_util::{assert_batches_eq, assert_batches_sorted_eq};
use data_types::{
chunk_metadata::{ChunkAddr, ChunkStorage},
@ -1610,7 +1296,7 @@ mod tests {
partition_metadata::{ColumnSummary, InfluxDbType, StatValues, Statistics, TableSummary},
write_summary::TimestampSummary,
};
use entry::{test_helpers::lp_to_entry, Sequence};
use entry::test_helpers::lp_to_entry;
use internal_types::{schema::Schema, selection::Selection};
use iox_object_store::ParquetFilePath;
use metric::{Attributes, CumulativeGauge, Metric, Observation};
@ -1621,10 +1307,9 @@ mod tests {
test_utils::{load_parquet_from_store_for_path, read_data_from_parquet_data},
};
use persistence_windows::min_max_sequence::MinMaxSequence;
use query::{frontend::sql::SqlQueryPlanner, QueryChunk, QueryDatabase};
use query::{QueryChunk, QueryDatabase};
use write_buffer::mock::{
MockBufferForReading, MockBufferForWriting, MockBufferForWritingThatAlwaysErrors,
MockBufferSharedState,
MockBufferForWriting, MockBufferForWritingThatAlwaysErrors, MockBufferSharedState,
};
use crate::{
@ -1674,7 +1359,7 @@ mod tests {
let write_buffer =
Arc::new(MockBufferForWriting::new(write_buffer_state.clone(), None).unwrap());
let test_db = TestDb::builder()
.write_buffer(WriteBufferConfig::Writing(Arc::clone(&write_buffer) as _))
.write_buffer_producer(write_buffer)
.lifecycle_rules(LifecycleRules {
immutable: true,
..Default::default()
@ -1698,7 +1383,7 @@ mod tests {
let write_buffer =
Arc::new(MockBufferForWriting::new(write_buffer_state.clone(), None).unwrap());
let db = TestDb::builder()
.write_buffer(WriteBufferConfig::Writing(Arc::clone(&write_buffer) as _))
.write_buffer_producer(write_buffer)
.build()
.await
.db;
@ -1725,7 +1410,7 @@ mod tests {
let write_buffer = Arc::new(MockBufferForWritingThatAlwaysErrors {});
let db = TestDb::builder()
.write_buffer(WriteBufferConfig::Writing(Arc::clone(&write_buffer) as _))
.write_buffer_producer(write_buffer)
.build()
.await
.db;
@ -1741,285 +1426,6 @@ mod tests {
);
}
#[tokio::test]
async fn read_from_write_buffer_write_to_mutable_buffer() {
let write_buffer_state =
MockBufferSharedState::empty_with_n_sequencers(NonZeroU32::try_from(1).unwrap());
let ingest_ts1 = Utc.timestamp_millis(42);
let ingest_ts2 = Utc.timestamp_millis(1337);
write_buffer_state.push_entry(SequencedEntry::new_from_sequence(
Sequence::new(0, 0),
ingest_ts1,
lp_to_entry("mem foo=1 10"),
));
write_buffer_state.push_entry(SequencedEntry::new_from_sequence(
Sequence::new(0, 7),
ingest_ts2,
lp_to_entry("cpu bar=2 20\ncpu bar=3 30"),
));
let write_buffer = MockBufferForReading::new(write_buffer_state, None).unwrap();
let test_db = TestDb::builder()
.write_buffer(WriteBufferConfig::Reading(Arc::new(
tokio::sync::Mutex::new(Box::new(write_buffer) as _),
)))
.build()
.await;
let db = test_db.db;
// do: start background task loop
let shutdown: CancellationToken = Default::default();
let shutdown_captured = shutdown.clone();
let db_captured = Arc::clone(&db);
let join_handle =
tokio::spawn(async move { db_captured.background_worker(shutdown_captured).await });
db.allow_write_buffer_read();
let query = "select * from cpu";
// check: after a while the table should exist and a query plan should succeed
let t_0 = Instant::now();
loop {
let planner = SqlQueryPlanner::default();
let ctx = db.new_query_context(None);
let physical_plan = planner.query(query, &ctx);
if physical_plan.is_ok() {
break;
}
assert!(t_0.elapsed() < Duration::from_secs(10));
tokio::time::sleep(Duration::from_millis(100)).await;
}
// check: metrics
// We need to do that BEFORE shutting down the background loop because gauges would be dropped and resetted otherwise
let metrics = test_db.metric_registry;
metrics
.has_metric_family("write_buffer_ingest_requests_total")
.with_attributes(&[
("db_name", "placeholder"),
("svr_id", "1"),
("sequencer_id", "0"),
("status", "ok"),
])
.counter()
.eq(2.0)
.unwrap();
metrics
.has_metric_family("write_buffer_read_bytes_total")
.with_attributes(&[
("db_name", "placeholder"),
("svr_id", "1"),
("sequencer_id", "0"),
])
.counter()
.eq(528.0)
.unwrap();
metrics
.has_metric_family("write_buffer_last_sequence_number")
.with_attributes(&[
("db_name", "placeholder"),
("svr_id", "1"),
("sequencer_id", "0"),
])
.gauge()
.eq(7.0)
.unwrap();
metrics
.has_metric_family("write_buffer_sequence_number_lag")
.with_attributes(&[
("db_name", "placeholder"),
("svr_id", "1"),
("sequencer_id", "0"),
])
.gauge()
.eq(0.0)
.unwrap();
metrics
.has_metric_family("write_buffer_last_min_ts")
.with_attributes(&[
("db_name", "placeholder"),
("svr_id", "1"),
("sequencer_id", "0"),
])
.gauge()
.eq(20.0)
.unwrap();
metrics
.has_metric_family("write_buffer_last_max_ts")
.with_attributes(&[
("db_name", "placeholder"),
("svr_id", "1"),
("sequencer_id", "0"),
])
.gauge()
.eq(30.0)
.unwrap();
metrics
.has_metric_family("write_buffer_last_ingest_ts")
.with_attributes(&[
("db_name", "placeholder"),
("svr_id", "1"),
("sequencer_id", "0"),
])
.gauge()
.eq(ingest_ts2.timestamp_nanos() as f64)
.unwrap();
// do: stop background task loop
shutdown.cancel();
join_handle.await.unwrap();
// check: the expected results should be there
let batches = run_query(db, "select * from cpu order by time").await;
let expected = vec![
"+-----+--------------------------------+",
"| bar | time |",
"+-----+--------------------------------+",
"| 2 | 1970-01-01T00:00:00.000000020Z |",
"| 3 | 1970-01-01T00:00:00.000000030Z |",
"+-----+--------------------------------+",
];
assert_batches_eq!(expected, &batches);
}
#[tokio::test]
async fn write_buffer_reads_wait_for_compaction() {
let tracing_capture = TracingCapture::new();
// setup write buffer
// these numbers are handtuned to trigger hard buffer limits w/o making the test too big
let n_entries = 50u64;
let write_buffer_state =
MockBufferSharedState::empty_with_n_sequencers(NonZeroU32::try_from(1).unwrap());
for sequence_number in 0..n_entries {
let lp = format!(
"table_1,tag_partition_by=a foo=\"hello\",bar=1 {}",
sequence_number / 2
);
write_buffer_state.push_entry(SequencedEntry::new_from_sequence(
Sequence::new(0, sequence_number),
Utc::now(),
lp_to_entry(&lp),
));
}
write_buffer_state.push_entry(SequencedEntry::new_from_sequence(
Sequence::new(0, n_entries),
Utc::now(),
lp_to_entry("table_2,partition_by=a foo=1 0"),
));
let write_buffer = MockBufferForReading::new(write_buffer_state, None).unwrap();
// create DB
let partition_template = PartitionTemplate {
parts: vec![TemplatePart::Column("tag_partition_by".to_string())],
};
let test_db = TestDb::builder()
.write_buffer(WriteBufferConfig::Reading(Arc::new(
tokio::sync::Mutex::new(Box::new(write_buffer) as _),
)))
.lifecycle_rules(data_types::database_rules::LifecycleRules {
buffer_size_hard: Some(NonZeroUsize::new(10_000).unwrap()),
mub_row_threshold: NonZeroUsize::new(10).unwrap(),
..Default::default()
})
.partition_template(partition_template)
.build()
.await;
let db = test_db.db;
// start background task loop
let shutdown: CancellationToken = Default::default();
let shutdown_captured = shutdown.clone();
let db_captured = Arc::clone(&db);
let join_handle =
tokio::spawn(async move { db_captured.background_worker(shutdown_captured).await });
db.allow_write_buffer_read();
// after a while the table should exist
let t_0 = Instant::now();
loop {
if db.table_schema("table_2").is_some() {
break;
}
assert!(t_0.elapsed() < Duration::from_secs(10));
tokio::time::sleep(Duration::from_millis(100)).await;
}
// do: stop background task loop
shutdown.cancel();
join_handle.await.unwrap();
// no rows should be dropped
let batches = run_query(db, "select sum(bar) as n from table_1").await;
let expected = vec!["+----+", "| n |", "+----+", "| 25 |", "+----+"];
assert_batches_eq!(expected, &batches);
// check that hard buffer limit was actually hit (otherwise this test is pointless/outdated)
assert_contains!(tracing_capture.to_string(), "Hard limit reached while reading from write buffer, waiting for compaction to catch up");
}
#[tokio::test]
async fn error_converting_data_from_write_buffer_to_sequenced_entry_is_reported() {
let write_buffer_state =
MockBufferSharedState::empty_with_n_sequencers(NonZeroU32::try_from(1).unwrap());
write_buffer_state.push_error(
String::from("Something bad happened on the way to creating a SequencedEntry").into(),
0,
);
let write_buffer = MockBufferForReading::new(write_buffer_state, None).unwrap();
let test_db = TestDb::builder()
.write_buffer(WriteBufferConfig::Reading(Arc::new(
tokio::sync::Mutex::new(Box::new(write_buffer) as _),
)))
.build()
.await;
let db = Arc::new(test_db.db);
let metrics = test_db.metric_registry;
// do: start background task loop
let shutdown: CancellationToken = Default::default();
let shutdown_captured = shutdown.clone();
let db_captured = Arc::clone(&db);
let join_handle =
tokio::spawn(async move { db_captured.background_worker(shutdown_captured).await });
db.allow_write_buffer_read();
// check: after a while the error should be reported in the database's metrics
let t_0 = Instant::now();
loop {
let family = metrics.try_has_metric_family("write_buffer_ingest_requests_total");
if let Ok(metric) = family {
if metric
.with_attributes(&[
("db_name", "placeholder"),
("svr_id", "1"),
("sequencer_id", "0"),
("status", "client_error"),
])
.counter()
.eq(1.0)
.is_ok()
{
break;
}
}
assert!(t_0.elapsed() < Duration::from_secs(10));
tokio::time::sleep(Duration::from_millis(100)).await;
}
// do: stop background task loop
shutdown.cancel();
join_handle.await.unwrap();
}
#[tokio::test]
async fn cant_write_when_reading_from_write_buffer() {
// Validate that writes are rejected if this database is reading from the write buffer
@ -2890,7 +2296,7 @@ mod tests {
MockBufferSharedState::empty_with_n_sequencers(NonZeroU32::try_from(1).unwrap());
let write_buffer = Arc::new(MockBufferForWriting::new(write_buffer_state, None).unwrap());
let db = TestDb::builder()
.write_buffer(WriteBufferConfig::Writing(Arc::clone(&write_buffer) as _))
.write_buffer_producer(write_buffer)
.build()
.await
.db;
@ -2930,79 +2336,6 @@ mod tests {
assert_eq!(open_max.timestamp_nanos(), 20);
}
#[tokio::test]
async fn read_from_write_buffer_updates_persistence_windows() {
let entry = lp_to_entry("cpu bar=1 10");
let partition_key = "1970-01-01T00";
let write_buffer_state =
MockBufferSharedState::empty_with_n_sequencers(NonZeroU32::try_from(2).unwrap());
write_buffer_state.push_entry(SequencedEntry::new_from_sequence(
Sequence::new(0, 0),
Utc::now(),
entry.clone(),
));
write_buffer_state.push_entry(SequencedEntry::new_from_sequence(
Sequence::new(1, 0),
Utc::now(),
entry.clone(),
));
write_buffer_state.push_entry(SequencedEntry::new_from_sequence(
Sequence::new(1, 2),
Utc::now(),
entry.clone(),
));
write_buffer_state.push_entry(SequencedEntry::new_from_sequence(
Sequence::new(0, 1),
Utc::now(),
entry,
));
let write_buffer = MockBufferForReading::new(write_buffer_state, None).unwrap();
let db = TestDb::builder()
.write_buffer(WriteBufferConfig::Reading(Arc::new(
tokio::sync::Mutex::new(Box::new(write_buffer) as _),
)))
.build()
.await
.db;
// do: start background task loop
let shutdown: CancellationToken = Default::default();
let shutdown_captured = shutdown.clone();
let db_captured = Arc::clone(&db);
let join_handle =
tokio::spawn(async move { db_captured.background_worker(shutdown_captured).await });
db.allow_write_buffer_read();
// check: after a while the persistence windows should have the expected data
let t_0 = Instant::now();
let min_unpersisted = loop {
if let Ok(partition) = db.catalog.partition("cpu", partition_key) {
let partition = partition.write();
let windows = partition.persistence_windows().unwrap();
let min_unpersisted = windows.minimum_unpersisted_sequence();
if let Some(min_unpersisted) = min_unpersisted {
break min_unpersisted;
}
}
assert!(t_0.elapsed() < Duration::from_secs(10));
tokio::time::sleep(Duration::from_millis(100)).await;
};
// do: stop background task loop
shutdown.cancel();
join_handle.await.unwrap();
let mut expected_unpersisted = BTreeMap::new();
expected_unpersisted.insert(0, MinMaxSequence::new(0, 1));
expected_unpersisted.insert(1, MinMaxSequence::new(0, 2));
assert_eq!(min_unpersisted, expected_unpersisted);
}
#[tokio::test]
async fn test_chunk_timestamps() {
let start = Utc::now();

View File

@ -14,7 +14,7 @@ use persistence_windows::{
persistence_windows::PersistenceWindows,
};
use snafu::{ResultExt, Snafu};
use write_buffer::config::WriteBufferConfig;
use write_buffer::core::WriteBufferReading;
use crate::Db;
@ -81,64 +81,59 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
/// operation fails. In that case some of the sequencers in the write buffers might already be seeked and others not.
/// The caller must NOT use the write buffer in that case without ensuring that it is put into some proper state, e.g.
/// by retrying this function.
pub async fn seek_to_end(db: &Db) -> Result<()> {
if let Some(WriteBufferConfig::Reading(write_buffer)) = &db.write_buffer {
let mut write_buffer = write_buffer
.try_lock()
.expect("no streams should exist at this point");
pub async fn seek_to_end(db: &Db, write_buffer: &mut dyn WriteBufferReading) -> Result<()> {
let mut watermarks = vec![];
for (sequencer_id, stream) in write_buffer.streams() {
let watermark = (stream.fetch_high_watermark)()
.await
.context(SeekError { sequencer_id })?;
watermarks.push((sequencer_id, watermark));
}
let mut watermarks = vec![];
for (sequencer_id, stream) in write_buffer.streams() {
let watermark = (stream.fetch_high_watermark)()
.await
.context(SeekError { sequencer_id })?;
watermarks.push((sequencer_id, watermark));
}
for (sequencer_id, watermark) in &watermarks {
write_buffer
.seek(*sequencer_id, *watermark)
.await
.context(SeekError {
sequencer_id: *sequencer_id,
})?;
}
for (sequencer_id, watermark) in &watermarks {
write_buffer
.seek(*sequencer_id, *watermark)
.await
.context(SeekError {
sequencer_id: *sequencer_id,
})?;
}
// remember max seen sequence numbers
let late_arrival_window = db.rules().lifecycle_rules.late_arrive_window();
let sequencer_numbers: BTreeMap<_, _> = watermarks
.into_iter()
.filter(|(_sequencer_id, watermark)| *watermark > 0)
.map(|(sequencer_id, watermark)| {
(
sequencer_id,
OptionalMinMaxSequence::new(None, watermark - 1),
)
})
.collect();
// remember max seen sequence numbers
let late_arrival_window = db.rules().lifecycle_rules.late_arrive_window();
let sequencer_numbers: BTreeMap<_, _> = watermarks
.into_iter()
.filter(|(_sequencer_id, watermark)| *watermark > 0)
.map(|(sequencer_id, watermark)| {
(
sequencer_id,
OptionalMinMaxSequence::new(None, watermark - 1),
)
})
.collect();
for partition in db.catalog.partitions() {
let mut partition = partition.write();
for partition in db.catalog.partitions() {
let mut partition = partition.write();
let dummy_checkpoint = PartitionCheckpoint::new(
Arc::from(partition.table_name()),
Arc::from(partition.key()),
sequencer_numbers.clone(),
Utc::now(),
);
let dummy_checkpoint = PartitionCheckpoint::new(
Arc::from(partition.table_name()),
Arc::from(partition.key()),
sequencer_numbers.clone(),
Utc::now(),
);
match partition.persistence_windows_mut() {
Some(windows) => {
windows.mark_seen_and_persisted(&dummy_checkpoint);
}
None => {
let mut windows = PersistenceWindows::new(
partition.addr().clone(),
late_arrival_window,
db.background_worker_now(),
);
windows.mark_seen_and_persisted(&dummy_checkpoint);
partition.set_persistence_windows(windows);
}
match partition.persistence_windows_mut() {
Some(windows) => {
windows.mark_seen_and_persisted(&dummy_checkpoint);
}
None => {
let mut windows = PersistenceWindows::new(
partition.addr().clone(),
late_arrival_window,
db.background_worker_now(),
);
windows.mark_seen_and_persisted(&dummy_checkpoint);
partition.set_persistence_windows(windows);
}
}
}
@ -147,158 +142,156 @@ pub async fn seek_to_end(db: &Db) -> Result<()> {
}
/// Perform sequencer-driven replay for this DB.
pub async fn perform_replay(db: &Db, replay_plan: &ReplayPlan) -> Result<()> {
if let Some(WriteBufferConfig::Reading(write_buffer)) = &db.write_buffer {
let db_name = db.rules.read().db_name().to_string();
info!(%db_name, "starting replay");
pub async fn perform_replay(
db: &Db,
replay_plan: &ReplayPlan,
write_buffer: &mut dyn WriteBufferReading,
) -> Result<()> {
let db_name = db.rules.read().db_name().to_string();
info!(%db_name, "starting replay");
let mut write_buffer = write_buffer
.try_lock()
.expect("no streams should exist at this point");
// check if write buffer and replay plan agree on the set of sequencer ids
let sequencer_ids: BTreeSet<_> = write_buffer
.streams()
.into_iter()
.map(|(sequencer_id, _stream)| sequencer_id)
.collect();
for sequencer_id in replay_plan.sequencer_ids() {
if !sequencer_ids.contains(&sequencer_id) {
return Err(Error::UnknownSequencer {
sequencer_id,
sequencer_ids: sequencer_ids.iter().copied().collect(),
});
}
// check if write buffer and replay plan agree on the set of sequencer ids
let sequencer_ids: BTreeSet<_> = write_buffer
.streams()
.into_iter()
.map(|(sequencer_id, _stream)| sequencer_id)
.collect();
for sequencer_id in replay_plan.sequencer_ids() {
if !sequencer_ids.contains(&sequencer_id) {
return Err(Error::UnknownSequencer {
sequencer_id,
sequencer_ids: sequencer_ids.iter().copied().collect(),
});
}
}
// determine replay ranges based on the plan
let replay_ranges: BTreeMap<_, _> = sequencer_ids
.into_iter()
.filter_map(|sequencer_id| {
replay_plan
.replay_range(sequencer_id)
.map(|min_max| (sequencer_id, min_max))
})
.collect();
// determine replay ranges based on the plan
let replay_ranges: BTreeMap<_, _> = sequencer_ids
.into_iter()
.filter_map(|sequencer_id| {
replay_plan
.replay_range(sequencer_id)
.map(|min_max| (sequencer_id, min_max))
})
.collect();
// seek write buffer according to the plan
for (sequencer_id, min_max) in &replay_ranges {
if let Some(min) = min_max.min() {
info!(%db_name, sequencer_id, sequence_number=min, "seek sequencer in preperation for replay");
write_buffer
.seek(*sequencer_id, min)
.await
.context(SeekError {
sequencer_id: *sequencer_id,
})?;
} else {
let sequence_number = min_max.max() + 1;
info!(%db_name, sequencer_id, sequence_number, "seek sequencer that did not require replay");
write_buffer
.seek(*sequencer_id, sequence_number)
.await
.context(SeekError {
sequencer_id: *sequencer_id,
})?;
}
// seek write buffer according to the plan
for (sequencer_id, min_max) in &replay_ranges {
if let Some(min) = min_max.min() {
info!(%db_name, sequencer_id, sequence_number=min, "seek sequencer in preperation for replay");
write_buffer
.seek(*sequencer_id, min)
.await
.context(SeekError {
sequencer_id: *sequencer_id,
})?;
} else {
let sequence_number = min_max.max() + 1;
info!(%db_name, sequencer_id, sequence_number, "seek sequencer that did not require replay");
write_buffer
.seek(*sequencer_id, sequence_number)
.await
.context(SeekError {
sequencer_id: *sequencer_id,
})?;
}
}
// replay ranges
for (sequencer_id, mut stream) in write_buffer.streams() {
if let Some(min_max) = replay_ranges.get(&sequencer_id) {
if min_max.min().is_none() {
// no replay required
continue;
// replay ranges
for (sequencer_id, mut stream) in write_buffer.streams() {
if let Some(min_max) = replay_ranges.get(&sequencer_id) {
if min_max.min().is_none() {
// no replay required
continue;
}
info!(
%db_name,
sequencer_id,
sequence_number_min=min_max.min().expect("checked above"),
sequence_number_max=min_max.max(),
"replay sequencer",
);
while let Some(entry) = stream
.stream
.try_next()
.await
.context(EntryError { sequencer_id })?
{
let sequence = *entry.sequence().expect("entry must be sequenced");
if sequence.number > min_max.max() {
return Err(Error::EntryLostError {
sequencer_id,
actual_sequence_number: sequence.number,
expected_sequence_number: min_max.max(),
});
}
info!(
%db_name,
sequencer_id,
sequence_number_min=min_max.min().expect("checked above"),
sequence_number_max=min_max.max(),
"replay sequencer",
);
while let Some(entry) = stream
.stream
.try_next()
.await
.context(EntryError { sequencer_id })?
{
let sequence = *entry.sequence().expect("entry must be sequenced");
if sequence.number > min_max.max() {
return Err(Error::EntryLostError {
sequencer_id,
actual_sequence_number: sequence.number,
expected_sequence_number: min_max.max(),
});
}
let entry = Arc::new(entry);
let mut logged_hard_limit = false;
let n_tries = 600; // 600*100ms = 60s
for n_try in 1..=n_tries {
match db.store_sequenced_entry(
Arc::clone(&entry),
|sequence, partition_key, table_batch| {
filter_entry(sequence, partition_key, table_batch, replay_plan)
},
) {
Ok(_) => {
break;
}
Err(crate::db::Error::HardLimitReached {}) if n_try < n_tries => {
if !logged_hard_limit {
info!(
%db_name,
sequencer_id,
n_try,
n_tries,
"Hard limit reached while replaying, waiting for compaction to catch up",
);
logged_hard_limit = true;
}
tokio::time::sleep(Duration::from_millis(100)).await;
continue;
}
Err(e) => {
return Err(Error::StoreError {
let entry = Arc::new(entry);
let mut logged_hard_limit = false;
let n_tries = 600; // 600*100ms = 60s
for n_try in 1..=n_tries {
match db.store_sequenced_entry(
Arc::clone(&entry),
|sequence, partition_key, table_batch| {
filter_entry(sequence, partition_key, table_batch, replay_plan)
},
) {
Ok(_) => {
break;
}
Err(crate::db::Error::HardLimitReached {}) if n_try < n_tries => {
if !logged_hard_limit {
info!(
%db_name,
sequencer_id,
source: Box::new(e),
});
n_try,
n_tries,
"Hard limit reached while replaying, waiting for compaction to catch up",
);
logged_hard_limit = true;
}
tokio::time::sleep(Duration::from_millis(100)).await;
continue;
}
Err(e) => {
return Err(Error::StoreError {
sequencer_id,
source: Box::new(e),
});
}
}
}
// done replaying?
if sequence.number == min_max.max() {
break;
}
// done replaying?
if sequence.number == min_max.max() {
break;
}
}
}
}
// remember max seen sequence numbers even for partitions that were not touched during replay
let late_arrival_window = db.rules().lifecycle_rules.late_arrive_window();
for (table_name, partition_key) in replay_plan.partitions() {
if let Ok(partition) = db.partition(&table_name, &partition_key) {
let mut partition = partition.write();
let partition_checkpoint = replay_plan
.last_partition_checkpoint(&table_name, &partition_key)
.expect("replay plan inconsistent");
// remember max seen sequence numbers even for partitions that were not touched during replay
let late_arrival_window = db.rules().lifecycle_rules.late_arrive_window();
for (table_name, partition_key) in replay_plan.partitions() {
if let Ok(partition) = db.partition(&table_name, &partition_key) {
let mut partition = partition.write();
let partition_checkpoint = replay_plan
.last_partition_checkpoint(&table_name, &partition_key)
.expect("replay plan inconsistent");
match partition.persistence_windows_mut() {
Some(windows) => {
windows.mark_seen_and_persisted(partition_checkpoint);
}
None => {
let mut windows = PersistenceWindows::new(
partition.addr().clone(),
late_arrival_window,
db.background_worker_now(),
);
windows.mark_seen_and_persisted(partition_checkpoint);
partition.set_persistence_windows(windows);
}
match partition.persistence_windows_mut() {
Some(windows) => {
windows.mark_seen_and_persisted(partition_checkpoint);
}
None => {
let mut windows = PersistenceWindows::new(
partition.addr().clone(),
late_arrival_window,
db.background_worker_now(),
);
windows.mark_seen_and_persisted(partition_checkpoint);
partition.set_persistence_windows(windows);
}
}
}
@ -442,12 +435,10 @@ mod tests {
use test_helpers::{assert_contains, assert_not_contains, tracing::TracingCapture};
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use write_buffer::{
config::WriteBufferConfig,
mock::{MockBufferForReading, MockBufferSharedState},
};
use write_buffer::mock::{MockBufferForReading, MockBufferSharedState};
use crate::utils::TestDb;
use crate::write_buffer::WriteBufferConsumer;
#[derive(Debug)]
struct TestSequencedEntry {
@ -573,11 +564,13 @@ mod tests {
let partition_template = PartitionTemplate {
parts: vec![TemplatePart::Column("tag_partition_by".to_string())],
};
let registry = metrics::MetricRegistry::new();
let write_buffer_state =
MockBufferSharedState::empty_with_n_sequencers(self.n_sequencers);
let (mut test_db, mut shutdown, mut join_handle) = Self::create_test_db(
Arc::clone(&object_store),
write_buffer_state.clone(),
server_id,
db_name,
partition_template.clone(),
@ -586,6 +579,15 @@ mod tests {
)
.await;
let mut maybe_consumer = Some(WriteBufferConsumer::new(
Box::new(MockBufferForReading::new(write_buffer_state.clone(), None).unwrap()),
Arc::clone(&test_db.db),
&registry,
));
// This is used to carry the write buffer from replay to await
let mut maybe_write_buffer = None;
// ==================== do: main loop ====================
for (step, action_or_check) in self.steps.into_iter().enumerate() {
println!("===== step {} =====\n{:?}", step + 1, action_or_check);
@ -597,6 +599,11 @@ mod tests {
}
}
Step::Restart => {
if let Some(consumer) = maybe_consumer.take() {
consumer.shutdown();
consumer.join().await.unwrap();
}
// stop background worker
shutdown.cancel();
join_handle.await.unwrap();
@ -610,7 +617,6 @@ mod tests {
// then create new one
let (test_db_tmp, shutdown_tmp, join_handle_tmp) = Self::create_test_db(
Arc::clone(&object_store),
write_buffer_state.clone(),
server_id,
db_name,
partition_template.clone(),
@ -622,15 +628,26 @@ mod tests {
shutdown = shutdown_tmp;
join_handle = join_handle_tmp;
}
Step::Replay => {
let db = &test_db.db;
Step::Replay | Step::SkipReplay => {
assert!(maybe_consumer.is_none());
assert!(maybe_write_buffer.is_none());
db.perform_replay(Some(&test_db.replay_plan)).await.unwrap();
}
Step::SkipReplay => {
let db = &test_db.db;
let replay_plan = match action_or_check {
Step::Replay => Some(&test_db.replay_plan),
Step::SkipReplay => None,
_ => unreachable!(),
};
db.perform_replay(None).await.unwrap();
let mut write_buffer =
MockBufferForReading::new(write_buffer_state.clone(), None).unwrap();
test_db
.db
.perform_replay(replay_plan, &mut write_buffer)
.await
.unwrap();
maybe_write_buffer = Some(write_buffer);
}
Step::Persist(partitions) => {
let db = &test_db.db;
@ -689,9 +706,21 @@ mod tests {
Self::eval_checks(&checks, true, &test_db).await;
}
Step::Await(checks) => {
if maybe_consumer.is_none() {
let write_buffer = match maybe_write_buffer.take() {
Some(write_buffer) => write_buffer,
None => MockBufferForReading::new(write_buffer_state.clone(), None)
.unwrap(),
};
maybe_consumer = Some(WriteBufferConsumer::new(
Box::new(write_buffer),
Arc::clone(&test_db.db),
&registry,
));
}
let db = &test_db.db;
db.unsuppress_persistence().await;
db.allow_write_buffer_read();
// wait until checks pass
let t_0 = Instant::now();
@ -734,20 +763,15 @@ mod tests {
async fn create_test_db(
object_store: Arc<ObjectStore>,
write_buffer_state: MockBufferSharedState,
server_id: ServerId,
db_name: &'static str,
partition_template: PartitionTemplate,
catalog_transactions_until_checkpoint: NonZeroU64,
now: Instant,
) -> (TestDb, CancellationToken, JoinHandle<()>) {
let write_buffer = MockBufferForReading::new(write_buffer_state, None).unwrap();
let test_db = TestDb::builder()
.object_store(object_store)
.server_id(server_id)
.write_buffer(WriteBufferConfig::Reading(Arc::new(
tokio::sync::Mutex::new(Box::new(write_buffer) as _),
)))
.lifecycle_rules(data_types::database_rules::LifecycleRules {
buffer_size_hard: Some(NonZeroUsize::new(12_000).unwrap()),
late_arrive_window_seconds: NonZeroU32::try_from(1).unwrap(),
@ -2588,16 +2612,10 @@ mod tests {
// create write buffer w/ sequencer 0 and 1
let write_buffer_state =
MockBufferSharedState::empty_with_n_sequencers(NonZeroU32::try_from(2).unwrap());
let write_buffer = MockBufferForReading::new(write_buffer_state, None).unwrap();
let mut write_buffer = MockBufferForReading::new(write_buffer_state, None).unwrap();
// create DB
let db = TestDb::builder()
.write_buffer(WriteBufferConfig::Reading(Arc::new(
tokio::sync::Mutex::new(Box::new(write_buffer) as _),
)))
.build()
.await
.db;
let db = TestDb::builder().build().await.db;
// construct replay plan for sequencers 0 and 2
let mut sequencer_numbers = BTreeMap::new();
@ -2618,7 +2636,9 @@ mod tests {
let replay_plan = replay_planner.build().unwrap();
// replay fails
let res = db.perform_replay(Some(&replay_plan)).await;
let res = db
.perform_replay(Some(&replay_plan), &mut write_buffer)
.await;
assert_contains!(
res.unwrap_err().to_string(),
"Replay plan references unknown sequencer"
@ -2640,16 +2660,10 @@ mod tests {
Utc::now(),
lp_to_entry("cpu bar=1 10"),
));
let write_buffer = MockBufferForReading::new(write_buffer_state, None).unwrap();
let mut write_buffer = MockBufferForReading::new(write_buffer_state, None).unwrap();
// create DB
let db = TestDb::builder()
.write_buffer(WriteBufferConfig::Reading(Arc::new(
tokio::sync::Mutex::new(Box::new(write_buffer) as _),
)))
.build()
.await
.db;
let db = TestDb::builder().build().await.db;
// construct replay plan to replay sequence numbers 0 and 1
let mut sequencer_numbers = BTreeMap::new();
@ -2669,7 +2683,9 @@ mod tests {
let replay_plan = replay_planner.build().unwrap();
// replay fails
let res = db.perform_replay(Some(&replay_plan)).await;
let res = db
.perform_replay(Some(&replay_plan), &mut write_buffer)
.await;
assert_contains!(
res.unwrap_err().to_string(),
"Cannot replay: For sequencer 0 expected to find sequence 1 but replay jumped to 2"
@ -2699,19 +2715,14 @@ mod tests {
Utc::now(),
lp_to_entry("cpu bar=11 11"),
));
let write_buffer = MockBufferForReading::new(write_buffer_state.clone(), None).unwrap();
let mut write_buffer = MockBufferForReading::new(write_buffer_state.clone(), None).unwrap();
// create DB
let test_db = TestDb::builder()
.write_buffer(WriteBufferConfig::Reading(Arc::new(
tokio::sync::Mutex::new(Box::new(write_buffer) as _),
)))
.build()
.await;
let test_db = TestDb::builder().build().await;
let db = &test_db.db;
// seek
db.perform_replay(None).await.unwrap();
db.perform_replay(None, &mut write_buffer).await.unwrap();
// add more data
write_buffer_state.push_entry(SequencedEntry::new_from_sequence(
@ -2736,7 +2747,9 @@ mod tests {
let db_captured = Arc::clone(db);
let join_handle =
tokio::spawn(async move { db_captured.background_worker(shutdown_captured).await });
db.allow_write_buffer_read();
let consumer =
WriteBufferConsumer::new(Box::new(write_buffer), Arc::clone(db), &Default::default());
// wait until checks pass
let checks = vec![Check::Query(
@ -2771,6 +2784,9 @@ mod tests {
// stop background worker
shutdown.cancel();
join_handle.await.unwrap();
consumer.shutdown();
consumer.join().await.unwrap();
}
#[test]

View File

@ -114,6 +114,8 @@ pub mod rules;
/// Utility modules used by benchmarks and tests
pub mod utils;
mod write_buffer;
type DatabaseError = Box<dyn std::error::Error + Send + Sync + 'static>;
#[derive(Debug, Snafu)]
@ -646,11 +648,16 @@ where
Ok(Arc::clone(db))
}
/// Returns an active `Database` by name
pub fn active_database(&self, db_name: &DatabaseName<'_>) -> Result<Arc<Database>> {
let database = self.database(db_name)?;
ensure!(database.is_active(), DatabaseNotFound { db_name });
Ok(database)
}
/// Returns an initialized `Db` by name
pub fn db(&self, db_name: &DatabaseName<'_>) -> Result<Arc<Db>> {
let database = self.database(db_name)?;
ensure!(database.is_active(), DatabaseNotFound { db_name });
let database = self.active_database(db_name)?;
database
.initialized_db()
@ -930,7 +937,18 @@ where
///
/// TODO: Push this out of `Server` into `Database`
pub async fn write_entry_local(&self, db_name: &DatabaseName<'_>, entry: Entry) -> Result<()> {
let db = self.db(db_name)?;
let database = self.active_database(db_name)?;
let db = {
let initialized = database
.initialized()
.context(DatabaseNotInitialized { db_name })?;
if initialized.write_buffer_consumer().is_some() {
return WritingOnlyAllowedThroughWriteBuffer { db_name }.fail();
}
Arc::clone(initialized.db())
};
let bytes = entry.data().len() as u64;
db.store_entry(entry).await.map_err(|e| {
self.metrics.ingest_entries_bytes_total.add_with_attributes(
@ -942,11 +960,6 @@ where
);
match e {
db::Error::HardLimitReached {} => Error::HardLimitReached {},
db::Error::WritingOnlyAllowedThroughWriteBuffer {} => {
Error::WritingOnlyAllowedThroughWriteBuffer {
db_name: db_name.into(),
}
}
db::Error::WriteBufferWritingError { .. } => {
Error::WriteBuffer { source: e, bytes }
}
@ -1243,6 +1256,7 @@ where
#[cfg(test)]
mod tests {
use super::*;
use ::write_buffer::config::WriteBufferConfigFactory;
use arrow::record_batch::RecordBatch;
use arrow_util::assert_batches_eq;
use bytes::Bytes;
@ -1274,7 +1288,6 @@ mod tests {
time::{Duration, Instant},
};
use test_helpers::assert_contains;
use write_buffer::config::WriteBufferConfigFactory;
const ARBITRARY_DEFAULT_TIME: i64 = 456;

View File

@ -14,7 +14,7 @@ use persistence_windows::checkpoint::ReplayPlan;
use query::exec::ExecutorConfig;
use query::{exec::Executor, QueryDatabase};
use std::{borrow::Cow, convert::TryFrom, num::NonZeroU32, sync::Arc, time::Duration};
use write_buffer::config::WriteBufferConfig;
use write_buffer::core::WriteBufferWriting;
// A wrapper around a Db and a metrics registry allowing for isolated testing
// of a Db and its metrics.
@ -38,7 +38,7 @@ pub struct TestDbBuilder {
object_store: Option<Arc<ObjectStore>>,
db_name: Option<DatabaseName<'static>>,
worker_cleanup_avg_sleep: Option<Duration>,
write_buffer: Option<WriteBufferConfig>,
write_buffer_producer: Option<Arc<dyn WriteBufferWriting>>,
lifecycle_rules: Option<LifecycleRules>,
partition_template: Option<PartitionTemplate>,
}
@ -124,7 +124,7 @@ impl TestDbBuilder {
iox_object_store,
preserved_catalog,
catalog,
write_buffer: self.write_buffer,
write_buffer_producer: self.write_buffer_producer,
exec,
metrics_registry_v2: Arc::clone(&metrics_registry_v2),
};
@ -160,8 +160,11 @@ impl TestDbBuilder {
self
}
pub fn write_buffer(mut self, write_buffer: WriteBufferConfig) -> Self {
self.write_buffer = Some(write_buffer);
pub fn write_buffer_producer(
mut self,
write_buffer_producer: Arc<dyn WriteBufferWriting>,
) -> Self {
self.write_buffer_producer = Some(write_buffer_producer);
self
}

545
server/src/write_buffer.rs Normal file
View File

@ -0,0 +1,545 @@
use std::future::Future;
use std::sync::Arc;
use std::time::{Duration, Instant};
use futures::future::{BoxFuture, Shared};
use futures::stream::{BoxStream, FuturesUnordered};
use futures::{FutureExt, StreamExt, TryFutureExt};
use tokio::task::JoinError;
use tokio_util::sync::CancellationToken;
use ::metrics::{KeyValue, MetricRegistry};
use entry::SequencedEntry;
use observability_deps::tracing::{debug, error, info};
use write_buffer::core::{FetchHighWatermark, WriteBufferError, WriteBufferReading};
use crate::Db;
use self::metrics::SequencerMetrics;
mod metrics;
/// A `WriteBufferConsumer` is created from a `Db` and a `WriteBufferReading` and
/// sinks records from the inbound streams into the `Db`
#[derive(Debug)]
pub struct WriteBufferConsumer {
/// Future that resolves when the background worker exits
join: Shared<BoxFuture<'static, Result<(), Arc<JoinError>>>>,
/// A token that is used to trigger shutdown of the background worker
shutdown: CancellationToken,
}
impl WriteBufferConsumer {
pub fn new(
mut write_buffer: Box<dyn WriteBufferReading>,
db: Arc<Db>,
registry: &MetricRegistry,
) -> Self {
let shutdown = CancellationToken::new();
let ingest_domain = registry.register_domain_with_attributes(
"write_buffer",
vec![KeyValue::new("db_name", db.rules().name.to_string())],
);
let ingest_metrics = metrics::WriteBufferIngestMetrics::new(Arc::new(ingest_domain));
let shutdown_captured = shutdown.clone();
let join = tokio::spawn(async move {
let mut futures: FuturesUnordered<_> = write_buffer
.streams()
.into_iter()
.map(|(sequencer_id, stream)| {
let metrics = ingest_metrics.new_sequencer_metrics(sequencer_id);
stream_in_sequenced_entries(
Arc::clone(&db),
sequencer_id,
stream.stream,
stream.fetch_high_watermark,
metrics,
)
})
.collect();
tokio::select! {
_ = shutdown_captured.cancelled() => info!("write buffer shut down triggered"),
_ = futures.next() => error!("unexpected shutdown of write buffer consumer"),
}
})
.map_err(Arc::new)
.boxed()
.shared();
Self { join, shutdown }
}
/// Triggers shutdown of this `WriteBufferConsumer`
pub fn shutdown(&self) {
self.shutdown.cancel()
}
/// Waits for the background worker of this `Database` to exit
pub fn join(&self) -> impl Future<Output = Result<(), Arc<JoinError>>> {
self.join.clone()
}
}
/// This is used to take entries from a `Stream` and put them in the mutable buffer, such as
/// streaming entries from a write buffer.
async fn stream_in_sequenced_entries<'a>(
db: Arc<Db>,
sequencer_id: u32,
mut stream: BoxStream<'a, Result<SequencedEntry, WriteBufferError>>,
f_mark: FetchHighWatermark<'a>,
mut metrics: SequencerMetrics,
) {
let db_name = db.rules().name.to_string();
let mut watermark_last_updated: Option<Instant> = None;
let mut watermark = 0_u64;
while let Some(sequenced_entry_result) = stream.next().await {
let red_observation = metrics.red.observation();
// get entry from sequencer
let sequenced_entry = match sequenced_entry_result {
Ok(sequenced_entry) => sequenced_entry,
Err(e) => {
debug!(
%e,
%db_name,
sequencer_id,
"Error converting write buffer data to SequencedEntry",
);
red_observation.client_error();
continue;
}
};
let sequenced_entry = Arc::new(sequenced_entry);
// store entry
let mut logged_hard_limit = false;
loop {
match db.store_sequenced_entry(
Arc::clone(&sequenced_entry),
crate::db::filter_table_batch_keep_all,
) {
Ok(_) => {
red_observation.ok();
break;
}
Err(crate::db::Error::HardLimitReached {}) => {
// wait a bit and retry
if !logged_hard_limit {
info!(
%db_name,
sequencer_id,
"Hard limit reached while reading from write buffer, waiting for compaction to catch up",
);
logged_hard_limit = true;
}
tokio::time::sleep(Duration::from_millis(100)).await;
continue;
}
Err(e) => {
debug!(
%e,
%db_name,
sequencer_id,
"Error storing SequencedEntry from write buffer in database"
);
red_observation.error();
// no retry
break;
}
}
}
// maybe update sequencer watermark
// We are not updating this watermark every round because asking the sequencer for that watermark can be
// quite expensive.
let now = Instant::now();
if watermark_last_updated
.map(|ts| now.duration_since(ts) > Duration::from_secs(10))
.unwrap_or(true)
{
match f_mark().await {
Ok(w) => {
watermark = w;
}
Err(e) => {
debug!(
%e,
%db_name,
sequencer_id,
"Error while reading sequencer watermark",
)
}
}
watermark_last_updated = Some(now);
}
std::mem::drop(red_observation);
metrics.record_write(&sequenced_entry, watermark)
}
}
#[cfg(test)]
mod tests {
use std::collections::BTreeMap;
use std::convert::TryFrom;
use std::num::{NonZeroU32, NonZeroUsize};
use chrono::{TimeZone, Utc};
use ::test_helpers::assert_contains;
use arrow_util::assert_batches_eq;
use data_types::database_rules::{PartitionTemplate, TemplatePart};
use entry::test_helpers::lp_to_entry;
use entry::Sequence;
use persistence_windows::min_max_sequence::MinMaxSequence;
use query::exec::ExecutionContextProvider;
use query::frontend::sql::SqlQueryPlanner;
use query::QueryDatabase;
use test_helpers::tracing::TracingCapture;
use write_buffer::mock::{MockBufferForReading, MockBufferSharedState};
use crate::db::test_helpers::run_query;
use crate::utils::TestDb;
use super::*;
#[tokio::test]
async fn read_from_write_buffer_updates_persistence_windows() {
let entry = lp_to_entry("cpu bar=1 10");
let partition_key = "1970-01-01T00";
let write_buffer_state =
MockBufferSharedState::empty_with_n_sequencers(NonZeroU32::try_from(2).unwrap());
write_buffer_state.push_entry(SequencedEntry::new_from_sequence(
Sequence::new(0, 0),
Utc::now(),
entry.clone(),
));
write_buffer_state.push_entry(SequencedEntry::new_from_sequence(
Sequence::new(1, 0),
Utc::now(),
entry.clone(),
));
write_buffer_state.push_entry(SequencedEntry::new_from_sequence(
Sequence::new(1, 2),
Utc::now(),
entry.clone(),
));
write_buffer_state.push_entry(SequencedEntry::new_from_sequence(
Sequence::new(0, 1),
Utc::now(),
entry,
));
let db = TestDb::builder().build().await.db;
// do: start background task loop
let shutdown: CancellationToken = Default::default();
let shutdown_captured = shutdown.clone();
let db_captured = Arc::clone(&db);
let join_handle =
tokio::spawn(async move { db_captured.background_worker(shutdown_captured).await });
let consumer = WriteBufferConsumer::new(
Box::new(MockBufferForReading::new(write_buffer_state, None).unwrap()),
Arc::clone(&db),
&Default::default(),
);
// check: after a while the persistence windows should have the expected data
let t_0 = Instant::now();
let min_unpersisted = loop {
if let Ok(partition) = db.partition("cpu", partition_key) {
let partition = partition.write();
let windows = partition.persistence_windows().unwrap();
let min_unpersisted = windows.minimum_unpersisted_sequence();
if let Some(min_unpersisted) = min_unpersisted {
break min_unpersisted;
}
}
assert!(t_0.elapsed() < Duration::from_secs(10));
tokio::time::sleep(Duration::from_millis(100)).await;
};
// do: stop background task loop
shutdown.cancel();
join_handle.await.unwrap();
consumer.shutdown();
consumer.join().await.unwrap();
let mut expected_unpersisted = BTreeMap::new();
expected_unpersisted.insert(0, MinMaxSequence::new(0, 1));
expected_unpersisted.insert(1, MinMaxSequence::new(0, 2));
assert_eq!(min_unpersisted, expected_unpersisted);
}
#[tokio::test]
async fn read_from_write_buffer_write_to_mutable_buffer() {
let write_buffer_state =
MockBufferSharedState::empty_with_n_sequencers(NonZeroU32::try_from(1).unwrap());
let ingest_ts1 = Utc.timestamp_millis(42);
let ingest_ts2 = Utc.timestamp_millis(1337);
write_buffer_state.push_entry(SequencedEntry::new_from_sequence(
Sequence::new(0, 0),
ingest_ts1,
lp_to_entry("mem foo=1 10"),
));
write_buffer_state.push_entry(SequencedEntry::new_from_sequence(
Sequence::new(0, 7),
ingest_ts2,
lp_to_entry("cpu bar=2 20\ncpu bar=3 30"),
));
let test_db = TestDb::builder().build().await;
let db = test_db.db;
// do: start background task loop
let shutdown: CancellationToken = Default::default();
let shutdown_captured = shutdown.clone();
let db_captured = Arc::clone(&db);
let join_handle =
tokio::spawn(async move { db_captured.background_worker(shutdown_captured).await });
let consumer = WriteBufferConsumer::new(
Box::new(MockBufferForReading::new(write_buffer_state, None).unwrap()),
Arc::clone(&db),
test_db.metric_registry.registry().as_ref(),
);
let query = "select * from cpu";
// check: after a while the table should exist and a query plan should succeed
let t_0 = Instant::now();
loop {
let planner = SqlQueryPlanner::default();
let ctx = db.new_query_context(None);
let physical_plan = planner.query(query, &ctx);
if physical_plan.is_ok() {
break;
}
assert!(t_0.elapsed() < Duration::from_secs(10));
tokio::time::sleep(Duration::from_millis(100)).await;
}
// check: metrics
// We need to do that BEFORE shutting down the background loop because gauges would be dropped and resetted otherwise
let metrics = test_db.metric_registry;
metrics
.has_metric_family("write_buffer_ingest_requests_total")
.with_attributes(&[
("db_name", "placeholder"),
("sequencer_id", "0"),
("status", "ok"),
])
.counter()
.eq(2.0)
.unwrap();
metrics
.has_metric_family("write_buffer_read_bytes_total")
.with_attributes(&[("db_name", "placeholder"), ("sequencer_id", "0")])
.counter()
.eq(528.0)
.unwrap();
metrics
.has_metric_family("write_buffer_last_sequence_number")
.with_attributes(&[("db_name", "placeholder"), ("sequencer_id", "0")])
.gauge()
.eq(7.0)
.unwrap();
metrics
.has_metric_family("write_buffer_sequence_number_lag")
.with_attributes(&[("db_name", "placeholder"), ("sequencer_id", "0")])
.gauge()
.eq(0.0)
.unwrap();
metrics
.has_metric_family("write_buffer_last_min_ts")
.with_attributes(&[("db_name", "placeholder"), ("sequencer_id", "0")])
.gauge()
.eq(20.0)
.unwrap();
metrics
.has_metric_family("write_buffer_last_max_ts")
.with_attributes(&[("db_name", "placeholder"), ("sequencer_id", "0")])
.gauge()
.eq(30.0)
.unwrap();
metrics
.has_metric_family("write_buffer_last_ingest_ts")
.with_attributes(&[("db_name", "placeholder"), ("sequencer_id", "0")])
.gauge()
.eq(ingest_ts2.timestamp_nanos() as f64)
.unwrap();
// do: stop background task loop
shutdown.cancel();
join_handle.await.unwrap();
consumer.shutdown();
consumer.join().await.unwrap();
// check: the expected results should be there
let batches = run_query(db, "select * from cpu order by time").await;
let expected = vec![
"+-----+--------------------------------+",
"| bar | time |",
"+-----+--------------------------------+",
"| 2 | 1970-01-01T00:00:00.000000020Z |",
"| 3 | 1970-01-01T00:00:00.000000030Z |",
"+-----+--------------------------------+",
];
assert_batches_eq!(expected, &batches);
}
#[tokio::test]
async fn write_buffer_reads_wait_for_compaction() {
let tracing_capture = TracingCapture::new();
// setup write buffer
// these numbers are handtuned to trigger hard buffer limits w/o making the test too big
let n_entries = 50u64;
let write_buffer_state =
MockBufferSharedState::empty_with_n_sequencers(NonZeroU32::try_from(1).unwrap());
for sequence_number in 0..n_entries {
let lp = format!(
"table_1,tag_partition_by=a foo=\"hello\",bar=1 {}",
sequence_number / 2
);
write_buffer_state.push_entry(SequencedEntry::new_from_sequence(
Sequence::new(0, sequence_number),
Utc::now(),
lp_to_entry(&lp),
));
}
write_buffer_state.push_entry(SequencedEntry::new_from_sequence(
Sequence::new(0, n_entries),
Utc::now(),
lp_to_entry("table_2,partition_by=a foo=1 0"),
));
// create DB
let partition_template = PartitionTemplate {
parts: vec![TemplatePart::Column("tag_partition_by".to_string())],
};
let test_db = TestDb::builder()
.lifecycle_rules(data_types::database_rules::LifecycleRules {
buffer_size_hard: Some(NonZeroUsize::new(10_000).unwrap()),
mub_row_threshold: NonZeroUsize::new(10).unwrap(),
..Default::default()
})
.partition_template(partition_template)
.build()
.await;
let db = test_db.db;
// start background task loop
let shutdown: CancellationToken = Default::default();
let shutdown_captured = shutdown.clone();
let db_captured = Arc::clone(&db);
let join_handle =
tokio::spawn(async move { db_captured.background_worker(shutdown_captured).await });
let consumer = WriteBufferConsumer::new(
Box::new(MockBufferForReading::new(write_buffer_state, None).unwrap()),
Arc::clone(&db),
test_db.metric_registry.registry().as_ref(),
);
// after a while the table should exist
let t_0 = Instant::now();
loop {
if db.table_schema("table_2").is_some() {
break;
}
assert!(t_0.elapsed() < Duration::from_secs(10));
tokio::time::sleep(Duration::from_millis(100)).await;
}
// do: stop background task loop
shutdown.cancel();
join_handle.await.unwrap();
consumer.shutdown();
consumer.join().await.unwrap();
// no rows should be dropped
let batches = run_query(db, "select sum(bar) as n from table_1").await;
let expected = vec!["+----+", "| n |", "+----+", "| 25 |", "+----+"];
assert_batches_eq!(expected, &batches);
// check that hard buffer limit was actually hit (otherwise this test is pointless/outdated)
assert_contains!(tracing_capture.to_string(), "Hard limit reached while reading from write buffer, waiting for compaction to catch up");
}
#[tokio::test]
async fn error_converting_data_from_write_buffer_to_sequenced_entry_is_reported() {
let write_buffer_state =
MockBufferSharedState::empty_with_n_sequencers(NonZeroU32::try_from(1).unwrap());
write_buffer_state.push_error(
String::from("Something bad happened on the way to creating a SequencedEntry").into(),
0,
);
let test_db = TestDb::builder().build().await;
let db = Arc::new(test_db.db);
let metrics = test_db.metric_registry;
// do: start background task loop
let shutdown: CancellationToken = Default::default();
let shutdown_captured = shutdown.clone();
let db_captured = Arc::clone(&db);
let join_handle =
tokio::spawn(async move { db_captured.background_worker(shutdown_captured).await });
let consumer = WriteBufferConsumer::new(
Box::new(MockBufferForReading::new(write_buffer_state, None).unwrap()),
Arc::clone(&db),
metrics.registry().as_ref(),
);
// check: after a while the error should be reported in the database's metrics
let t_0 = Instant::now();
loop {
let family = metrics.try_has_metric_family("write_buffer_ingest_requests_total");
if let Ok(metric) = family {
if metric
.with_attributes(&[
("db_name", "placeholder"),
("sequencer_id", "0"),
("status", "client_error"),
])
.counter()
.eq(1.0)
.is_ok()
{
break;
}
}
assert!(t_0.elapsed() < Duration::from_secs(10));
tokio::time::sleep(Duration::from_millis(100)).await;
}
// do: stop background task loop
shutdown.cancel();
join_handle.await.unwrap();
consumer.shutdown();
consumer.join().await.unwrap();
}
}

View File

@ -0,0 +1,167 @@
use metrics::KeyValue;
use std::sync::Arc;
/// Metrics for data ingest via write buffer.
#[derive(Debug)]
pub struct WriteBufferIngestMetrics {
/// Metrics domain
domain: Arc<metrics::Domain>,
}
impl WriteBufferIngestMetrics {
pub fn new(domain: Arc<metrics::Domain>) -> Self {
Self { domain }
}
pub fn new_sequencer_metrics(&self, sequencer_id: u32) -> SequencerMetrics {
let attributes = vec![KeyValue::new("sequencer_id", sequencer_id.to_string())];
let red = self
.domain
.register_red_metric_with_attributes(Some("ingest"), attributes.clone());
let bytes_read = self.domain.register_counter_metric_with_attributes(
"read",
Some("bytes"),
"Bytes read from sequencer",
attributes.clone(),
);
let last_sequence_number = self.domain.register_gauge_metric_with_attributes(
"last_sequence_number",
None,
"Last consumed sequence number (e.g. Kafka offset)",
&attributes,
);
let sequence_number_lag = self.domain.register_gauge_metric_with_attributes(
"sequence_number_lag",
None,
"The difference between the the last sequence number available (e.g. Kafka offset) and (= minus) last consumed sequence number",
&attributes,
);
let last_min_ts = self.domain.register_gauge_metric_with_attributes(
"last_min_ts",
None,
"Minimum timestamp of last write as unix timestamp in nanoseconds",
&attributes,
);
let last_max_ts = self.domain.register_gauge_metric_with_attributes(
"last_max_ts",
None,
"Maximum timestamp of last write as unix timestamp in nanoseconds",
&attributes,
);
let last_ingest_ts = self.domain.register_gauge_metric_with_attributes(
"last_ingest_ts",
None,
"Last seen ingest timestamp as unix timestamp in nanoseconds",
&attributes,
);
SequencerMetrics {
red,
bytes_read,
last_sequence_number,
sequence_number_lag,
last_min_ts,
last_max_ts,
last_ingest_ts,
}
}
}
/// Metrics for a single sequencer.
#[derive(Debug)]
pub struct SequencerMetrics {
/// Metrics for tracking ingest.
pub(super) red: metrics::RedMetric,
/// Bytes read from sequencer.
///
/// This metrics is independent of the success / error state of the entries.
bytes_read: metrics::Counter,
/// Last consumed sequence number (e.g. Kafka offset).
last_sequence_number: metrics::Gauge,
// The difference between the the last sequence number available (e.g. Kafka offset) and (= minus) last consumed
// sequence number.
sequence_number_lag: metrics::Gauge,
/// Minimum timestamp of last write as unix timestamp in nanoseconds.
last_min_ts: metrics::Gauge,
/// Maximum timestamp of last write as unix timestamp in nanoseconds.
last_max_ts: metrics::Gauge,
/// Last seen ingest timestamp as unix timestamp in nanoseconds.
last_ingest_ts: metrics::Gauge,
}
impl SequencerMetrics {
/// Record a succesful write
///
/// Updates:
///
/// - bytes read
/// - last sequence number
/// - lag
/// - min ts
/// - max ts
/// - ingest ts
pub fn record_write(&mut self, sequenced_entry: &entry::SequencedEntry, watermark: u64) {
let entry = sequenced_entry.entry();
let producer_wallclock_timestamp = sequenced_entry
.producer_wallclock_timestamp()
.expect("entry from write buffer must have a producer wallclock time");
let sequence = sequenced_entry
.sequence()
.expect("entry from write buffer must be sequenced");
self.bytes_read.add(entry.data().len() as u64);
self.last_sequence_number.set(sequence.number as usize, &[]);
self.sequence_number_lag.set(
watermark.saturating_sub(sequence.number).saturating_sub(1) as usize,
&[],
);
if let Some(min_ts) = entry
.partition_writes()
.map(|partition_writes| {
partition_writes
.iter()
.filter_map(|partition_write| {
partition_write
.table_batches()
.iter()
.filter_map(|table_batch| table_batch.min_max_time().ok())
.map(|(min, _max)| min)
.max()
})
.min()
})
.flatten()
{
self.last_min_ts.set(min_ts.timestamp_nanos() as usize, &[]);
}
if let Some(max_ts) = entry
.partition_writes()
.map(|partition_writes| {
partition_writes
.iter()
.filter_map(|partition_write| {
partition_write
.table_batches()
.iter()
.filter_map(|table_batch| table_batch.min_max_time().ok())
.map(|(_min, max)| max)
.max()
})
.max()
})
.flatten()
{
self.last_max_ts.set(max_ts.timestamp_nanos() as usize, &[]);
}
self.last_ingest_ts
.set(producer_wallclock_timestamp.timestamp_nanos() as usize, &[]);
}
}

View File

@ -4,7 +4,7 @@ use std::{
};
use data_types::{
database_rules::{DatabaseRules, WriteBufferConnection, WriteBufferDirection},
database_rules::{WriteBufferConnection, WriteBufferDirection},
server_id::ServerId,
};
@ -29,7 +29,8 @@ enum Mock {
AlwaysFailing,
}
/// Factory that creates [`WriteBufferConfig`] from [`DatabaseRules`].
/// Factory that creates [`WriteBufferReading`] and [`WriteBufferWriting`]
/// from [`WriteBufferConnection`].
#[derive(Debug)]
pub struct WriteBufferConfigFactory {
mocks: BTreeMap<String, Mock>,
@ -77,30 +78,12 @@ impl WriteBufferConfigFactory {
.ok_or_else::<WriteBufferError, _>(|| format!("Unknown mock ID: {}", name).into())
}
/// Create new config.
pub async fn new_config(
&self,
server_id: ServerId,
rules: &DatabaseRules,
) -> Result<Option<WriteBufferConfig>, WriteBufferError> {
let db_name = rules.db_name();
let cfg = match rules.write_buffer_connection.as_ref() {
Some(cfg) => match cfg.direction {
WriteBufferDirection::Write => Some(WriteBufferConfig::Writing(
self.new_config_write(db_name, cfg).await?,
)),
WriteBufferDirection::Read => Some(WriteBufferConfig::Reading(Arc::new(
tokio::sync::Mutex::new(self.new_config_read(server_id, db_name, cfg).await?),
))),
},
None => None,
};
Ok(cfg)
}
async fn new_config_write(
/// Returns a new [`WriteBufferWriting`] for the provided [`WriteBufferConnection`]
///
/// # Panics
/// When the provided connection is not [`WriteBufferDirection::Write`]
///
pub async fn new_config_write(
&self,
db_name: &str,
cfg: &WriteBufferConnection,
@ -137,7 +120,11 @@ impl WriteBufferConfigFactory {
Ok(writer)
}
async fn new_config_read(
/// Returns a new [`WriteBufferReading`] for the provided [`WriteBufferConnection`]
///
/// # Panics
/// When the provided connection is not [`WriteBufferDirection::Read`]
pub async fn new_config_read(
&self,
server_id: ServerId,
db_name: &str,
@ -196,47 +183,24 @@ mod tests {
use super::*;
#[tokio::test]
async fn test_none() {
let factory = WriteBufferConfigFactory::new();
let server_id = ServerId::try_from(1).unwrap();
let mut rules = DatabaseRules::new(DatabaseName::new("foo").unwrap());
rules.write_buffer_connection = None;
assert!(factory
.new_config(server_id, &rules)
.await
.unwrap()
.is_none());
}
#[tokio::test]
async fn test_writing_kafka() {
let conn = maybe_skip_kafka_integration!();
let factory = WriteBufferConfigFactory::new();
let server_id = ServerId::try_from(1).unwrap();
let mut rules = DatabaseRules::new(DatabaseName::try_from(random_kafka_topic()).unwrap());
rules.write_buffer_connection = Some(WriteBufferConnection {
let db_name = DatabaseName::try_from(random_kafka_topic()).unwrap();
let cfg = WriteBufferConnection {
direction: WriteBufferDirection::Write,
type_: "kafka".to_string(),
connection: conn,
creation_config: Some(WriteBufferCreationConfig::default()),
..Default::default()
});
};
if let WriteBufferConfig::Writing(conn) = factory
.new_config(server_id, &rules)
let conn = factory
.new_config_write(db_name.as_str(), &cfg)
.await
.unwrap()
.unwrap()
{
assert_eq!(conn.type_name(), "kafka");
} else {
panic!("not a writing connection");
}
.unwrap();
assert_eq!(conn.type_name(), "kafka");
}
#[tokio::test]
@ -245,26 +209,20 @@ mod tests {
let factory = WriteBufferConfigFactory::new();
let server_id = ServerId::try_from(1).unwrap();
let mut rules = DatabaseRules::new(DatabaseName::try_from(random_kafka_topic()).unwrap());
rules.write_buffer_connection = Some(WriteBufferConnection {
let db_name = DatabaseName::try_from(random_kafka_topic()).unwrap();
let cfg = WriteBufferConnection {
direction: WriteBufferDirection::Read,
type_: "kafka".to_string(),
connection: conn,
creation_config: Some(WriteBufferCreationConfig::default()),
..Default::default()
});
};
if let WriteBufferConfig::Reading(conn) = factory
.new_config(server_id, &rules)
let conn = factory
.new_config_read(server_id, db_name.as_str(), &cfg)
.await
.unwrap()
.unwrap()
{
let conn = conn.lock().await;
assert_eq!(conn.type_name(), "kafka");
} else {
panic!("not a reading connection");
}
.unwrap();
assert_eq!(conn.type_name(), "kafka");
}
#[tokio::test]
@ -276,35 +234,31 @@ mod tests {
let mock_name = "some_mock";
factory.register_mock(mock_name.to_string(), state);
let server_id = ServerId::try_from(1).unwrap();
let mut rules = DatabaseRules::new(DatabaseName::new("foo").unwrap());
rules.write_buffer_connection = Some(WriteBufferConnection {
let db_name = DatabaseName::try_from(random_kafka_topic()).unwrap();
let cfg = WriteBufferConnection {
direction: WriteBufferDirection::Write,
type_: "mock".to_string(),
connection: mock_name.to_string(),
..Default::default()
});
};
if let WriteBufferConfig::Writing(conn) = factory
.new_config(server_id, &rules)
let conn = factory
.new_config_write(db_name.as_str(), &cfg)
.await
.unwrap()
.unwrap()
{
assert_eq!(conn.type_name(), "mock");
} else {
panic!("not a writing connection");
}
.unwrap();
assert_eq!(conn.type_name(), "mock");
// will error when state is unknown
rules.write_buffer_connection = Some(WriteBufferConnection {
let cfg = WriteBufferConnection {
direction: WriteBufferDirection::Write,
type_: "mock".to_string(),
connection: "bar".to_string(),
..Default::default()
});
let err = factory.new_config(server_id, &rules).await.unwrap_err();
};
let err = factory
.new_config_write(db_name.as_str(), &cfg)
.await
.unwrap_err();
assert!(err.to_string().starts_with("Unknown mock ID:"));
}
@ -318,35 +272,31 @@ mod tests {
factory.register_mock(mock_name.to_string(), state);
let server_id = ServerId::try_from(1).unwrap();
let mut rules = DatabaseRules::new(DatabaseName::new("foo").unwrap());
rules.write_buffer_connection = Some(WriteBufferConnection {
let db_name = DatabaseName::try_from(random_kafka_topic()).unwrap();
let cfg = WriteBufferConnection {
direction: WriteBufferDirection::Read,
type_: "mock".to_string(),
connection: mock_name.to_string(),
..Default::default()
});
};
if let WriteBufferConfig::Reading(conn) = factory
.new_config(server_id, &rules)
let conn = factory
.new_config_read(server_id, db_name.as_str(), &cfg)
.await
.unwrap()
.unwrap()
{
let conn = conn.lock().await;
assert_eq!(conn.type_name(), "mock");
} else {
panic!("not a reading connection");
}
.unwrap();
assert_eq!(conn.type_name(), "mock");
// will error when state is unknown
rules.write_buffer_connection = Some(WriteBufferConnection {
let cfg = WriteBufferConnection {
direction: WriteBufferDirection::Read,
type_: "mock".to_string(),
connection: "bar".to_string(),
..Default::default()
});
let err = factory.new_config(server_id, &rules).await.unwrap_err();
};
let err = factory
.new_config_read(server_id, db_name.as_str(), &cfg)
.await
.unwrap_err();
assert!(err.to_string().starts_with("Unknown mock ID:"));
}
@ -357,35 +307,31 @@ mod tests {
let mock_name = "some_mock";
factory.register_always_fail_mock(mock_name.to_string());
let server_id = ServerId::try_from(1).unwrap();
let mut rules = DatabaseRules::new(DatabaseName::new("foo").unwrap());
rules.write_buffer_connection = Some(WriteBufferConnection {
let db_name = DatabaseName::try_from(random_kafka_topic()).unwrap();
let cfg = WriteBufferConnection {
direction: WriteBufferDirection::Write,
type_: "mock".to_string(),
connection: mock_name.to_string(),
..Default::default()
});
};
if let WriteBufferConfig::Writing(conn) = factory
.new_config(server_id, &rules)
let conn = factory
.new_config_write(db_name.as_str(), &cfg)
.await
.unwrap()
.unwrap()
{
assert_eq!(conn.type_name(), "mock_failing");
} else {
panic!("not a writing connection");
}
.unwrap();
assert_eq!(conn.type_name(), "mock_failing");
// will error when state is unknown
rules.write_buffer_connection = Some(WriteBufferConnection {
let cfg = WriteBufferConnection {
direction: WriteBufferDirection::Write,
type_: "mock".to_string(),
connection: "bar".to_string(),
..Default::default()
});
let err = factory.new_config(server_id, &rules).await.unwrap_err();
};
let err = factory
.new_config_write(db_name.as_str(), &cfg)
.await
.unwrap_err();
assert!(err.to_string().starts_with("Unknown mock ID:"));
}
@ -398,34 +344,31 @@ mod tests {
let server_id = ServerId::try_from(1).unwrap();
let mut rules = DatabaseRules::new(DatabaseName::new("foo").unwrap());
rules.write_buffer_connection = Some(WriteBufferConnection {
let db_name = DatabaseName::new("foo").unwrap();
let cfg = WriteBufferConnection {
direction: WriteBufferDirection::Read,
type_: "mock".to_string(),
connection: mock_name.to_string(),
..Default::default()
});
};
if let WriteBufferConfig::Reading(conn) = factory
.new_config(server_id, &rules)
let conn = factory
.new_config_read(server_id, db_name.as_str(), &cfg)
.await
.unwrap()
.unwrap()
{
let conn = conn.lock().await;
assert_eq!(conn.type_name(), "mock_failing");
} else {
panic!("not a reading connection");
}
.unwrap();
assert_eq!(conn.type_name(), "mock_failing");
// will error when state is unknown
rules.write_buffer_connection = Some(WriteBufferConnection {
let cfg = WriteBufferConnection {
direction: WriteBufferDirection::Read,
type_: "mock".to_string(),
connection: "bar".to_string(),
..Default::default()
});
let err = factory.new_config(server_id, &rules).await.unwrap_err();
};
let err = factory
.new_config_read(server_id, db_name.as_str(), &cfg)
.await
.unwrap_err();
assert!(err.to_string().starts_with("Unknown mock ID:"));
}

View File

@ -27,6 +27,9 @@ struct EntryResVec {
entries: Vec<Result<SequencedEntry, WriteBufferError>>,
/// A list of Waker waiting for a new entry to be pushed
///
/// Note: this is a list because it is possible to create
/// two streams consuming from the same shared state
wait_list: Vec<Waker>,
}
@ -49,8 +52,8 @@ impl EntryResVec {
/// Register a waker to be notified when a new entry is pushed
pub fn register_waker(&mut self, waker: &Waker) {
for waker in &self.wait_list {
if waker.will_wake(waker) {
for wait_waker in &self.wait_list {
if wait_waker.will_wake(waker) {
return;
}
}