From f54204548522b4fd974d0c99793da3319822b624 Mon Sep 17 00:00:00 2001 From: Paul Dix Date: Tue, 15 Feb 2022 19:13:40 -0500 Subject: [PATCH] feat: wire up persistence in ingester (#3685) This adds persistence into the ingester with a lifecycle manager. The persist operation must still be updated to keep track of the min_unpersisted_sequence_number for each sequencer. --- influxdb_iox/src/commands/run/ingester.rs | 10 + ingester/src/data.rs | 520 ++++++++++++++++++++-- ingester/src/handler.rs | 76 ++-- ingester/src/lifecycle.rs | 2 +- ingester/src/query.rs | 2 +- ingester/src/test_util.rs | 2 +- 6 files changed, 557 insertions(+), 55 deletions(-) diff --git a/influxdb_iox/src/commands/run/ingester.rs b/influxdb_iox/src/commands/run/ingester.rs index ed4acced83..68434298cd 100644 --- a/influxdb_iox/src/commands/run/ingester.rs +++ b/influxdb_iox/src/commands/run/ingester.rs @@ -20,6 +20,7 @@ use ingester::{ use iox_catalog::interface::KafkaPartition; use object_store::ObjectStore; use observability_deps::tracing::*; +use query::exec::Executor; use std::{collections::BTreeMap, convert::TryFrom, sync::Arc, time::Duration}; use thiserror::Error; @@ -134,6 +135,14 @@ pub struct Config { default_value = "1800" )] pub persist_partition_age_threshold_seconds: u64, + + /// Number of threads to use for the ingester query execution, compaction and persistence. + #[clap( + long = "--query-exec-thread-count", + env = "INFLUXDB_IOX_QUERY_EXEC_THREAD_COUNT", + default_value = "4" + )] + pub query_exect_thread_count: usize, } pub async fn command(config: Config) -> Result<()> { @@ -191,6 +200,7 @@ pub async fn command(config: Config) -> Result<()> { catalog, object_store, write_buffer, + Executor::new(config.query_exect_thread_count), &metric_registry, ) .await?, diff --git a/ingester/src/data.rs b/ingester/src/data.rs index 306962d1db..a658d4f812 100644 --- a/ingester/src/data.rs +++ b/ingester/src/data.rs @@ -1,5 +1,8 @@ //! 
Data for the lifecycle of the Ingester +use crate::compact::compact_persisting_batch; +use crate::lifecycle::LifecycleManager; +use crate::persist::persist; use arrow::record_batch::RecordBatch; use async_trait::async_trait; use chrono::{format::StrftimeItems, TimeZone, Utc}; @@ -7,19 +10,24 @@ use data_types::delete_predicate::DeletePredicate; use dml::DmlOperation; use generated_types::{google::FieldViolation, influxdata::iox::ingester::v1 as proto}; use iox_catalog::interface::{ - Catalog, KafkaPartition, NamespaceId, PartitionId, SequenceNumber, SequencerId, TableId, - Timestamp, Tombstone, + Catalog, KafkaPartition, NamespaceId, PartitionId, PartitionInfo, SequenceNumber, SequencerId, + TableId, Timestamp, Tombstone, }; use mutable_batch::column::ColumnData; use mutable_batch::MutableBatch; use object_store::ObjectStore; +use observability_deps::tracing::{error, warn}; use parking_lot::RwLock; use predicate::Predicate; +use query::exec::Executor; use schema::selection::Selection; use schema::TIME_COLUMN_NAME; use snafu::{OptionExt, ResultExt, Snafu}; use std::convert::TryFrom; +use std::ops::DerefMut; +use std::time::Duration; use std::{collections::BTreeMap, sync::Arc}; +use time::SystemProvider; use uuid::Uuid; #[derive(Debug, Snafu)] @@ -43,6 +51,9 @@ pub enum Error { #[snafu(display("Namespace {} not found in catalog", namespace))] NamespaceNotFound { namespace: String }, + #[snafu(display("Table {} not found in buffer", table_name))] + TableNotFound { table_name: String }, + #[snafu(display("Table must be specified in delete"))] TableNotPresent, @@ -68,8 +79,14 @@ pub enum Error { #[snafu(display("Error while filter columns from snapshot: {}", source))] FilterColumn { source: arrow::error::ArrowError }, + + #[snafu(display("Partition not found: {}", partition_id))] + PartitionNotFound { partition_id: PartitionId }, } +/// Time to wait to retry if there is some sort of network error with the catalog or object storage. +const RETRY_TIME: Duration = Duration::from_secs(1); + /// A specialized `Error` for Ingester Data errors pub type Result = std::result::Result; @@ -79,28 +96,38 @@ pub struct IngesterData { pub(crate) object_store: Arc, /// The global catalog for schema, parquet files and tombstones pub(crate) catalog: Arc, - // This map gets set up on initialization of the ingester so it won't ever be modified. - // The content of each SequenceData will get changed when more namespaces and tables - // get ingested. + /// This map gets set up on initialization of the ingester so it won't ever be modified. + /// The content of each SequenceData will get changed when more namespaces and tables + /// get ingested. pub(crate) sequencers: BTreeMap, + /// Executor for running queries and compacting and persisting + pub(crate) exec: Executor, } impl IngesterData { /// Store the write or delete in the in memory buffer. Deletes will /// be written into the catalog before getting stored in the buffer. /// Any writes that create new IOx partitions will have those records - /// created in the catalog before putting into the buffer. + /// created in the catalog before putting into the buffer. Writes will + /// get logged in the lifecycle manager. If it indicates ingest should + /// be paused, this function will return true. 
pub async fn buffer_operation( &self, sequencer_id: SequencerId, dml_operation: DmlOperation, - ) -> Result<()> { + lifecycle_manager: &LifecycleManager, + ) -> Result { let sequencer_data = self .sequencers .get(&sequencer_id) .context(SequencerNotFoundSnafu { sequencer_id })?; sequencer_data - .buffer_operation(dml_operation, sequencer_id, self.catalog.as_ref()) + .buffer_operation( + dml_operation, + sequencer_id, + self.catalog.as_ref(), + lifecycle_manager, + ) .await } } @@ -108,19 +135,156 @@ impl IngesterData { /// The Persister has a single function that will persist a given partition Id. It is expected /// that the persist function will retry forever until it succeeds. #[async_trait] -pub(crate) trait Persister: Send + Sync + 'static { +pub trait Persister: Send + Sync + 'static { + /// Persits the partition ID. Will retry forever until it succeeds. async fn persist(&self, partition_id: PartitionId); } #[async_trait] impl Persister for IngesterData { - async fn persist(&self, _partition_id: PartitionId) { - // lookup the TableData - // let persisting_batch = table_data.create_persisting_batch(partition.partition_key); - // do the persist with this persisting batch - // update the catalog - // table_data.clear_persisting_batch() (behind the scenes this will remove the persisting batch - // and if the partition is empty, remove it from the map in table_data) + async fn persist(&self, partition_id: PartitionId) { + let mut repos = self.catalog.repositories().await; + + // lookup the partition_info from the catalog + let partition_info: Option = loop { + match repos.partitions().partition_info_by_id(partition_id).await { + Ok(p) => break p, + Err(e) => { + warn!(%e, ?partition_id, "getting partition_info_by_id failed: retrying."); + tokio::time::sleep(RETRY_TIME).await; + } + } + }; + std::mem::drop(repos); + + // lookup the state from the ingester data. If something isn't found, it's unexpected. Crash + // so someone can take a look. + let partition_info = partition_info + .unwrap_or_else(|| panic!("partition {} not found in catalog", partition_id)); + let sequencer_data = self + .sequencers + .get(&partition_info.partition.sequencer_id) + .unwrap_or_else(|| { + panic!( + "sequencer state for {} not in ingester data", + partition_info.partition.sequencer_id + ) + }); //{ + let namespace = sequencer_data + .namespace(&partition_info.namespace_name) + .unwrap_or_else(|| { + panic!( + "namespace {} not in sequencer {} state", + partition_info.namespace_name, partition_info.partition.sequencer_id + ) + }); + let table_data = namespace + .table_data(&partition_info.table_name) + .unwrap_or_else(|| { + panic!( + "table {} for namespace {} not in sequencer {} state", + partition_info.table_name, + partition_info.namespace_name, + partition_info.partition.sequencer_id + ) + }); + let partition_data = table_data + .partition_data(&partition_info.partition.partition_key) + .unwrap_or_else(|| { + panic!( + "partition {} not in table {} for namespace {} in sequencer {} state", + partition_info.partition.partition_key, + partition_info.table_name, + partition_info.namespace_name, + partition_info.partition.sequencer_id + ) + }); + + // snapshot and make arc clones of the data. 
+ let persisting_batch = partition_data.snapshot_to_persisting_batch( + partition_info.partition.sequencer_id, + partition_info.partition.table_id, + partition_info.partition.id, + &partition_info.table_name, + ); + + // do the CPU intensive work of compaction, de-duplication and sorting + let (record_batches, iox_meta) = match compact_persisting_batch( + Arc::new(SystemProvider::new()), + &self.exec, + namespace.namespace_id.get(), + &partition_info.namespace_name, + &partition_info.table_name, + &partition_info.partition.partition_key, + Arc::clone(&persisting_batch), + ) + .await + { + Err(e) => { + // this should never error out. if it does, we need to crash hard so + // someone can take a look. + panic!("unable to compact persisting batch with error: {:?}", e); + } + Ok(Some(r)) => r, + Ok(None) => { + warn!("persist called with no data"); + return; + } + }; + + // save the compacted data to a parquet file in object storage + loop { + match persist(&iox_meta, record_batches.to_vec(), &self.object_store).await { + Ok(_) => break, + Err(e) => { + warn!(%e, "persisting to object store failed: retrying."); + tokio::time::sleep(RETRY_TIME).await; + } + } + } + + // Commit the parquet file and tombstones to the catalog. This is pretty ugly because of all + // the failures that might happen where we just want to keep retrying it. + // TODO: clean this up when updating the min_sequence_number is added in. + let parquet_file = iox_meta.to_parquet_file(); + loop { + match self.catalog.start_transaction().await { + Ok(mut txn) => { + match iox_catalog::add_parquet_file_with_tombstones( + &parquet_file, + &persisting_batch.data.deletes, + txn.deref_mut(), + ) + .await + { + Ok(_) => match txn.commit().await { + Ok(_) => break, + Err(e) => { + error!(%e, "error commiting transaction to catalog"); + tokio::time::sleep(RETRY_TIME).await; + } + }, + Err(e) => { + error!(%e, "error from catalog adding parquet file and processed tombstones"); + if let Err(e) = txn.abort().await { + error!(%e, "error aborting failed transaction to add parquet file and tombstones"); + } + tokio::time::sleep(RETRY_TIME).await; + } + } + } + Err(e) => { + error!(%e, "error starting catalog transaction"); + tokio::time::sleep(RETRY_TIME).await; + } + } + } + + // and remove the persisted data from memory + namespace.mark_persisted_and_remove_if_empty( + &partition_info.table_name, + &partition_info.partition.partition_key, + ); } } @@ -141,7 +305,8 @@ impl SequencerData { dml_operation: DmlOperation, sequencer_id: SequencerId, catalog: &dyn Catalog, - ) -> Result<()> { + lifecycle_manager: &LifecycleManager, + ) -> Result { let namespace_data = match self.namespace(dml_operation.namespace()) { Some(d) => d, None => { @@ -151,7 +316,7 @@ impl SequencerData { }; namespace_data - .buffer_operation(dml_operation, sequencer_id, catalog) + .buffer_operation(dml_operation, sequencer_id, catalog, lifecycle_manager) .await } @@ -201,13 +366,15 @@ impl NamespaceData { } } - /// Buffer the operation in the cache, adding any new partitions or delete tombstones to the caatalog + /// Buffer the operation in the cache, adding any new partitions or delete tombstones to the catalog. + /// Returns true if ingest should be paused due to memory limits set in the passed lifecycle manager. 
pub async fn buffer_operation( &self, dml_operation: DmlOperation, sequencer_id: SequencerId, catalog: &dyn Catalog, - ) -> Result<()> { + lifecycle_manager: &LifecycleManager, + ) -> Result { let sequence_number = dml_operation .meta() .sequence() @@ -218,17 +385,27 @@ impl NamespaceData { match dml_operation { DmlOperation::Write(write) => { + let mut pause_writes = false; + for (t, b) in write.into_tables() { let table_data = match self.table_data(&t) { Some(t) => t, None => self.insert_table(&t, catalog).await?, }; - table_data - .buffer_table_write(sequence_number, b, sequencer_id, catalog) + let should_pause = table_data + .buffer_table_write( + sequence_number, + b, + sequencer_id, + catalog, + lifecycle_manager, + ) .await?; + + pause_writes = pause_writes || should_pause; } - Ok(()) + Ok(pause_writes) } DmlOperation::Delete(delete) => { let table_name = delete.table_name().context(TableNotPresentSnafu)?; @@ -239,7 +416,10 @@ impl NamespaceData { table_data .buffer_delete(delete.predicate(), sequencer_id, sequence_number, catalog) - .await + .await?; + + // don't pause writes since deletes don't count towards memory limits + Ok(false) } } } @@ -271,6 +451,31 @@ impl NamespaceData { Ok(data) } + + /// Walks down the table and partition and clears the persisting batch. If there is no + /// data buffered in the partition, it is removed. If there are no other partitions in + /// the table, it is removed. + fn mark_persisted_and_remove_if_empty(&self, table_name: &str, partition_key: &str) { + let mut tables = self.tables.write(); + let table = tables.get(table_name).cloned(); + + if let Some(t) = table { + let mut partitions = t.partition_data.write(); + let partition = partitions.get(partition_key).cloned(); + + if let Some(p) = partition { + let mut data = p.inner.write(); + data.persisting = None; + if data.is_empty() { + partitions.remove(partition_key); + } + } + + if partitions.is_empty() { + tables.remove(table_name); + } + } + } } /// Data of a Table in a given Namesapce that belongs to a given Shard @@ -289,13 +494,16 @@ impl TableData { } } + // buffers the table write and returns true if the lifecycle manager indicates that + // ingest should be paused. async fn buffer_table_write( &self, sequence_number: SequenceNumber, batch: MutableBatch, sequencer_id: SequencerId, catalog: &dyn Catalog, - ) -> Result<()> { + lifecycle_manager: &LifecycleManager, + ) -> Result { let (_, col) = batch .columns() .find(|(name, _)| *name == TIME_COLUMN_NAME) @@ -319,9 +527,10 @@ impl TableData { } }; + let should_pause = lifecycle_manager.log_write(partition_data.id, batch.size()); partition_data.buffer_write(sequence_number, batch); - Ok(()) + Ok(should_pause) } async fn buffer_delete( @@ -397,6 +606,26 @@ impl PartitionData { } } + /// Snapshot anything in the buffer and move all snapshot data into a persisting batch + pub fn snapshot_to_persisting_batch( + &self, + sequencer_id: SequencerId, + table_id: TableId, + partition_id: PartitionId, + table_name: &str, + ) -> Arc { + let mut data = self.inner.write(); + data.snapshot_to_persisting(sequencer_id, table_id, partition_id, table_name) + } + + /// Clears the persisting batch and returns true if there is no other data in the partition. 
+ fn clear_persisting(&self) -> bool { + let mut d = self.inner.write(); + d.persisting = None; + + d.snapshots.is_empty() && d.buffer.is_empty() + } + /// Snapshot whatever is in the buffer and return a new vec of the /// arc cloned snapshots pub fn snapshot(&self) -> Result>> { @@ -511,6 +740,47 @@ impl DataBuffer { Ok(()) } + /// Returns true if there are no batches in the buffer or snapshots or persisting data + fn is_empty(&self) -> bool { + self.snapshots.is_empty() && self.buffer.is_empty() && self.persisting.is_none() + } + + /// Snapshots the buffer and moves snapshots over to the `PersistingBatch`. Returns error + /// if there is already a persisting batch. + pub fn snapshot_to_persisting( + &mut self, + sequencer_id: SequencerId, + table_id: TableId, + partition_id: PartitionId, + table_name: &str, + ) -> Arc { + if self.persisting.is_some() { + panic!("Unable to snapshot while persisting. This is an unexpected state.") + } + + self.snapshot() + .expect("This mutable batch snapshot error should be impossible."); + + let mut data = vec![]; + std::mem::swap(&mut data, &mut self.snapshots); + let mut deletes = vec![]; + std::mem::swap(&mut deletes, &mut self.deletes); + + let queryable_batch = QueryableBatch::new(table_name, data, deletes); + + let persisting_batch = Arc::new(PersistingBatch { + sequencer_id, + table_id, + partition_id, + object_store_id: Uuid::new_v4(), + data: Arc::new(queryable_batch), + }); + + self.persisting = Some(Arc::clone(&persisting_batch)); + + persisting_batch + } + /// Add a persiting batch into the buffer persisting list /// Note: For now, there is at most one persisting batch at a time but /// the plan is to process several of them a time as needed @@ -614,7 +884,7 @@ pub struct PersistingBatch { #[derive(Debug, PartialEq)] pub struct QueryableBatch { /// data - pub data: Vec, + pub data: Vec>, /// Tomstones to be applied on data pub deletes: Vec, @@ -678,9 +948,20 @@ pub struct QueryData {} #[cfg(test)] mod tests { use super::*; + use crate::lifecycle::LifecycleConfig; + use data_types::sequence::Sequence; use datafusion::logical_plan::col; + use dml::{DmlMeta, DmlWrite}; + use futures::TryStreamExt; + use iox_catalog::interface::NamespaceSchema; + use iox_catalog::mem::MemCatalog; + use iox_catalog::validate_or_insert_schema; + use mutable_batch_lp::lines_to_batches; use mutable_batch_lp::test_helpers::lp_to_mutable_batch; + use object_store::ObjectStoreApi; + use std::ops::DerefMut; use test_helpers::assert_error; + use time::Time; #[test] fn query_from_protobuf() { @@ -864,4 +1145,189 @@ mod tests { assert_eq!(data_buffer.buffer.len(), 2); assert!(data_buffer.snapshots.is_empty()); } + + #[tokio::test] + async fn buffer_write_updates_lifecycle_manager_indicates_pause() { + let catalog: Arc = Arc::new(MemCatalog::new()); + let mut repos = catalog.repositories().await; + let kafka_topic = repos.kafka_topics().create_or_get("whatevs").await.unwrap(); + let query_pool = repos.query_pools().create_or_get("whatevs").await.unwrap(); + let kafka_partition = KafkaPartition::new(0); + let namespace = repos + .namespaces() + .create("foo", "inf", kafka_topic.id, query_pool.id) + .await + .unwrap(); + let sequencer1 = repos + .sequencers() + .create_or_get(&kafka_topic, kafka_partition) + .await + .unwrap(); + + let mut sequencers = BTreeMap::new(); + sequencers.insert(sequencer1.id, SequencerData::default()); + + let object_store = Arc::new(ObjectStore::new_in_memory()); + + let data = Arc::new(IngesterData { + object_store: Arc::clone(&object_store), + 
catalog: Arc::clone(&catalog), + sequencers, + exec: Executor::new(1), + }); + + let schema = NamespaceSchema::new(namespace.id, kafka_topic.id, query_pool.id); + + let ignored_ts = Time::from_timestamp_millis(42); + + let w1 = DmlWrite::new( + "foo", + lines_to_batches("mem foo=1 10", 0).unwrap(), + DmlMeta::sequenced(Sequence::new(1, 1), ignored_ts, None, 50), + ); + let _ = validate_or_insert_schema(w1.tables(), &schema, repos.deref_mut()) + .await + .unwrap() + .unwrap(); + + std::mem::drop(repos); + + let pause_size = w1.size() + 1; + let manager = LifecycleManager::new( + LifecycleConfig::new(pause_size, 0, 0, Duration::from_secs(1)), + Arc::new(SystemProvider::new()), + ); + let should_pause = data + .buffer_operation(sequencer1.id, DmlOperation::Write(w1.clone()), &manager) + .await + .unwrap(); + assert!(!should_pause); + let should_pause = data + .buffer_operation(sequencer1.id, DmlOperation::Write(w1), &manager) + .await + .unwrap(); + assert!(should_pause); + } + + #[tokio::test] + async fn persist() { + let catalog: Arc = Arc::new(MemCatalog::new()); + let mut repos = catalog.repositories().await; + let kafka_topic = repos.kafka_topics().create_or_get("whatevs").await.unwrap(); + let query_pool = repos.query_pools().create_or_get("whatevs").await.unwrap(); + let kafka_partition = KafkaPartition::new(0); + let namespace = repos + .namespaces() + .create("foo", "inf", kafka_topic.id, query_pool.id) + .await + .unwrap(); + let sequencer1 = repos + .sequencers() + .create_or_get(&kafka_topic, kafka_partition) + .await + .unwrap(); + let sequencer2 = repos + .sequencers() + .create_or_get(&kafka_topic, kafka_partition) + .await + .unwrap(); + let mut sequencers = BTreeMap::new(); + sequencers.insert(sequencer1.id, SequencerData::default()); + sequencers.insert(sequencer2.id, SequencerData::default()); + + let object_store = Arc::new(ObjectStore::new_in_memory()); + + let data = Arc::new(IngesterData { + object_store: Arc::clone(&object_store), + catalog: Arc::clone(&catalog), + sequencers, + exec: Executor::new(1), + }); + + let schema = NamespaceSchema::new(namespace.id, kafka_topic.id, query_pool.id); + + let ignored_ts = Time::from_timestamp_millis(42); + + let w1 = DmlWrite::new( + "foo", + lines_to_batches("mem foo=1 10", 0).unwrap(), + DmlMeta::sequenced(Sequence::new(1, 1), ignored_ts, None, 50), + ); + let schema = validate_or_insert_schema(w1.tables(), &schema, repos.deref_mut()) + .await + .unwrap() + .unwrap(); + + let w2 = DmlWrite::new( + "foo", + lines_to_batches("cpu foo=1 10", 1).unwrap(), + DmlMeta::sequenced(Sequence::new(2, 1), ignored_ts, None, 50), + ); + let _ = validate_or_insert_schema(w2.tables(), &schema, repos.deref_mut()) + .await + .unwrap() + .unwrap(); + + let w3 = DmlWrite::new( + "foo", + lines_to_batches("mem foo=1 30", 2).unwrap(), + DmlMeta::sequenced(Sequence::new(1, 2), ignored_ts, None, 50), + ); + + // drop repos so the mem catalog won't deadlock. 
+ std::mem::drop(repos); + let manager = LifecycleManager::new( + LifecycleConfig::new(1, 0, 0, Duration::from_secs(1)), + Arc::new(SystemProvider::new()), + ); + + data.buffer_operation(sequencer1.id, DmlOperation::Write(w1), &manager) + .await + .unwrap(); + data.buffer_operation(sequencer2.id, DmlOperation::Write(w2), &manager) + .await + .unwrap(); + data.buffer_operation(sequencer1.id, DmlOperation::Write(w3), &manager) + .await + .unwrap(); + + let sd = data.sequencers.get(&sequencer1.id).unwrap(); + let n = sd.namespace("foo").unwrap(); + let mem_table = n.table_data("mem").unwrap(); + assert!(n.table_data("cpu").is_some()); + + let p = mem_table.partition_data("1970-01-01").unwrap(); + data.persist(p.id).await; + + // verify that a file got put into object store + let file_paths: Vec<_> = object_store + .list(None) + .await + .unwrap() + .try_collect() + .await + .unwrap(); + assert_eq!(file_paths.len(), 1); + + let mut repos = catalog.repositories().await; + // verify it put the record in the catalog + let parquet_files = repos + .parquet_files() + .list_by_sequencer_greater_than(sequencer1.id, SequenceNumber::new(0)) + .await + .unwrap(); + assert_eq!(parquet_files.len(), 1); + let pf = parquet_files.first().unwrap(); + assert_eq!(pf.partition_id, p.id); + assert_eq!(pf.table_id, mem_table.table_id); + assert_eq!(pf.min_time, Timestamp::new(10)); + assert_eq!(pf.max_time, Timestamp::new(30)); + assert_eq!(pf.min_sequence_number, SequenceNumber::new(1)); + assert_eq!(pf.max_sequence_number, SequenceNumber::new(2)); + assert_eq!(pf.sequencer_id, sequencer1.id); + assert!(!pf.to_delete); + + // verify that the partition got removed from the table because it is now empty + assert!(mem_table.partition_data("1970-01-01").is_none()); + } } diff --git a/ingester/src/handler.rs b/ingester/src/handler.rs index 81c112c883..9bb3962fd4 100644 --- a/ingester/src/handler.rs +++ b/ingester/src/handler.rs @@ -8,7 +8,8 @@ use db::write_buffer::metrics::{SequencerMetrics, WriteBufferIngestMetrics}; use futures::StreamExt; use iox_catalog::interface::{Catalog, KafkaPartition, KafkaTopic, Sequencer, SequencerId}; use object_store::ObjectStore; -use observability_deps::tracing::{debug, warn}; +use observability_deps::tracing::{debug, info, warn}; +use query::exec::Executor; use snafu::{ResultExt, Snafu}; use std::collections::BTreeMap; use std::{ @@ -40,6 +41,11 @@ pub enum Error { }, } +/// When the lifecycle manager indicates that ingest should be paused because of +/// memory pressure, the sequencer will loop, sleeping this long before checking +/// with the manager if it can resume ingest. 
+const INGEST_PAUSE_DELAY: Duration = Duration::from_millis(100); + /// A specialized `Error` for Catalog errors pub type Result = std::result::Result; @@ -70,6 +76,7 @@ impl std::fmt::Debug for IngestHandlerImpl { impl IngestHandlerImpl { /// Initialize the Ingester + #[allow(clippy::too_many_arguments)] pub async fn new( lifecycle_config: LifecycleConfig, topic: KafkaTopic, @@ -77,6 +84,7 @@ impl IngestHandlerImpl { catalog: Arc, object_store: Arc, write_buffer: Arc, + exec: Executor, registry: &metric::Registry, ) -> Result { // build the initial ingester data state @@ -88,34 +96,13 @@ impl IngestHandlerImpl { object_store, catalog, sequencers, + exec, }); let ingester_data = Arc::clone(&data); let kafka_topic_name = topic.name.clone(); let ingest_metrics = WriteBufferIngestMetrics::new(registry, &topic.name); - let mut join_handles = Vec::with_capacity(sequencer_states.len()); - for (kafka_partition, sequencer) in sequencer_states { - let metrics = ingest_metrics.new_sequencer_metrics(kafka_partition.get() as u32); - let ingester_data = Arc::clone(&ingester_data); - let kafka_topic_name = kafka_topic_name.clone(); - - let stream_handler = write_buffer - .stream_handler(kafka_partition.get() as u32) - .await - .context(WriteBufferSnafu)?; - - join_handles.push(tokio::task::spawn(stream_in_sequenced_entries( - ingester_data, - sequencer.id, - kafka_topic_name, - kafka_partition, - Arc::clone(&write_buffer), - stream_handler, - metrics, - ))); - } - // start the lifecycle manager let persister = Arc::clone(&data); let lifecycle_manager = Arc::new(LifecycleManager::new( @@ -126,8 +113,36 @@ impl IngestHandlerImpl { let handle = tokio::task::spawn(async move { run_lifecycle_manager(manager, persister).await; }); + info!( + "ingester handler and lifecycle started with config {:?}", + lifecycle_config + ); + + let mut join_handles = Vec::with_capacity(sequencer_states.len()); join_handles.push(handle); + for (kafka_partition, sequencer) in sequencer_states { + let metrics = ingest_metrics.new_sequencer_metrics(kafka_partition.get() as u32); + let ingester_data = Arc::clone(&ingester_data); + let kafka_topic_name = kafka_topic_name.clone(); + + let stream_handler = write_buffer + .stream_handler(kafka_partition.get() as u32) + .await + .context(WriteBufferSnafu)?; + + join_handles.push(tokio::task::spawn(stream_in_sequenced_entries( + Arc::clone(&lifecycle_manager), + ingester_data, + sequencer.id, + kafka_topic_name, + kafka_partition, + Arc::clone(&write_buffer), + stream_handler, + metrics, + ))); + } + Ok(Self { data, kafka_topic: topic, @@ -156,7 +171,9 @@ impl Drop for IngestHandlerImpl { /// /// Note all errors reading / parsing / writing entries from the write /// buffer are ignored. 
+#[allow(clippy::too_many_arguments)] async fn stream_in_sequenced_entries( + lifecycle_manager: Arc, ingester_data: Arc, sequencer_id: SequencerId, kafka_topic: String, @@ -226,13 +243,21 @@ async fn stream_in_sequenced_entries( ); let result = ingester_data - .buffer_operation(sequencer_id, dml_operation.clone()) + .buffer_operation(sequencer_id, dml_operation.clone(), &lifecycle_manager) .await; match result { - Ok(_) => { + Ok(should_pause) => { ingest_recorder.success(); span_recorder.ok("stored write"); + + if should_pause { + warn!(%sequencer_id, "pausing ingest until persistence has run"); + while !lifecycle_manager.can_resume_ingest() { + tokio::time::sleep(INGEST_PAUSE_DELAY).await; + } + warn!(%sequencer_id, "resuming ingest"); + } } Err(e) => { // skip over invalid data in the write buffer so recovery can succeed @@ -323,6 +348,7 @@ mod tests { Arc::new(catalog), object_store, reading, + Executor::new(1), &metrics, ) .await diff --git a/ingester/src/lifecycle.rs b/ingester/src/lifecycle.rs index 87d388aab2..a97aaea8e2 100644 --- a/ingester/src/lifecycle.rs +++ b/ingester/src/lifecycle.rs @@ -16,7 +16,7 @@ use time::{Time, TimeProvider}; /// The lifecycle manager keeps track of the size and age of partitions across all sequencers. /// It triggers persistence based on keeping total memory usage around a set amount while /// ensuring that partitions don't get too old or large before being persisted. -pub(crate) struct LifecycleManager { +pub struct LifecycleManager { config: LifecycleConfig, time_provider: Arc, state: Mutex, diff --git a/ingester/src/query.rs b/ingester/src/query.rs index c5a18b0636..c4f714a85e 100644 --- a/ingester/src/query.rs +++ b/ingester/src/query.rs @@ -66,7 +66,7 @@ pub type Result = std::result::Result; impl QueryableBatch { /// Initilaize a QueryableBatch - pub fn new(table_name: &str, data: Vec, deletes: Vec) -> Self { + pub fn new(table_name: &str, data: Vec>, deletes: Vec) -> Self { let mut delete_predicates = vec![]; for delete in &deletes { let delete_predicate = Arc::new( diff --git a/ingester/src/test_util.rs b/ingester/src/test_util.rs index fe489b08d1..863a4bd22f 100644 --- a/ingester/src/test_util.rs +++ b/ingester/src/test_util.rs @@ -183,7 +183,7 @@ pub fn make_queryable_batch_with_deletes( let mut seq_num = seq_num_start; for batch in batches { let seq = SequenceNumber::new(seq_num); - snapshots.push(make_snapshot_batch(batch, seq, seq)); + snapshots.push(Arc::new(make_snapshot_batch(batch, seq, seq))); seq_num += 1; }
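
---

Note for readers following the control flow this patch introduces: two small loops do most of the resilience work — the retry-forever loops in `Persister::persist` that wrap catalog and object store calls, and the pause-until-resume loop in `stream_in_sequenced_entries` that backs off when `buffer_operation` reports memory pressure from the lifecycle manager. The sketch below distills those two patterns into a standalone, runnable Rust example. It is illustrative only, not code from this patch: the helper names (`retry_forever`, `wait_until`) are hypothetical, and it assumes a `tokio` dependency with the `rt-multi-thread`, `macros`, and `time` features.

```rust
// Illustrative sketch of the two control loops added in this patch (not patch code).
// Assumes tokio with the `rt-multi-thread`, `macros`, and `time` features enabled.
use std::future::Future;
use std::time::Duration;

/// Delays mirroring RETRY_TIME and INGEST_PAUSE_DELAY in the patch.
const RETRY_TIME: Duration = Duration::from_secs(1);
const INGEST_PAUSE_DELAY: Duration = Duration::from_millis(100);

/// Run `op` until it succeeds, sleeping between attempts — the shape of the loops
/// in `Persister::persist` around catalog and object store calls.
async fn retry_forever<T, E, F, Fut>(what: &str, mut op: F) -> T
where
    E: std::fmt::Display,
    F: FnMut() -> Fut,
    Fut: Future<Output = Result<T, E>>,
{
    loop {
        match op().await {
            Ok(v) => return v,
            Err(e) => {
                eprintln!("{what} failed: {e}; retrying");
                tokio::time::sleep(RETRY_TIME).await;
            }
        }
    }
}

/// Block until `can_resume` returns true — the shape of the pause loop in
/// `stream_in_sequenced_entries` after `buffer_operation` returns `true`.
async fn wait_until(mut can_resume: impl FnMut() -> bool) {
    while !can_resume() {
        tokio::time::sleep(INGEST_PAUSE_DELAY).await;
    }
}

#[tokio::main]
async fn main() {
    // Demo: an operation that fails twice before succeeding.
    let mut attempts = 0;
    let value = retry_forever("demo operation", || {
        attempts += 1;
        let attempt = attempts;
        async move {
            if attempt < 3 {
                Err("transient error")
            } else {
                Ok::<_, &str>(attempt)
            }
        }
    })
    .await;
    println!("succeeded after {value} attempts");

    // Demo: "resume ingest" once the condition flips.
    let mut checks = 0;
    wait_until(|| {
        checks += 1;
        checks > 2
    })
    .await;
    println!("ingest resumed after {checks} checks");
}
```

The design choice both loops share is that the ingester never gives up on persistence: transient catalog or object store failures are retried indefinitely, while unexpected in-memory state (missing partition, namespace, or table) panics so the problem is surfaced rather than silently skipped.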