refactor: use SequencedStreamHandler

Removes the old stream_in_sequenced_entries() write buffer handler,
replacing it with the SequencedStreamHandler introduced in #4203.

This change will affect the metrics emitted by an ingester as outlined
in #4243.
pull/24376/head
Dom Dwyer 2022-04-07 12:02:02 +01:00
parent 71a278ac7e
commit dce939c580
4 changed files with 73 additions and 348 deletions
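For orientation, here is the per-partition ingest path after this change, condensed from the handler.rs hunks below. All names come from this diff; cloning and error handling are elided, so this is a sketch rather than standalone code:

// Acquire a write buffer stream for the partition, seek it to the last
// persisted sequence number (see the hunk below), then build the sink stack:
let watermark_fetcher = PeriodicWatermarkFetcher::new(
    write_buffer,            // Arc<dyn WriteBufferReading>
    kafka_partition,
    Duration::from_secs(10), // refresh the high watermark at most every 10s
    &*metric_registry,
);
// Innermost layer: applies each DmlOperation to the IngesterData buffer.
let sink = IngestSinkAdaptor::new(ingester_data, lifecycle_handle, sequencer_id);
// Decorator: emits apply-duration and watermark-lag metrics around the sink.
let sink = SinkInstrumentation::new(
    sink, watermark_fetcher, kafka_topic_name, kafka_partition, &*metric_registry,
);
// Driver task: pulls ops off the stream into the sink until shutdown fires.
SequencedStreamHandler::new(
    op_stream.stream().await, sink, lifecycle_handle, kafka_topic_name,
    kafka_partition, &*metric_registry,
)
.run(shutdown)
.await;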

Cargo.lock (generated)

@@ -2308,7 +2308,6 @@ dependencies = [
"chrono",
"data_types2",
"datafusion 0.1.0",
"db",
"dml",
"futures",
"generated_types",


@@ -17,7 +17,6 @@ data_types2 = { path = "../data_types2" }
futures = "0.3"
generated_types = { path = "../generated_types" }
chrono = { version = "0.4", default-features = false }
db = { path = "../db" }
dml = { path = "../dml" }
hyper = "0.14"
iox_catalog = { path = "../iox_catalog" }


@@ -2,39 +2,33 @@
use crate::{
data::{IngesterData, IngesterQueryResponse, SequencerData},
lifecycle::{run_lifecycle_manager, LifecycleConfig, LifecycleHandle, LifecycleManager},
poison::{PoisonCabinet, PoisonPill},
lifecycle::{run_lifecycle_manager, LifecycleConfig, LifecycleManager},
poison::PoisonCabinet,
querier_handler::prepare_data_to_querier,
stream_handler::{
sink_adaptor::IngestSinkAdaptor, sink_instrumentation::SinkInstrumentation,
PeriodicWatermarkFetcher, SequencedStreamHandler,
},
};
use async_trait::async_trait;
use backoff::BackoffConfig;
use data_types2::{IngesterQueryRequest, KafkaPartition, KafkaTopic, Sequencer, SequencerId};
use db::write_buffer::metrics::{SequencerMetrics, WriteBufferIngestMetrics};
use dml::DmlOperation;
use data_types2::{IngesterQueryRequest, KafkaPartition, KafkaTopic, Sequencer};
use futures::{
future::{BoxFuture, Shared},
pin_mut,
stream::FuturesUnordered,
FutureExt, StreamExt, TryFutureExt,
};
use iox_catalog::interface::Catalog;
use metric::Attributes;
use metric::U64Counter;
use metric::U64Gauge;
use object_store::DynObjectStore;
use observability_deps::tracing::{debug, error, info, warn};
use observability_deps::tracing::*;
use query::exec::Executor;
use snafu::{ResultExt, Snafu};
use std::{
collections::BTreeMap,
sync::Arc,
time::{Duration, Instant},
};
use time::{SystemProvider, TimeProvider};
use std::{collections::BTreeMap, sync::Arc, time::Duration};
use time::SystemProvider;
use tokio::task::{JoinError, JoinHandle};
use tokio_util::sync::CancellationToken;
use trace::span::SpanRecorder;
use write_buffer::core::{WriteBufferReading, WriteBufferStreamHandler};
use write_buffer::core::WriteBufferReading;
use write_summary::SequencerProgress;
#[derive(Debug, Snafu)]
@@ -113,9 +107,6 @@ pub struct IngestHandlerImpl {
/// A token that is used to trigger shutdown of the background worker
shutdown: CancellationToken,
/// Poison pills for testing.
poison_cabinet: Arc<PoisonCabinet>,
/// The cache and buffered data for the ingester
data: Arc<IngesterData>,
}
@@ -132,7 +123,6 @@ impl IngestHandlerImpl {
write_buffer: Arc<dyn WriteBufferReading>,
exec: Arc<Executor>,
metric_registry: Arc<metric::Registry>,
time_provider: Arc<dyn TimeProvider>,
) -> Result<Self> {
// build the initial ingester data state
let mut sequencers = BTreeMap::new();
@@ -152,7 +142,6 @@
let ingester_data = Arc::clone(&data);
let kafka_topic_name = topic.name.clone();
let ingest_metrics = WriteBufferIngestMetrics::new(&metric_registry, &topic.name);
// start the lifecycle manager
let persister = Arc::clone(&data);
@@ -163,12 +152,11 @@
);
let lifecycle_handle = lifecycle_manager.handle();
let shutdown = CancellationToken::new();
let poison_cabinet = Arc::new(PoisonCabinet::new());
let handle = tokio::task::spawn(run_lifecycle_manager(
lifecycle_manager,
persister,
shutdown.clone(),
Arc::clone(&poison_cabinet),
Arc::new(PoisonCabinet::new()),
));
info!(
"ingester handler and lifecycle started with config {:?}",
@@ -179,36 +167,62 @@
join_handles.push(("lifecycle manager".to_owned(), shared_handle(handle)));
for (kafka_partition, sequencer) in sequencer_states {
let worker_name = format!("stream handler for partition {}", kafka_partition.get());
let metrics = ingest_metrics.new_sequencer_metrics(kafka_partition.get() as u32);
let ingester_data = Arc::clone(&ingester_data);
let kafka_topic_name = kafka_topic_name.clone();
let metric_registry = Arc::clone(&metric_registry);
let mut stream_handler = write_buffer
// Acquire a write buffer stream and seek it to the last
// definitely-already-persisted op
let mut op_stream = write_buffer
.stream_handler(kafka_partition.get() as u32)
.await
.context(WriteBufferSnafu)?;
stream_handler
op_stream
.seek(sequencer.min_unpersisted_sequence_number as u64)
.await
.context(WriteBufferSnafu)?;
let handle = tokio::task::spawn(stream_in_sequenced_entries(
lifecycle_handle.clone(),
ingester_data,
sequencer.id,
kafka_topic_name,
kafka_partition,
// Initialise the DmlSink stack.
let watermark_fetcher = PeriodicWatermarkFetcher::new(
Arc::clone(&write_buffer),
stream_handler,
metrics,
metric_registry,
shutdown.clone(),
Arc::clone(&poison_cabinet),
Arc::clone(&time_provider),
));
sequencer.kafka_partition,
Duration::from_secs(10),
&*metric_registry,
);
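// (PeriodicWatermarkFetcher presumably polls the write buffer's high
// watermark on the 10s interval above and caches the result, so the
// per-op hot path never blocks on a watermark lookup.)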
// Wrap the IngesterData in a DmlSink adapter
let sink = IngestSinkAdaptor::new(
Arc::clone(&ingester_data),
lifecycle_handle.clone(),
sequencer.id,
);
// Emit metrics when ops flow through the sink
let sink = SinkInstrumentation::new(
sink,
watermark_fetcher,
kafka_topic_name.clone(),
sequencer.kafka_partition,
&*metric_registry,
);
// Spawn a task to stream in ops from the op_stream and push them
// into the sink
let handle = tokio::task::spawn({
let shutdown = shutdown.child_token();
let lifecycle_handle = lifecycle_handle.clone();
let kafka_topic_name = kafka_topic_name.clone();
async move {
let handler = SequencedStreamHandler::new(
op_stream.stream().await,
sink,
lifecycle_handle,
kafka_topic_name,
sequencer.kafka_partition,
&*metric_registry,
);
handler.run(shutdown).await
}
});
let worker_name = format!("stream handler for partition {}", kafka_partition.get());
join_handles.push((worker_name, shared_handle(handle)));
}
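A note on the shutdown wiring above: every stream handler task runs under a child_token() of the ingester-wide CancellationToken, so cancelling the parent token stops all handlers at once. A minimal, runnable illustration of that pattern with plain tokio/tokio-util (not IOx code):

use tokio_util::sync::CancellationToken;

#[tokio::main]
async fn main() {
    let shutdown = CancellationToken::new();

    // One child token per worker; cancelling the parent cancels them all,
    // while cancelling a single child leaves its siblings running.
    let child = shutdown.child_token();
    let worker = tokio::spawn(async move {
        // Stand-in for SequencedStreamHandler::run(shutdown).
        child.cancelled().await;
        "worker observed shutdown"
    });

    shutdown.cancel();
    println!("{}", worker.await.unwrap());
}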
@@ -217,7 +231,6 @@ impl IngestHandlerImpl {
kafka_topic: topic,
join_handles,
shutdown,
poison_cabinet,
})
}
}
@@ -285,210 +298,22 @@ impl Drop for IngestHandlerImpl {
}
}
/// This is used to take entries from a `Stream` and put them in the
/// mutable buffer, such as streaming entries from a write buffer.
///
/// Note all errors reading / parsing / writing entries from the write
/// buffer are ignored.
#[allow(clippy::too_many_arguments)]
async fn stream_in_sequenced_entries(
lifecycle_manager: LifecycleHandle,
ingester_data: Arc<IngesterData>,
sequencer_id: SequencerId,
kafka_topic: String,
kafka_partition: KafkaPartition,
write_buffer: Arc<dyn WriteBufferReading>,
mut write_buffer_stream: Box<dyn WriteBufferStreamHandler>,
mut metrics: SequencerMetrics,
metric_registry: Arc<metric::Registry>,
shutdown: CancellationToken,
poison_cabinet: Arc<PoisonCabinet>,
time_provider: Arc<dyn TimeProvider>,
) {
let mut watermark_last_updated: Option<Instant> = None;
let mut watermark = 0_u64;
let mut stream = write_buffer_stream.stream().await;
let shutdown_cancelled = shutdown.cancelled().fuse();
let poison_wait_panic = poison_cabinet
.wait_for(PoisonPill::StreamPanic(kafka_partition))
.fuse();
let poison_wait_exit = poison_cabinet
.wait_for(PoisonPill::StreamExit(kafka_partition))
.fuse();
let pause_duration_ms = metric_registry
.register_metric::<U64Counter>(
"ingest_paused_duration_ms_total",
"Duration of time ingestion has been paused by the lifecycle manager in milliseconds",
)
.recorder(&[]);
// using the kafka partition number to keep metrics consistent with write buffer which calls
// the sequencer_id the kafka partition number.
let sequencer_id_attr = format!("{}", kafka_partition.get());
let attributes = Attributes::from([
("sequencer_id", sequencer_id_attr.into()),
("kafka_topic", kafka_topic.clone().into()),
]);
let time_to_be_readable_ms = metric_registry.register_metric::<U64Gauge>(
"ingester_ttbr_ms",
"Duration of time between producer writing to consumer putting into queryable cache in milliseconds",
).recorder(attributes);
pin_mut!(shutdown_cancelled);
pin_mut!(poison_wait_panic);
pin_mut!(poison_wait_exit);
while let Some(db_write_result) = futures::select!(next = stream.next().fuse() => next, _ = shutdown_cancelled => {
info!(
kafka_topic=kafka_topic.as_str(),
%kafka_partition,
"Stream handler shutdown",
);
return;
}, _ = poison_wait_panic => {
panic!("Stream {} poisened, panic", kafka_partition.get());
},_ = poison_wait_exit => {
error!("Stream {} poisened, exit early", kafka_partition.get());
return;
},
) {
if poison_cabinet.contains(&PoisonPill::StreamPanic(kafka_partition)) {
panic!("Stream {} poisened, panic", kafka_partition.get());
}
if poison_cabinet.contains(&PoisonPill::StreamExit(kafka_partition)) {
error!("Stream {} poisened, exit early", kafka_partition.get());
return;
}
if shutdown.is_cancelled() {
info!(
kafka_topic=kafka_topic.as_str(),
%kafka_partition,
"Stream handler shutdown",
);
return;
}
// maybe update sequencer watermark
// We are not updating this watermark every round because asking the sequencer for that watermark can be
// quite expensive.
let now = Instant::now();
if watermark_last_updated
.map(|ts| now.duration_since(ts) > Duration::from_secs(10))
.unwrap_or(true)
{
match write_buffer
.fetch_high_watermark(sequencer_id.get() as u32)
.await
{
Ok(w) => {
watermark = w;
}
// skip over invalid data in the write buffer so recovery can succeed
Err(e) => {
debug!(
%e,
%kafka_topic,
%kafka_partition,
"Error while reading sequencer watermark",
)
}
}
watermark_last_updated = Some(now);
}
let ingest_recorder = metrics.recorder(watermark);
// get entry from sequencer
let dml_operation: DmlOperation = match db_write_result {
Ok(db_write) => db_write,
// skip over invalid data in the write buffer so recovery can succeed
Err(e) => {
warn!(
%e,
%kafka_topic,
%kafka_partition,
"Error converting write buffer data to SequencedEntry",
);
continue;
}
};
let ingest_recorder = ingest_recorder.operation(&dml_operation);
// store entry
let mut span_recorder = SpanRecorder::new(
dml_operation
.meta()
.span_context()
.map(|parent| parent.child("IOx write buffer")),
);
let result = ingester_data
.buffer_operation(sequencer_id, dml_operation.clone(), &lifecycle_manager)
.await;
match result {
Ok(should_pause) => {
ingest_recorder.success();
span_recorder.ok("stored write");
if should_pause {
warn!(%sequencer_id, "pausing ingest until persistence has run");
while !lifecycle_manager.can_resume_ingest() {
tokio::time::sleep(INGEST_PAUSE_DELAY).await;
// Incrementally report on the sleeps (as opposed to
// measuring the start/end duration) in order to report
// a blocked ingester _before_ it recovers.
//
// While the actual sleep may be slightly longer than
// INGEST_PAUSE_DELAY, it's not likely to be a useful
// distinction in the metrics.
pause_duration_ms.inc(INGEST_PAUSE_DELAY.as_millis() as _);
}
info!(%sequencer_id, "resuming ingest");
}
if let Some(ts) = dml_operation.meta().producer_ts() {
if let Some(ttbr) = time_provider.now().checked_duration_since(ts) {
time_to_be_readable_ms.set(ttbr.as_millis() as u64);
}
}
}
Err(e) => {
// skip over invalid data in the write buffer so recovery can succeed
debug!(
%e,
%kafka_topic,
%sequencer_id,
"Error storing SequencedEntry from write buffer in ingester buffer"
);
span_recorder.error("cannot store write");
}
}
}
}
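The deleted stream_in_sequenced_entries() above interleaved stream polling, watermark refresh, metrics, pause/backpressure, and the actual buffering in one long loop. The replacement splits these into composable sink layers. A self-contained sketch of that layering idea (illustrative trait and names only; the real DmlSink abstraction comes from #4203 and is not shown in this diff):

use std::time::Instant;

trait Sink {
    fn apply(&mut self, op: &str) -> Result<(), String>;
}

// Innermost layer: writes the op into the buffer (stands in for
// IngestSinkAdaptor wrapping IngesterData).
struct BufferSink(Vec<String>);

impl Sink for BufferSink {
    fn apply(&mut self, op: &str) -> Result<(), String> {
        self.0.push(op.to_owned());
        Ok(())
    }
}

// Decorator: measures each apply() call without touching the inner logic
// (stands in for SinkInstrumentation).
struct Instrumented<S: Sink> {
    inner: S,
    total_ms: u128,
}

impl<S: Sink> Sink for Instrumented<S> {
    fn apply(&mut self, op: &str) -> Result<(), String> {
        let t = Instant::now();
        let res = self.inner.apply(op);
        self.total_ms += t.elapsed().as_millis();
        res
    }
}

fn main() {
    let mut sink = Instrumented { inner: BufferSink(Vec::new()), total_ms: 0 };
    sink.apply("cpu,host=a usage=1").unwrap();
    println!("applied {} ops in {} ms", sink.inner.0.len(), sink.total_ms);
}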
#[cfg(test)]
mod tests {
use super::*;
use crate::poison::PoisonPill;
use data_types2::{Namespace, NamespaceSchema, QueryPool, Sequence, SequenceNumber};
use dml::{DmlMeta, DmlWrite};
use iox_catalog::{mem::MemCatalog, validate_or_insert_schema};
use metric::{Attributes, Metric, U64Counter, U64Gauge};
use metric::{Attributes, Metric, U64Counter, U64Gauge, U64Histogram};
use mutable_batch_lp::lines_to_batches;
use object_store::ObjectStoreImpl;
use std::{num::NonZeroU32, ops::DerefMut};
use time::{MockProvider, Time};
use time::Time;
use write_buffer::mock::{MockBufferForReading, MockBufferSharedState};
#[tokio::test]
async fn read_from_write_buffer_write_to_mutable_buffer() {
let start_time = Time::from_timestamp_millis(1340);
let ingester = TestIngester::new(start_time).await;
let ingester = TestIngester::new().await;
let schema = NamespaceSchema::new(
ingester.namespace.id,
@@ -576,23 +401,24 @@ mod tests {
let observation = ingester
.metrics
.get_instrument::<Metric<U64Counter>>("write_buffer_ingest_requests")
.get_instrument::<Metric<U64Histogram>>("ingester_op_apply_duration_ms")
.unwrap()
.get_observer(&Attributes::from(&[
("db_name", "whatevs"),
("kafka_topic", "whatevs"),
("sequencer_id", "0"),
("status", "ok"),
("result", "success"),
]))
.unwrap()
.fetch();
assert_eq!(observation, 3);
let hits = observation.buckets.iter().map(|b| b.count).sum::<u64>();
assert_eq!(hits, 3);
let observation = ingester
.metrics
.get_instrument::<Metric<U64Counter>>("write_buffer_read_bytes")
.unwrap()
.get_observer(&Attributes::from(&[
("db_name", "whatevs"),
("kafka_topic", "whatevs"),
("sequencer_id", "0"),
]))
.unwrap()
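On the assertion change above: write_buffer_ingest_requests was a plain U64Counter, whereas ingester_op_apply_duration_ms is a U64Histogram, so the test now recovers the number of applied ops by summing the per-bucket counts. This works because each observation lands in exactly one (non-cumulative) bucket, as the test's own sum-equals-3 expectation confirms. A tiny runnable model of that arithmetic:

struct Bucket {
    le: u64,    // upper bound of the bucket, e.g. milliseconds
    count: u64, // observations that landed in this bucket
}

// Each observation lands in exactly one bucket, so the total number of
// recorded ops is the sum of the bucket counts.
fn total_observations(buckets: &[Bucket]) -> u64 {
    buckets.iter().map(|b| b.count).sum()
}

fn main() {
    let buckets = [
        Bucket { le: 10, count: 2 },
        Bucket { le: 100, count: 1 },
    ];
    for b in &buckets {
        println!("<= {} ms: {} ops", b.le, b.count);
    }
    assert_eq!(total_observations(&buckets), 3); // cf. assert_eq!(hits, 3)
}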
@@ -604,7 +430,7 @@
.get_instrument::<Metric<U64Gauge>>("write_buffer_last_sequence_number")
.unwrap()
.get_observer(&Attributes::from(&[
("db_name", "whatevs"),
("kafka_topic", "whatevs"),
("sequencer_id", "0"),
]))
.unwrap()
@@ -616,72 +442,29 @@
.get_instrument::<Metric<U64Gauge>>("write_buffer_sequence_number_lag")
.unwrap()
.get_observer(&Attributes::from(&[
("db_name", "whatevs"),
("kafka_topic", "whatevs"),
("sequencer_id", "0"),
]))
.unwrap()
.fetch();
assert_eq!(observation, 0);
let observation = ingester
.metrics
.get_instrument::<Metric<U64Gauge>>("write_buffer_last_min_ts")
.unwrap()
.get_observer(&Attributes::from(&[
("db_name", "whatevs"),
("sequencer_id", "0"),
]))
.unwrap()
.fetch();
assert_eq!(observation, 200);
let observation = ingester
.metrics
.get_instrument::<Metric<U64Gauge>>("write_buffer_last_max_ts")
.unwrap()
.get_observer(&Attributes::from(&[
("db_name", "whatevs"),
("sequencer_id", "0"),
]))
.unwrap()
.fetch();
assert_eq!(observation, 200);
let observation = ingester
.metrics
.get_instrument::<Metric<U64Gauge>>("write_buffer_last_ingest_ts")
.unwrap()
.get_observer(&Attributes::from(&[
("db_name", "whatevs"),
("sequencer_id", "0"),
]))
.unwrap()
.fetch();
assert_eq!(observation, ingest_ts2.timestamp_nanos() as u64);
let observation = ingester
.metrics
.get_instrument::<Metric<U64Gauge>>("ingester_ttbr_ms")
.unwrap()
.get_observer(&Attributes::from(&[
("kafka_topic", "whatevs"),
("sequencer_id", "0"),
]))
.unwrap()
.fetch();
assert_eq!(
observation,
(start_time.timestamp_nanos() as u64 - ingest_ts2.timestamp_nanos() as u64)
/ 1000
/ 1000
);
assert_eq!(observation, ingest_ts2.timestamp_nanos() as u64);
}
#[tokio::test]
async fn test_shutdown() {
let ingester = TestIngester::new(Time::from_timestamp_millis(10000))
.await
.ingester;
let ingester = TestIngester::new().await.ingester;
// does not exit w/o shutdown
tokio::select! {
@@ -696,56 +479,6 @@
.unwrap();
}
#[tokio::test]
#[should_panic(expected = "Background worker 'lifecycle manager' exited early!")]
async fn test_supervise_lifecycle_manager_early_exit() {
let ingester = TestIngester::new(Time::from_timestamp_millis(10000))
.await
.ingester;
ingester.poison_cabinet.add(PoisonPill::LifecycleExit);
tokio::time::timeout(Duration::from_millis(1000), ingester.join())
.await
.unwrap();
}
#[tokio::test]
#[should_panic(expected = "JoinError::Panic")]
async fn test_supervise_lifecycle_manager_panic() {
let ingester = TestIngester::new(Time::from_timestamp_millis(10000))
.await
.ingester;
ingester.poison_cabinet.add(PoisonPill::LifecyclePanic);
tokio::time::timeout(Duration::from_millis(1000), ingester.join())
.await
.unwrap();
}
#[tokio::test]
#[should_panic(expected = "Background worker 'stream handler for partition 0' exited early!")]
async fn test_supervise_stream_early_exit() {
let ingester = TestIngester::new(Time::from_timestamp_millis(10000)).await;
ingester
.ingester
.poison_cabinet
.add(PoisonPill::StreamExit(ingester.kafka_partition));
tokio::time::timeout(Duration::from_millis(1000), ingester.ingester.join())
.await
.unwrap();
}
#[tokio::test]
#[should_panic(expected = "JoinError::Panic")]
async fn test_supervise_stream_panic() {
let ingester = TestIngester::new(Time::from_timestamp_millis(10000)).await;
ingester
.ingester
.poison_cabinet
.add(PoisonPill::StreamPanic(ingester.kafka_partition));
tokio::time::timeout(Duration::from_millis(1000), ingester.ingester.join())
.await
.unwrap();
}
#[tokio::test]
async fn seeks_on_initialization() {
let metrics: Arc<metric::Registry> = Default::default();
@@ -820,7 +553,6 @@
reading,
Arc::new(Executor::new(1)),
Arc::clone(&metrics),
Arc::new(SystemProvider::new()),
)
.await
.unwrap();
@@ -876,7 +608,7 @@
}
impl TestIngester {
async fn new(start_time: Time) -> Self {
async fn new() -> Self {
let metrics: Arc<metric::Registry> = Default::default();
let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(Arc::clone(&metrics)));
@@ -905,8 +637,6 @@
Arc::new(MockBufferForReading::new(write_buffer_state.clone(), None).unwrap());
let object_store = Arc::new(ObjectStoreImpl::new_in_memory());
let time_provider = Arc::new(MockProvider::new(start_time));
let lifecycle_config = LifecycleConfig::new(
1000000,
1000,
@@ -923,7 +653,6 @@
reading,
Arc::new(Executor::new(1)),
Arc::clone(&metrics),
time_provider,
)
.await
.unwrap();


@@ -30,7 +30,6 @@ use ioxd_common::{
setup_builder,
};
use thiserror::Error;
use time::SystemProvider;
#[derive(Debug, Error)]
pub enum Error {
@@ -192,7 +191,6 @@ pub async fn create_ingester_server_type(
write_buffer,
exec,
Arc::clone(&metric_registry),
Arc::new(SystemProvider::new()),
)
.await?,
);
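Net effect in this file: callers no longer build a SystemProvider and thread it through IngestHandlerImpl::new(); timekeeping now lives with the component that needs it. A common shape for this refactor, sketched with hypothetical names (not the IOx API), is to default to the system clock internally while keeping an injection point for tests:

use std::sync::Arc;

trait TimeProvider: Send + Sync {
    fn now_ms(&self) -> u64;
}

struct SystemProvider;

impl TimeProvider for SystemProvider {
    fn now_ms(&self) -> u64 {
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .expect("clock went backwards")
            .as_millis() as u64
    }
}

struct Instrumentation {
    time: Arc<dyn TimeProvider>,
}

impl Instrumentation {
    // Production constructor: callers no longer pass a time provider.
    fn new() -> Self {
        Self::with_time_provider(Arc::new(SystemProvider))
    }

    // Test-only injection point for a mock clock.
    fn with_time_provider(time: Arc<dyn TimeProvider>) -> Self {
        Self { time }
    }
}

fn main() {
    let instr = Instrumentation::new();
    println!("now = {} ms", instr.time.now_ms());
}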