diff --git a/Cargo.lock b/Cargo.lock index 4fa9a6d7e9..799b3bead8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2308,7 +2308,6 @@ dependencies = [ "chrono", "data_types2", "datafusion 0.1.0", - "db", "dml", "futures", "generated_types", diff --git a/ingester/Cargo.toml b/ingester/Cargo.toml index 43b76a7f70..50fb60c668 100644 --- a/ingester/Cargo.toml +++ b/ingester/Cargo.toml @@ -17,7 +17,6 @@ data_types2 = { path = "../data_types2" } futures = "0.3" generated_types = { path = "../generated_types" } chrono = { version = "0.4", default-features = false } -db = { path = "../db" } dml = { path = "../dml" } hyper = "0.14" iox_catalog = { path = "../iox_catalog" } diff --git a/ingester/src/handler.rs b/ingester/src/handler.rs index 68f726ffa7..61475b6662 100644 --- a/ingester/src/handler.rs +++ b/ingester/src/handler.rs @@ -2,39 +2,33 @@ use crate::{ data::{IngesterData, IngesterQueryResponse, SequencerData}, - lifecycle::{run_lifecycle_manager, LifecycleConfig, LifecycleHandle, LifecycleManager}, - poison::{PoisonCabinet, PoisonPill}, + lifecycle::{run_lifecycle_manager, LifecycleConfig, LifecycleManager}, + poison::PoisonCabinet, querier_handler::prepare_data_to_querier, + stream_handler::{ + sink_adaptor::IngestSinkAdaptor, sink_instrumentation::SinkInstrumentation, + PeriodicWatermarkFetcher, SequencedStreamHandler, + }, }; use async_trait::async_trait; use backoff::BackoffConfig; -use data_types2::{IngesterQueryRequest, KafkaPartition, KafkaTopic, Sequencer, SequencerId}; -use db::write_buffer::metrics::{SequencerMetrics, WriteBufferIngestMetrics}; -use dml::DmlOperation; +use data_types2::{IngesterQueryRequest, KafkaPartition, KafkaTopic, Sequencer}; + use futures::{ future::{BoxFuture, Shared}, - pin_mut, stream::FuturesUnordered, FutureExt, StreamExt, TryFutureExt, }; use iox_catalog::interface::Catalog; -use metric::Attributes; -use metric::U64Counter; -use metric::U64Gauge; use object_store::DynObjectStore; -use observability_deps::tracing::{debug, error, info, warn}; +use observability_deps::tracing::*; use query::exec::Executor; use snafu::{ResultExt, Snafu}; -use std::{ - collections::BTreeMap, - sync::Arc, - time::{Duration, Instant}, -}; -use time::{SystemProvider, TimeProvider}; +use std::{collections::BTreeMap, sync::Arc, time::Duration}; +use time::SystemProvider; use tokio::task::{JoinError, JoinHandle}; use tokio_util::sync::CancellationToken; -use trace::span::SpanRecorder; -use write_buffer::core::{WriteBufferReading, WriteBufferStreamHandler}; +use write_buffer::core::WriteBufferReading; use write_summary::SequencerProgress; #[derive(Debug, Snafu)] @@ -113,9 +107,6 @@ pub struct IngestHandlerImpl { /// A token that is used to trigger shutdown of the background worker shutdown: CancellationToken, - /// Poison pills for testing. - poison_cabinet: Arc, - /// The cache and buffered data for the ingester data: Arc, } @@ -132,7 +123,6 @@ impl IngestHandlerImpl { write_buffer: Arc, exec: Arc, metric_registry: Arc, - time_provider: Arc, ) -> Result { // build the initial ingester data state let mut sequencers = BTreeMap::new(); @@ -152,7 +142,6 @@ impl IngestHandlerImpl { let ingester_data = Arc::clone(&data); let kafka_topic_name = topic.name.clone(); - let ingest_metrics = WriteBufferIngestMetrics::new(&metric_registry, &topic.name); // start the lifecycle manager let persister = Arc::clone(&data); @@ -163,12 +152,11 @@ impl IngestHandlerImpl { ); let lifecycle_handle = lifecycle_manager.handle(); let shutdown = CancellationToken::new(); - let poison_cabinet = Arc::new(PoisonCabinet::new()); let handle = tokio::task::spawn(run_lifecycle_manager( lifecycle_manager, persister, shutdown.clone(), - Arc::clone(&poison_cabinet), + Arc::new(PoisonCabinet::new()), )); info!( "ingester handler and lifecycle started with config {:?}", @@ -179,36 +167,62 @@ impl IngestHandlerImpl { join_handles.push(("lifecycle manager".to_owned(), shared_handle(handle))); for (kafka_partition, sequencer) in sequencer_states { - let worker_name = format!("stream handler for partition {}", kafka_partition.get()); - let metrics = ingest_metrics.new_sequencer_metrics(kafka_partition.get() as u32); - let ingester_data = Arc::clone(&ingester_data); - let kafka_topic_name = kafka_topic_name.clone(); let metric_registry = Arc::clone(&metric_registry); - let mut stream_handler = write_buffer + // Acquire a write buffer stream and seek it to the last + // definitely-already-persisted op + let mut op_stream = write_buffer .stream_handler(kafka_partition.get() as u32) .await .context(WriteBufferSnafu)?; - - stream_handler + op_stream .seek(sequencer.min_unpersisted_sequence_number as u64) .await .context(WriteBufferSnafu)?; - let handle = tokio::task::spawn(stream_in_sequenced_entries( - lifecycle_handle.clone(), - ingester_data, - sequencer.id, - kafka_topic_name, - kafka_partition, + // Initialise the DmlSink stack. + let watermark_fetcher = PeriodicWatermarkFetcher::new( Arc::clone(&write_buffer), - stream_handler, - metrics, - metric_registry, - shutdown.clone(), - Arc::clone(&poison_cabinet), - Arc::clone(&time_provider), - )); + sequencer.kafka_partition, + Duration::from_secs(10), + &*metric_registry, + ); + // Wrap the IngesterData in a DmlSink adapter + let sink = IngestSinkAdaptor::new( + Arc::clone(&ingester_data), + lifecycle_handle.clone(), + sequencer.id, + ); + // Emit metrics when ops flow through the sink + let sink = SinkInstrumentation::new( + sink, + watermark_fetcher, + kafka_topic_name.clone(), + sequencer.kafka_partition, + &*metric_registry, + ); + + // Spawn a task to stream in ops from the op_stream and push them + // into the sink + let handle = tokio::task::spawn({ + let shutdown = shutdown.child_token(); + let lifecycle_handle = lifecycle_handle.clone(); + let kafka_topic_name = kafka_topic_name.clone(); + async move { + let handler = SequencedStreamHandler::new( + op_stream.stream().await, + sink, + lifecycle_handle, + kafka_topic_name, + sequencer.kafka_partition, + &*metric_registry, + ); + + handler.run(shutdown).await + } + }); + + let worker_name = format!("stream handler for partition {}", kafka_partition.get()); join_handles.push((worker_name, shared_handle(handle))); } @@ -217,7 +231,6 @@ impl IngestHandlerImpl { kafka_topic: topic, join_handles, shutdown, - poison_cabinet, }) } } @@ -285,210 +298,22 @@ impl Drop for IngestHandlerImpl { } } -/// This is used to take entries from a `Stream` and put them in the -/// mutable buffer, such as streaming entries from a write buffer. -/// -/// Note all errors reading / parsing / writing entries from the write -/// buffer are ignored. -#[allow(clippy::too_many_arguments)] -async fn stream_in_sequenced_entries( - lifecycle_manager: LifecycleHandle, - ingester_data: Arc, - sequencer_id: SequencerId, - kafka_topic: String, - kafka_partition: KafkaPartition, - write_buffer: Arc, - mut write_buffer_stream: Box, - mut metrics: SequencerMetrics, - metric_registry: Arc, - shutdown: CancellationToken, - poison_cabinet: Arc, - time_provider: Arc, -) { - let mut watermark_last_updated: Option = None; - let mut watermark = 0_u64; - let mut stream = write_buffer_stream.stream().await; - - let shutdown_cancelled = shutdown.cancelled().fuse(); - let poison_wait_panic = poison_cabinet - .wait_for(PoisonPill::StreamPanic(kafka_partition)) - .fuse(); - let poison_wait_exit = poison_cabinet - .wait_for(PoisonPill::StreamExit(kafka_partition)) - .fuse(); - - let pause_duration_ms = metric_registry - .register_metric::( - "ingest_paused_duration_ms_total", - "Duration of time ingestion has been paused by the lifecycle manager in milliseconds", - ) - .recorder(&[]); - - // using the kafka partition number to keep metrics consistent with write buffer which calls - // the sequencer_id the kafka partition number. - let sequencer_id_attr = format!("{}", kafka_partition.get()); - let attributes = Attributes::from([ - ("sequencer_id", sequencer_id_attr.into()), - ("kafka_topic", kafka_topic.clone().into()), - ]); - let time_to_be_readable_ms = metric_registry.register_metric::( - "ingester_ttbr_ms", - "Duration of time between producer writing to consumer putting into queryable cache in milliseconds", - ).recorder(attributes); - - pin_mut!(shutdown_cancelled); - pin_mut!(poison_wait_panic); - pin_mut!(poison_wait_exit); - - while let Some(db_write_result) = futures::select!(next = stream.next().fuse() => next, _ = shutdown_cancelled => { - info!( - kafka_topic=kafka_topic.as_str(), - %kafka_partition, - "Stream handler shutdown", - ); - return; - }, _ = poison_wait_panic => { - panic!("Stream {} poisened, panic", kafka_partition.get()); - },_ = poison_wait_exit => { - error!("Stream {} poisened, exit early", kafka_partition.get()); - return; - }, - ) { - if poison_cabinet.contains(&PoisonPill::StreamPanic(kafka_partition)) { - panic!("Stream {} poisened, panic", kafka_partition.get()); - } - if poison_cabinet.contains(&PoisonPill::StreamExit(kafka_partition)) { - error!("Stream {} poisened, exit early", kafka_partition.get()); - return; - } - - if shutdown.is_cancelled() { - info!( - kafka_topic=kafka_topic.as_str(), - %kafka_partition, - "Stream handler shutdown", - ); - return; - } - - // maybe update sequencer watermark - // We are not updating this watermark every round because asking the sequencer for that watermark can be - // quite expensive. - let now = Instant::now(); - if watermark_last_updated - .map(|ts| now.duration_since(ts) > Duration::from_secs(10)) - .unwrap_or(true) - { - match write_buffer - .fetch_high_watermark(sequencer_id.get() as u32) - .await - { - Ok(w) => { - watermark = w; - } - // skip over invalid data in the write buffer so recovery can succeed - Err(e) => { - debug!( - %e, - %kafka_topic, - %kafka_partition, - "Error while reading sequencer watermark", - ) - } - } - watermark_last_updated = Some(now); - } - - let ingest_recorder = metrics.recorder(watermark); - - // get entry from sequencer - let dml_operation: DmlOperation = match db_write_result { - Ok(db_write) => db_write, - // skip over invalid data in the write buffer so recovery can succeed - Err(e) => { - warn!( - %e, - %kafka_topic, - %kafka_partition, - "Error converting write buffer data to SequencedEntry", - ); - continue; - } - }; - - let ingest_recorder = ingest_recorder.operation(&dml_operation); - - // store entry - let mut span_recorder = SpanRecorder::new( - dml_operation - .meta() - .span_context() - .map(|parent| parent.child("IOx write buffer")), - ); - - let result = ingester_data - .buffer_operation(sequencer_id, dml_operation.clone(), &lifecycle_manager) - .await; - - match result { - Ok(should_pause) => { - ingest_recorder.success(); - span_recorder.ok("stored write"); - - if should_pause { - warn!(%sequencer_id, "pausing ingest until persistence has run"); - while !lifecycle_manager.can_resume_ingest() { - tokio::time::sleep(INGEST_PAUSE_DELAY).await; - // Incrementally report on the sleeps (as opposed to - // measuring the start/end duration) in order to report - // a blocked ingester _before_ it recovers. - // - // While the actual sleep may be slightly longer than - // INGEST_PAUSE_DELAY, it's not likely to be a useful - // distinction in the metrics. - pause_duration_ms.inc(INGEST_PAUSE_DELAY.as_millis() as _); - } - info!(%sequencer_id, "resuming ingest"); - } - - if let Some(ts) = dml_operation.meta().producer_ts() { - if let Some(ttbr) = time_provider.now().checked_duration_since(ts) { - time_to_be_readable_ms.set(ttbr.as_millis() as u64); - } - } - } - Err(e) => { - // skip over invalid data in the write buffer so recovery can succeed - debug!( - %e, - %kafka_topic, - %sequencer_id, - "Error storing SequencedEntry from write buffer in ingester buffer" - ); - span_recorder.error("cannot store write"); - } - } - } -} - #[cfg(test)] mod tests { use super::*; - use crate::poison::PoisonPill; use data_types2::{Namespace, NamespaceSchema, QueryPool, Sequence, SequenceNumber}; use dml::{DmlMeta, DmlWrite}; use iox_catalog::{mem::MemCatalog, validate_or_insert_schema}; - use metric::{Attributes, Metric, U64Counter, U64Gauge}; + use metric::{Attributes, Metric, U64Counter, U64Gauge, U64Histogram}; use mutable_batch_lp::lines_to_batches; use object_store::ObjectStoreImpl; use std::{num::NonZeroU32, ops::DerefMut}; - use time::{MockProvider, Time}; + use time::Time; use write_buffer::mock::{MockBufferForReading, MockBufferSharedState}; #[tokio::test] async fn read_from_write_buffer_write_to_mutable_buffer() { - let start_time = Time::from_timestamp_millis(1340); - let ingester = TestIngester::new(start_time).await; + let ingester = TestIngester::new().await; let schema = NamespaceSchema::new( ingester.namespace.id, @@ -576,23 +401,24 @@ mod tests { let observation = ingester .metrics - .get_instrument::>("write_buffer_ingest_requests") + .get_instrument::>("ingester_op_apply_duration_ms") .unwrap() .get_observer(&Attributes::from(&[ - ("db_name", "whatevs"), + ("kafka_topic", "whatevs"), ("sequencer_id", "0"), - ("status", "ok"), + ("result", "success"), ])) .unwrap() .fetch(); - assert_eq!(observation, 3); + let hits = observation.buckets.iter().map(|b| b.count).sum::(); + assert_eq!(hits, 3); let observation = ingester .metrics .get_instrument::>("write_buffer_read_bytes") .unwrap() .get_observer(&Attributes::from(&[ - ("db_name", "whatevs"), + ("kafka_topic", "whatevs"), ("sequencer_id", "0"), ])) .unwrap() @@ -604,7 +430,7 @@ mod tests { .get_instrument::>("write_buffer_last_sequence_number") .unwrap() .get_observer(&Attributes::from(&[ - ("db_name", "whatevs"), + ("kafka_topic", "whatevs"), ("sequencer_id", "0"), ])) .unwrap() @@ -616,72 +442,29 @@ mod tests { .get_instrument::>("write_buffer_sequence_number_lag") .unwrap() .get_observer(&Attributes::from(&[ - ("db_name", "whatevs"), + ("kafka_topic", "whatevs"), ("sequencer_id", "0"), ])) .unwrap() .fetch(); assert_eq!(observation, 0); - let observation = ingester - .metrics - .get_instrument::>("write_buffer_last_min_ts") - .unwrap() - .get_observer(&Attributes::from(&[ - ("db_name", "whatevs"), - ("sequencer_id", "0"), - ])) - .unwrap() - .fetch(); - assert_eq!(observation, 200); - - let observation = ingester - .metrics - .get_instrument::>("write_buffer_last_max_ts") - .unwrap() - .get_observer(&Attributes::from(&[ - ("db_name", "whatevs"), - ("sequencer_id", "0"), - ])) - .unwrap() - .fetch(); - assert_eq!(observation, 200); - let observation = ingester .metrics .get_instrument::>("write_buffer_last_ingest_ts") .unwrap() - .get_observer(&Attributes::from(&[ - ("db_name", "whatevs"), - ("sequencer_id", "0"), - ])) - .unwrap() - .fetch(); - assert_eq!(observation, ingest_ts2.timestamp_nanos() as u64); - - let observation = ingester - .metrics - .get_instrument::>("ingester_ttbr_ms") - .unwrap() .get_observer(&Attributes::from(&[ ("kafka_topic", "whatevs"), ("sequencer_id", "0"), ])) .unwrap() .fetch(); - assert_eq!( - observation, - (start_time.timestamp_nanos() as u64 - ingest_ts2.timestamp_nanos() as u64) - / 1000 - / 1000 - ); + assert_eq!(observation, ingest_ts2.timestamp_nanos() as u64); } #[tokio::test] async fn test_shutdown() { - let ingester = TestIngester::new(Time::from_timestamp_millis(10000)) - .await - .ingester; + let ingester = TestIngester::new().await.ingester; // does not exit w/o shutdown tokio::select! { @@ -696,56 +479,6 @@ mod tests { .unwrap(); } - #[tokio::test] - #[should_panic(expected = "Background worker 'lifecycle manager' exited early!")] - async fn test_supervise_lifecycle_manager_early_exit() { - let ingester = TestIngester::new(Time::from_timestamp_millis(10000)) - .await - .ingester; - ingester.poison_cabinet.add(PoisonPill::LifecycleExit); - tokio::time::timeout(Duration::from_millis(1000), ingester.join()) - .await - .unwrap(); - } - - #[tokio::test] - #[should_panic(expected = "JoinError::Panic")] - async fn test_supervise_lifecycle_manager_panic() { - let ingester = TestIngester::new(Time::from_timestamp_millis(10000)) - .await - .ingester; - ingester.poison_cabinet.add(PoisonPill::LifecyclePanic); - tokio::time::timeout(Duration::from_millis(1000), ingester.join()) - .await - .unwrap(); - } - - #[tokio::test] - #[should_panic(expected = "Background worker 'stream handler for partition 0' exited early!")] - async fn test_supervise_stream_early_exit() { - let ingester = TestIngester::new(Time::from_timestamp_millis(10000)).await; - ingester - .ingester - .poison_cabinet - .add(PoisonPill::StreamExit(ingester.kafka_partition)); - tokio::time::timeout(Duration::from_millis(1000), ingester.ingester.join()) - .await - .unwrap(); - } - - #[tokio::test] - #[should_panic(expected = "JoinError::Panic")] - async fn test_supervise_stream_panic() { - let ingester = TestIngester::new(Time::from_timestamp_millis(10000)).await; - ingester - .ingester - .poison_cabinet - .add(PoisonPill::StreamPanic(ingester.kafka_partition)); - tokio::time::timeout(Duration::from_millis(1000), ingester.ingester.join()) - .await - .unwrap(); - } - #[tokio::test] async fn seeks_on_initialization() { let metrics: Arc = Default::default(); @@ -820,7 +553,6 @@ mod tests { reading, Arc::new(Executor::new(1)), Arc::clone(&metrics), - Arc::new(SystemProvider::new()), ) .await .unwrap(); @@ -876,7 +608,7 @@ mod tests { } impl TestIngester { - async fn new(start_time: Time) -> Self { + async fn new() -> Self { let metrics: Arc = Default::default(); let catalog: Arc = Arc::new(MemCatalog::new(Arc::clone(&metrics))); @@ -905,8 +637,6 @@ mod tests { Arc::new(MockBufferForReading::new(write_buffer_state.clone(), None).unwrap()); let object_store = Arc::new(ObjectStoreImpl::new_in_memory()); - let time_provider = Arc::new(MockProvider::new(start_time)); - let lifecycle_config = LifecycleConfig::new( 1000000, 1000, @@ -923,7 +653,6 @@ mod tests { reading, Arc::new(Executor::new(1)), Arc::clone(&metrics), - time_provider, ) .await .unwrap(); diff --git a/ioxd_ingester/src/lib.rs b/ioxd_ingester/src/lib.rs index 74b0c4e415..bcc4f534df 100644 --- a/ioxd_ingester/src/lib.rs +++ b/ioxd_ingester/src/lib.rs @@ -30,7 +30,6 @@ use ioxd_common::{ setup_builder, }; use thiserror::Error; -use time::SystemProvider; #[derive(Debug, Error)] pub enum Error { @@ -192,7 +191,6 @@ pub async fn create_ingester_server_type( write_buffer, exec, Arc::clone(&metric_registry), - Arc::new(SystemProvider::new()), ) .await?, );