feat: wire up persistence in ingester (#3685)

This adds persistence to the ingester, wired up through the lifecycle manager. The persist operation must still be updated to keep track of the min_unpersisted_sequence_number for each sequencer.
pull/24376/head
Paul Dix 2022-02-15 19:13:40 -05:00 committed by GitHub
parent 5be546173a
commit f542045485
6 changed files with 557 additions and 55 deletions
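The heart of the change is a pause/resume contract between the buffering path and the lifecycle manager: every buffered write is logged with the manager, and once the configured memory budget is exceeded the sequencer stream stops pulling new operations until persistence has freed memory. Below is a minimal, self-contained sketch of that contract; the types and method names are simplified stand-ins, not the real ingester API.

use std::sync::Mutex;
use std::time::Duration;

/// Stand-in for the real LifecycleManager: tracks bytes buffered across all partitions.
struct ToyLifecycleManager {
    pause_size: usize,            // pause ingest once this many bytes are buffered
    bytes_buffered: Mutex<usize>,
}

impl ToyLifecycleManager {
    /// Record a write; returns true if ingest should pause (memory budget exceeded).
    fn log_write(&self, size: usize) -> bool {
        let mut bytes = self.bytes_buffered.lock().unwrap();
        *bytes += size;
        *bytes > self.pause_size
    }

    /// Called once a partition has been persisted and dropped from memory.
    fn log_persisted(&self, size: usize) {
        let mut bytes = self.bytes_buffered.lock().unwrap();
        *bytes = bytes.saturating_sub(size);
    }

    /// Polled by a paused sequencer loop before it pulls the next operation.
    fn can_resume_ingest(&self) -> bool {
        *self.bytes_buffered.lock().unwrap() <= self.pause_size
    }
}

/// Rough shape of the per-sequencer ingest loop after this change.
async fn ingest_loop(manager: &ToyLifecycleManager, write_sizes: Vec<usize>) {
    for size in write_sizes {
        // in the real code this is the bool returned by buffer_operation()
        let should_pause = manager.log_write(size);
        if should_pause {
            while !manager.can_resume_ingest() {
                tokio::time::sleep(Duration::from_millis(100)).await;
            }
        }
    }
}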


@ -20,6 +20,7 @@ use ingester::{
use iox_catalog::interface::KafkaPartition;
use object_store::ObjectStore;
use observability_deps::tracing::*;
use query::exec::Executor;
use std::{collections::BTreeMap, convert::TryFrom, sync::Arc, time::Duration};
use thiserror::Error;
@ -134,6 +135,14 @@ pub struct Config {
default_value = "1800"
)]
pub persist_partition_age_threshold_seconds: u64,
/// Number of threads to use for the ingester query execution, compaction and persistence.
#[clap(
long = "--query-exec-thread-count",
env = "INFLUXDB_IOX_QUERY_EXEC_THREAD_COUNT",
default_value = "4"
)]
pub query_exect_thread_count: usize,
}
pub async fn command(config: Config) -> Result<()> {
@ -191,6 +200,7 @@ pub async fn command(config: Config) -> Result<()> {
catalog,
object_store,
write_buffer,
Executor::new(config.query_exect_thread_count),
&metric_registry,
)
.await?,


@ -1,5 +1,8 @@
//! Data for the lifecycle of the Ingester
use crate::compact::compact_persisting_batch;
use crate::lifecycle::LifecycleManager;
use crate::persist::persist;
use arrow::record_batch::RecordBatch;
use async_trait::async_trait;
use chrono::{format::StrftimeItems, TimeZone, Utc};
@ -7,19 +10,24 @@ use data_types::delete_predicate::DeletePredicate;
use dml::DmlOperation;
use generated_types::{google::FieldViolation, influxdata::iox::ingester::v1 as proto};
use iox_catalog::interface::{
Catalog, KafkaPartition, NamespaceId, PartitionId, SequenceNumber, SequencerId, TableId,
Timestamp, Tombstone,
Catalog, KafkaPartition, NamespaceId, PartitionId, PartitionInfo, SequenceNumber, SequencerId,
TableId, Timestamp, Tombstone,
};
use mutable_batch::column::ColumnData;
use mutable_batch::MutableBatch;
use object_store::ObjectStore;
use observability_deps::tracing::{error, warn};
use parking_lot::RwLock;
use predicate::Predicate;
use query::exec::Executor;
use schema::selection::Selection;
use schema::TIME_COLUMN_NAME;
use snafu::{OptionExt, ResultExt, Snafu};
use std::convert::TryFrom;
use std::ops::DerefMut;
use std::time::Duration;
use std::{collections::BTreeMap, sync::Arc};
use time::SystemProvider;
use uuid::Uuid;
#[derive(Debug, Snafu)]
@ -43,6 +51,9 @@ pub enum Error {
#[snafu(display("Namespace {} not found in catalog", namespace))]
NamespaceNotFound { namespace: String },
#[snafu(display("Table {} not found in buffer", table_name))]
TableNotFound { table_name: String },
#[snafu(display("Table must be specified in delete"))]
TableNotPresent,
@ -68,8 +79,14 @@ pub enum Error {
#[snafu(display("Error while filter columns from snapshot: {}", source))]
FilterColumn { source: arrow::error::ArrowError },
#[snafu(display("Partition not found: {}", partition_id))]
PartitionNotFound { partition_id: PartitionId },
}
/// Time to wait before retrying when there is a network error with the catalog or object storage.
const RETRY_TIME: Duration = Duration::from_secs(1);
/// A specialized `Error` for Ingester Data errors
pub type Result<T, E = Error> = std::result::Result<T, E>;
@ -79,28 +96,38 @@ pub struct IngesterData {
pub(crate) object_store: Arc<ObjectStore>,
/// The global catalog for schema, parquet files and tombstones
pub(crate) catalog: Arc<dyn Catalog>,
// This map gets set up on initialization of the ingester so it won't ever be modified.
// The content of each SequenceData will get changed when more namespaces and tables
// get ingested.
/// This map gets set up on initialization of the ingester so it won't ever be modified.
/// The content of each SequenceData will get changed when more namespaces and tables
/// get ingested.
pub(crate) sequencers: BTreeMap<SequencerId, SequencerData>,
/// Executor for running queries and compacting and persisting
pub(crate) exec: Executor,
}
impl IngesterData {
/// Store the write or delete in the in memory buffer. Deletes will
/// be written into the catalog before getting stored in the buffer.
/// Any writes that create new IOx partitions will have those records
/// created in the catalog before putting into the buffer.
/// created in the catalog before putting into the buffer. Writes will
/// get logged in the lifecycle manager. If it indicates ingest should
/// be paused, this function will return true.
pub async fn buffer_operation(
&self,
sequencer_id: SequencerId,
dml_operation: DmlOperation,
) -> Result<()> {
lifecycle_manager: &LifecycleManager,
) -> Result<bool> {
let sequencer_data = self
.sequencers
.get(&sequencer_id)
.context(SequencerNotFoundSnafu { sequencer_id })?;
sequencer_data
.buffer_operation(dml_operation, sequencer_id, self.catalog.as_ref())
.buffer_operation(
dml_operation,
sequencer_id,
self.catalog.as_ref(),
lifecycle_manager,
)
.await
}
}
@ -108,19 +135,156 @@ impl IngesterData {
/// The Persister has a single function that will persist a given partition Id. It is expected
/// that the persist function will retry forever until it succeeds.
#[async_trait]
pub(crate) trait Persister: Send + Sync + 'static {
pub trait Persister: Send + Sync + 'static {
/// Persists the partition ID. Will retry forever until it succeeds.
async fn persist(&self, partition_id: PartitionId);
}
#[async_trait]
impl Persister for IngesterData {
async fn persist(&self, _partition_id: PartitionId) {
// lookup the TableData
// let persisting_batch = table_data.create_persisting_batch(partition.partition_key);
// do the persist with this persisting batch
// update the catalog
// table_data.clear_persisting_batch() (behind the scenes this will remove the persisting batch
// and if the partition is empty, remove it from the map in table_data)
async fn persist(&self, partition_id: PartitionId) {
let mut repos = self.catalog.repositories().await;
// lookup the partition_info from the catalog
let partition_info: Option<PartitionInfo> = loop {
match repos.partitions().partition_info_by_id(partition_id).await {
Ok(p) => break p,
Err(e) => {
warn!(%e, ?partition_id, "getting partition_info_by_id failed: retrying.");
tokio::time::sleep(RETRY_TIME).await;
}
}
};
std::mem::drop(repos);
// lookup the state from the ingester data. If something isn't found, it's unexpected. Crash
// so someone can take a look.
let partition_info = partition_info
.unwrap_or_else(|| panic!("partition {} not found in catalog", partition_id));
let sequencer_data = self
.sequencers
.get(&partition_info.partition.sequencer_id)
.unwrap_or_else(|| {
panic!(
"sequencer state for {} not in ingester data",
partition_info.partition.sequencer_id
)
});
let namespace = sequencer_data
.namespace(&partition_info.namespace_name)
.unwrap_or_else(|| {
panic!(
"namespace {} not in sequencer {} state",
partition_info.namespace_name, partition_info.partition.sequencer_id
)
});
let table_data = namespace
.table_data(&partition_info.table_name)
.unwrap_or_else(|| {
panic!(
"table {} for namespace {} not in sequencer {} state",
partition_info.table_name,
partition_info.namespace_name,
partition_info.partition.sequencer_id
)
});
let partition_data = table_data
.partition_data(&partition_info.partition.partition_key)
.unwrap_or_else(|| {
panic!(
"partition {} not in table {} for namespace {} in sequencer {} state",
partition_info.partition.partition_key,
partition_info.table_name,
partition_info.namespace_name,
partition_info.partition.sequencer_id
)
});
// snapshot and make arc clones of the data.
let persisting_batch = partition_data.snapshot_to_persisting_batch(
partition_info.partition.sequencer_id,
partition_info.partition.table_id,
partition_info.partition.id,
&partition_info.table_name,
);
// do the CPU intensive work of compaction, de-duplication and sorting
let (record_batches, iox_meta) = match compact_persisting_batch(
Arc::new(SystemProvider::new()),
&self.exec,
namespace.namespace_id.get(),
&partition_info.namespace_name,
&partition_info.table_name,
&partition_info.partition.partition_key,
Arc::clone(&persisting_batch),
)
.await
{
Err(e) => {
// this should never error out. if it does, we need to crash hard so
// someone can take a look.
panic!("unable to compact persisting batch with error: {:?}", e);
}
Ok(Some(r)) => r,
Ok(None) => {
warn!("persist called with no data");
return;
}
};
// save the compacted data to a parquet file in object storage
loop {
match persist(&iox_meta, record_batches.to_vec(), &self.object_store).await {
Ok(_) => break,
Err(e) => {
warn!(%e, "persisting to object store failed: retrying.");
tokio::time::sleep(RETRY_TIME).await;
}
}
}
// Commit the parquet file and tombstones to the catalog. This is pretty ugly because of all
// the failures that might happen where we just want to keep retrying it.
// TODO: clean this up when updating the min_sequence_number is added in.
let parquet_file = iox_meta.to_parquet_file();
loop {
match self.catalog.start_transaction().await {
Ok(mut txn) => {
match iox_catalog::add_parquet_file_with_tombstones(
&parquet_file,
&persisting_batch.data.deletes,
txn.deref_mut(),
)
.await
{
Ok(_) => match txn.commit().await {
Ok(_) => break,
Err(e) => {
error!(%e, "error commiting transaction to catalog");
tokio::time::sleep(RETRY_TIME).await;
}
},
Err(e) => {
error!(%e, "error from catalog adding parquet file and processed tombstones");
if let Err(e) = txn.abort().await {
error!(%e, "error aborting failed transaction to add parquet file and tombstones");
}
tokio::time::sleep(RETRY_TIME).await;
}
}
}
Err(e) => {
error!(%e, "error starting catalog transaction");
tokio::time::sleep(RETRY_TIME).await;
}
}
}
// and remove the persisted data from memory
namespace.mark_persisted_and_remove_if_empty(
&partition_info.table_name,
&partition_info.partition.partition_key,
);
}
}
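Every fallible step in persist() above (the catalog lookup, the object store write, the catalog commit) uses the same retry-forever shape around RETRY_TIME. A hypothetical helper that captures that shape is sketched below; it is not part of this commit, just an illustration of the pattern.

use std::future::Future;
use std::time::Duration;

/// Hypothetical helper (not in this commit): run an async operation until it
/// succeeds, logging each failure and sleeping `delay` between attempts.
async fn retry_forever<T, E, F, Fut>(mut op: F, delay: Duration, what: &str) -> T
where
    F: FnMut() -> Fut,
    Fut: Future<Output = Result<T, E>>,
    E: std::fmt::Display,
{
    loop {
        match op().await {
            Ok(v) => return v,
            Err(e) => {
                eprintln!("{what} failed: {e}; retrying");
                tokio::time::sleep(delay).await;
            }
        }
    }
}

The catalog-transaction step would still need its own branch to abort the transaction on failure, which is why the commit loop above stays explicit.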
@ -141,7 +305,8 @@ impl SequencerData {
dml_operation: DmlOperation,
sequencer_id: SequencerId,
catalog: &dyn Catalog,
) -> Result<()> {
lifecycle_manager: &LifecycleManager,
) -> Result<bool> {
let namespace_data = match self.namespace(dml_operation.namespace()) {
Some(d) => d,
None => {
@ -151,7 +316,7 @@ impl SequencerData {
};
namespace_data
.buffer_operation(dml_operation, sequencer_id, catalog)
.buffer_operation(dml_operation, sequencer_id, catalog, lifecycle_manager)
.await
}
@ -201,13 +366,15 @@ impl NamespaceData {
}
}
/// Buffer the operation in the cache, adding any new partitions or delete tombstones to the caatalog
/// Buffer the operation in the cache, adding any new partitions or delete tombstones to the catalog.
/// Returns true if ingest should be paused due to memory limits set in the passed lifecycle manager.
pub async fn buffer_operation(
&self,
dml_operation: DmlOperation,
sequencer_id: SequencerId,
catalog: &dyn Catalog,
) -> Result<()> {
lifecycle_manager: &LifecycleManager,
) -> Result<bool> {
let sequence_number = dml_operation
.meta()
.sequence()
@ -218,17 +385,27 @@ impl NamespaceData {
match dml_operation {
DmlOperation::Write(write) => {
let mut pause_writes = false;
for (t, b) in write.into_tables() {
let table_data = match self.table_data(&t) {
Some(t) => t,
None => self.insert_table(&t, catalog).await?,
};
table_data
.buffer_table_write(sequence_number, b, sequencer_id, catalog)
let should_pause = table_data
.buffer_table_write(
sequence_number,
b,
sequencer_id,
catalog,
lifecycle_manager,
)
.await?;
pause_writes = pause_writes || should_pause;
}
Ok(())
Ok(pause_writes)
}
DmlOperation::Delete(delete) => {
let table_name = delete.table_name().context(TableNotPresentSnafu)?;
@ -239,7 +416,10 @@ impl NamespaceData {
table_data
.buffer_delete(delete.predicate(), sequencer_id, sequence_number, catalog)
.await
.await?;
// don't pause writes since deletes don't count towards memory limits
Ok(false)
}
}
}
@ -271,6 +451,31 @@ impl NamespaceData {
Ok(data)
}
/// Walks down the table and partition and clears the persisting batch. If there is no
/// data buffered in the partition, it is removed. If there are no other partitions in
/// the table, it is removed.
fn mark_persisted_and_remove_if_empty(&self, table_name: &str, partition_key: &str) {
let mut tables = self.tables.write();
let table = tables.get(table_name).cloned();
if let Some(t) = table {
let mut partitions = t.partition_data.write();
let partition = partitions.get(partition_key).cloned();
if let Some(p) = partition {
let mut data = p.inner.write();
data.persisting = None;
if data.is_empty() {
partitions.remove(partition_key);
}
}
if partitions.is_empty() {
tables.remove(table_name);
}
}
}
}
/// Data of a Table in a given Namespace that belongs to a given Shard
@ -289,13 +494,16 @@ impl TableData {
}
}
// buffers the table write and returns true if the lifecycle manager indicates that
// ingest should be paused.
async fn buffer_table_write(
&self,
sequence_number: SequenceNumber,
batch: MutableBatch,
sequencer_id: SequencerId,
catalog: &dyn Catalog,
) -> Result<()> {
lifecycle_manager: &LifecycleManager,
) -> Result<bool> {
let (_, col) = batch
.columns()
.find(|(name, _)| *name == TIME_COLUMN_NAME)
@ -319,9 +527,10 @@ impl TableData {
}
};
let should_pause = lifecycle_manager.log_write(partition_data.id, batch.size());
partition_data.buffer_write(sequence_number, batch);
Ok(())
Ok(should_pause)
}
async fn buffer_delete(
@ -397,6 +606,26 @@ impl PartitionData {
}
}
/// Snapshot anything in the buffer and move all snapshot data into a persisting batch
pub fn snapshot_to_persisting_batch(
&self,
sequencer_id: SequencerId,
table_id: TableId,
partition_id: PartitionId,
table_name: &str,
) -> Arc<PersistingBatch> {
let mut data = self.inner.write();
data.snapshot_to_persisting(sequencer_id, table_id, partition_id, table_name)
}
/// Clears the persisting batch and returns true if there is no other data in the partition.
fn clear_persisting(&self) -> bool {
let mut d = self.inner.write();
d.persisting = None;
d.snapshots.is_empty() && d.buffer.is_empty()
}
/// Snapshot whatever is in the buffer and return a new vec of the
/// arc cloned snapshots
pub fn snapshot(&self) -> Result<Vec<Arc<SnapshotBatch>>> {
@ -511,6 +740,47 @@ impl DataBuffer {
Ok(())
}
/// Returns true if there are no batches in the buffer or snapshots or persisting data
fn is_empty(&self) -> bool {
self.snapshots.is_empty() && self.buffer.is_empty() && self.persisting.is_none()
}
/// Snapshots the buffer and moves the snapshots over to the `PersistingBatch`. Panics if
/// there is already a persisting batch.
pub fn snapshot_to_persisting(
&mut self,
sequencer_id: SequencerId,
table_id: TableId,
partition_id: PartitionId,
table_name: &str,
) -> Arc<PersistingBatch> {
if self.persisting.is_some() {
panic!("Unable to snapshot while persisting. This is an unexpected state.")
}
self.snapshot()
.expect("This mutable batch snapshot error should be impossible.");
let mut data = vec![];
std::mem::swap(&mut data, &mut self.snapshots);
let mut deletes = vec![];
std::mem::swap(&mut deletes, &mut self.deletes);
let queryable_batch = QueryableBatch::new(table_name, data, deletes);
let persisting_batch = Arc::new(PersistingBatch {
sequencer_id,
table_id,
partition_id,
object_store_id: Uuid::new_v4(),
data: Arc::new(queryable_batch),
});
self.persisting = Some(Arc::clone(&persisting_batch));
persisting_batch
}
/// Add a persisting batch into the buffer's persisting list.
/// Note: For now, there is at most one persisting batch at a time, but
/// the plan is to process several of them at a time as needed.
@ -614,7 +884,7 @@ pub struct PersistingBatch {
#[derive(Debug, PartialEq)]
pub struct QueryableBatch {
/// data
pub data: Vec<SnapshotBatch>,
pub data: Vec<Arc<SnapshotBatch>>,
/// Tombstones to be applied on data
pub deletes: Vec<Tombstone>,
@ -678,9 +948,20 @@ pub struct QueryData {}
#[cfg(test)]
mod tests {
use super::*;
use crate::lifecycle::LifecycleConfig;
use data_types::sequence::Sequence;
use datafusion::logical_plan::col;
use dml::{DmlMeta, DmlWrite};
use futures::TryStreamExt;
use iox_catalog::interface::NamespaceSchema;
use iox_catalog::mem::MemCatalog;
use iox_catalog::validate_or_insert_schema;
use mutable_batch_lp::lines_to_batches;
use mutable_batch_lp::test_helpers::lp_to_mutable_batch;
use object_store::ObjectStoreApi;
use std::ops::DerefMut;
use test_helpers::assert_error;
use time::Time;
#[test]
fn query_from_protobuf() {
@ -864,4 +1145,189 @@ mod tests {
assert_eq!(data_buffer.buffer.len(), 2);
assert!(data_buffer.snapshots.is_empty());
}
#[tokio::test]
async fn buffer_write_updates_lifecycle_manager_indicates_pause() {
let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new());
let mut repos = catalog.repositories().await;
let kafka_topic = repos.kafka_topics().create_or_get("whatevs").await.unwrap();
let query_pool = repos.query_pools().create_or_get("whatevs").await.unwrap();
let kafka_partition = KafkaPartition::new(0);
let namespace = repos
.namespaces()
.create("foo", "inf", kafka_topic.id, query_pool.id)
.await
.unwrap();
let sequencer1 = repos
.sequencers()
.create_or_get(&kafka_topic, kafka_partition)
.await
.unwrap();
let mut sequencers = BTreeMap::new();
sequencers.insert(sequencer1.id, SequencerData::default());
let object_store = Arc::new(ObjectStore::new_in_memory());
let data = Arc::new(IngesterData {
object_store: Arc::clone(&object_store),
catalog: Arc::clone(&catalog),
sequencers,
exec: Executor::new(1),
});
let schema = NamespaceSchema::new(namespace.id, kafka_topic.id, query_pool.id);
let ignored_ts = Time::from_timestamp_millis(42);
let w1 = DmlWrite::new(
"foo",
lines_to_batches("mem foo=1 10", 0).unwrap(),
DmlMeta::sequenced(Sequence::new(1, 1), ignored_ts, None, 50),
);
let _ = validate_or_insert_schema(w1.tables(), &schema, repos.deref_mut())
.await
.unwrap()
.unwrap();
std::mem::drop(repos);
let pause_size = w1.size() + 1;
let manager = LifecycleManager::new(
LifecycleConfig::new(pause_size, 0, 0, Duration::from_secs(1)),
Arc::new(SystemProvider::new()),
);
let should_pause = data
.buffer_operation(sequencer1.id, DmlOperation::Write(w1.clone()), &manager)
.await
.unwrap();
assert!(!should_pause);
let should_pause = data
.buffer_operation(sequencer1.id, DmlOperation::Write(w1), &manager)
.await
.unwrap();
assert!(should_pause);
}
#[tokio::test]
async fn persist() {
let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new());
let mut repos = catalog.repositories().await;
let kafka_topic = repos.kafka_topics().create_or_get("whatevs").await.unwrap();
let query_pool = repos.query_pools().create_or_get("whatevs").await.unwrap();
let kafka_partition = KafkaPartition::new(0);
let namespace = repos
.namespaces()
.create("foo", "inf", kafka_topic.id, query_pool.id)
.await
.unwrap();
let sequencer1 = repos
.sequencers()
.create_or_get(&kafka_topic, kafka_partition)
.await
.unwrap();
let sequencer2 = repos
.sequencers()
.create_or_get(&kafka_topic, kafka_partition)
.await
.unwrap();
let mut sequencers = BTreeMap::new();
sequencers.insert(sequencer1.id, SequencerData::default());
sequencers.insert(sequencer2.id, SequencerData::default());
let object_store = Arc::new(ObjectStore::new_in_memory());
let data = Arc::new(IngesterData {
object_store: Arc::clone(&object_store),
catalog: Arc::clone(&catalog),
sequencers,
exec: Executor::new(1),
});
let schema = NamespaceSchema::new(namespace.id, kafka_topic.id, query_pool.id);
let ignored_ts = Time::from_timestamp_millis(42);
let w1 = DmlWrite::new(
"foo",
lines_to_batches("mem foo=1 10", 0).unwrap(),
DmlMeta::sequenced(Sequence::new(1, 1), ignored_ts, None, 50),
);
let schema = validate_or_insert_schema(w1.tables(), &schema, repos.deref_mut())
.await
.unwrap()
.unwrap();
let w2 = DmlWrite::new(
"foo",
lines_to_batches("cpu foo=1 10", 1).unwrap(),
DmlMeta::sequenced(Sequence::new(2, 1), ignored_ts, None, 50),
);
let _ = validate_or_insert_schema(w2.tables(), &schema, repos.deref_mut())
.await
.unwrap()
.unwrap();
let w3 = DmlWrite::new(
"foo",
lines_to_batches("mem foo=1 30", 2).unwrap(),
DmlMeta::sequenced(Sequence::new(1, 2), ignored_ts, None, 50),
);
// drop repos so the mem catalog won't deadlock.
std::mem::drop(repos);
let manager = LifecycleManager::new(
LifecycleConfig::new(1, 0, 0, Duration::from_secs(1)),
Arc::new(SystemProvider::new()),
);
data.buffer_operation(sequencer1.id, DmlOperation::Write(w1), &manager)
.await
.unwrap();
data.buffer_operation(sequencer2.id, DmlOperation::Write(w2), &manager)
.await
.unwrap();
data.buffer_operation(sequencer1.id, DmlOperation::Write(w3), &manager)
.await
.unwrap();
let sd = data.sequencers.get(&sequencer1.id).unwrap();
let n = sd.namespace("foo").unwrap();
let mem_table = n.table_data("mem").unwrap();
assert!(n.table_data("cpu").is_some());
let p = mem_table.partition_data("1970-01-01").unwrap();
data.persist(p.id).await;
// verify that a file got put into object store
let file_paths: Vec<_> = object_store
.list(None)
.await
.unwrap()
.try_collect()
.await
.unwrap();
assert_eq!(file_paths.len(), 1);
let mut repos = catalog.repositories().await;
// verify it put the record in the catalog
let parquet_files = repos
.parquet_files()
.list_by_sequencer_greater_than(sequencer1.id, SequenceNumber::new(0))
.await
.unwrap();
assert_eq!(parquet_files.len(), 1);
let pf = parquet_files.first().unwrap();
assert_eq!(pf.partition_id, p.id);
assert_eq!(pf.table_id, mem_table.table_id);
assert_eq!(pf.min_time, Timestamp::new(10));
assert_eq!(pf.max_time, Timestamp::new(30));
assert_eq!(pf.min_sequence_number, SequenceNumber::new(1));
assert_eq!(pf.max_sequence_number, SequenceNumber::new(2));
assert_eq!(pf.sequencer_id, sequencer1.id);
assert!(!pf.to_delete);
// verify that the partition got removed from the table because it is now empty
assert!(mem_table.partition_data("1970-01-01").is_none());
}
}
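Putting the data.rs pieces together: writes land in a partition's DataBuffer, snapshot() freezes the buffer for queries, snapshot_to_persisting() moves all snapshots into a single persisting batch, and mark_persisted_and_remove_if_empty() clears that batch and drops empty partitions and tables. The following is a self-contained toy model of those state transitions, using stand-in types rather than the real structs.

/// Toy stand-in for DataBuffer: strings instead of record batches.
#[derive(Default)]
struct BufferModel {
    buffer: Vec<String>,             // writes not yet snapshotted
    snapshots: Vec<String>,          // frozen, queryable snapshots
    persisting: Option<Vec<String>>, // batch currently being persisted
}

impl BufferModel {
    /// Freeze whatever is buffered into a snapshot.
    fn snapshot(&mut self) {
        if !self.buffer.is_empty() {
            let frozen = std::mem::take(&mut self.buffer);
            self.snapshots.push(frozen.join(","));
        }
    }

    /// Snapshot the buffer and move all snapshots into the persisting slot.
    fn snapshot_to_persisting(&mut self) -> Vec<String> {
        assert!(self.persisting.is_none(), "already persisting");
        self.snapshot();
        let batch = std::mem::take(&mut self.snapshots);
        self.persisting = Some(batch.clone());
        batch
    }

    /// Clear the persisting batch; returns true if the partition is now empty
    /// and can be removed from its table (and the table from the namespace
    /// once its last partition goes).
    fn mark_persisted(&mut self) -> bool {
        self.persisting = None;
        self.buffer.is_empty() && self.snapshots.is_empty()
    }
}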


@ -8,7 +8,8 @@ use db::write_buffer::metrics::{SequencerMetrics, WriteBufferIngestMetrics};
use futures::StreamExt;
use iox_catalog::interface::{Catalog, KafkaPartition, KafkaTopic, Sequencer, SequencerId};
use object_store::ObjectStore;
use observability_deps::tracing::{debug, warn};
use observability_deps::tracing::{debug, info, warn};
use query::exec::Executor;
use snafu::{ResultExt, Snafu};
use std::collections::BTreeMap;
use std::{
@ -40,6 +41,11 @@ pub enum Error {
},
}
/// When the lifecycle manager indicates that ingest should be paused because of
/// memory pressure, the sequencer will loop, sleeping this long before checking
/// with the manager if it can resume ingest.
const INGEST_PAUSE_DELAY: Duration = Duration::from_millis(100);
/// A specialized `Error` for Catalog errors
pub type Result<T, E = Error> = std::result::Result<T, E>;
@ -70,6 +76,7 @@ impl std::fmt::Debug for IngestHandlerImpl {
impl IngestHandlerImpl {
/// Initialize the Ingester
#[allow(clippy::too_many_arguments)]
pub async fn new(
lifecycle_config: LifecycleConfig,
topic: KafkaTopic,
@ -77,6 +84,7 @@ impl IngestHandlerImpl {
catalog: Arc<dyn Catalog>,
object_store: Arc<ObjectStore>,
write_buffer: Arc<dyn WriteBufferReading>,
exec: Executor,
registry: &metric::Registry,
) -> Result<Self> {
// build the initial ingester data state
@ -88,34 +96,13 @@ impl IngestHandlerImpl {
object_store,
catalog,
sequencers,
exec,
});
let ingester_data = Arc::clone(&data);
let kafka_topic_name = topic.name.clone();
let ingest_metrics = WriteBufferIngestMetrics::new(registry, &topic.name);
let mut join_handles = Vec::with_capacity(sequencer_states.len());
for (kafka_partition, sequencer) in sequencer_states {
let metrics = ingest_metrics.new_sequencer_metrics(kafka_partition.get() as u32);
let ingester_data = Arc::clone(&ingester_data);
let kafka_topic_name = kafka_topic_name.clone();
let stream_handler = write_buffer
.stream_handler(kafka_partition.get() as u32)
.await
.context(WriteBufferSnafu)?;
join_handles.push(tokio::task::spawn(stream_in_sequenced_entries(
ingester_data,
sequencer.id,
kafka_topic_name,
kafka_partition,
Arc::clone(&write_buffer),
stream_handler,
metrics,
)));
}
// start the lifecycle manager
let persister = Arc::clone(&data);
let lifecycle_manager = Arc::new(LifecycleManager::new(
@ -126,8 +113,36 @@ impl IngestHandlerImpl {
let handle = tokio::task::spawn(async move {
run_lifecycle_manager(manager, persister).await;
});
info!(
"ingester handler and lifecycle started with config {:?}",
lifecycle_config
);
let mut join_handles = Vec::with_capacity(sequencer_states.len());
join_handles.push(handle);
for (kafka_partition, sequencer) in sequencer_states {
let metrics = ingest_metrics.new_sequencer_metrics(kafka_partition.get() as u32);
let ingester_data = Arc::clone(&ingester_data);
let kafka_topic_name = kafka_topic_name.clone();
let stream_handler = write_buffer
.stream_handler(kafka_partition.get() as u32)
.await
.context(WriteBufferSnafu)?;
join_handles.push(tokio::task::spawn(stream_in_sequenced_entries(
Arc::clone(&lifecycle_manager),
ingester_data,
sequencer.id,
kafka_topic_name,
kafka_partition,
Arc::clone(&write_buffer),
stream_handler,
metrics,
)));
}
Ok(Self {
data,
kafka_topic: topic,
@ -156,7 +171,9 @@ impl Drop for IngestHandlerImpl {
///
/// Note all errors reading / parsing / writing entries from the write
/// buffer are ignored.
#[allow(clippy::too_many_arguments)]
async fn stream_in_sequenced_entries(
lifecycle_manager: Arc<LifecycleManager>,
ingester_data: Arc<IngesterData>,
sequencer_id: SequencerId,
kafka_topic: String,
@ -226,13 +243,21 @@ async fn stream_in_sequenced_entries(
);
let result = ingester_data
.buffer_operation(sequencer_id, dml_operation.clone())
.buffer_operation(sequencer_id, dml_operation.clone(), &lifecycle_manager)
.await;
match result {
Ok(_) => {
Ok(should_pause) => {
ingest_recorder.success();
span_recorder.ok("stored write");
if should_pause {
warn!(%sequencer_id, "pausing ingest until persistence has run");
while !lifecycle_manager.can_resume_ingest() {
tokio::time::sleep(INGEST_PAUSE_DELAY).await;
}
warn!(%sequencer_id, "resuming ingest");
}
}
Err(e) => {
// skip over invalid data in the write buffer so recovery can succeed
@ -323,6 +348,7 @@ mod tests {
Arc::new(catalog),
object_store,
reading,
Executor::new(1),
&metrics,
)
.await
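The handler changes amount to a task layout: spawn the lifecycle manager loop first, then one stream-consuming task per Kafka partition, and keep every join handle so the handler can wait on (and abort) them. Below is a self-contained sketch of that layout with plain tokio tasks; the real code runs run_lifecycle_manager and stream_in_sequenced_entries in these slots.

use std::time::Duration;

#[tokio::main]
async fn main() {
    let mut join_handles = Vec::new();

    // Background lifecycle loop: periodically decides which partitions to persist.
    join_handles.push(tokio::task::spawn(async {
        for _ in 0..3 {
            tokio::time::sleep(Duration::from_millis(10)).await;
        }
    }));

    // One ingest task per sequencer / Kafka partition.
    for kafka_partition in 0..2u32 {
        join_handles.push(tokio::task::spawn(async move {
            for _ in 0..3 {
                // pull the next DML operation from the write buffer, buffer it,
                // and pause while the lifecycle manager reports memory pressure
                tokio::time::sleep(Duration::from_millis(10)).await;
            }
            println!("stream for partition {kafka_partition} ended");
        }));
    }

    // The real handler stores these handles and aborts them on Drop; here we just wait.
    for handle in join_handles {
        handle.await.unwrap();
    }
}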


@ -16,7 +16,7 @@ use time::{Time, TimeProvider};
/// The lifecycle manager keeps track of the size and age of partitions across all sequencers.
/// It triggers persistence based on keeping total memory usage around a set amount while
/// ensuring that partitions don't get too old or large before being persisted.
pub(crate) struct LifecycleManager {
pub struct LifecycleManager {
config: LifecycleConfig,
time_provider: Arc<dyn TimeProvider>,
state: Mutex<LifecycleState>,


@ -66,7 +66,7 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
impl QueryableBatch {
/// Initialize a QueryableBatch
pub fn new(table_name: &str, data: Vec<SnapshotBatch>, deletes: Vec<Tombstone>) -> Self {
pub fn new(table_name: &str, data: Vec<Arc<SnapshotBatch>>, deletes: Vec<Tombstone>) -> Self {
let mut delete_predicates = vec![];
for delete in &deletes {
let delete_predicate = Arc::new(


@ -183,7 +183,7 @@ pub fn make_queryable_batch_with_deletes(
let mut seq_num = seq_num_start;
for batch in batches {
let seq = SequenceNumber::new(seq_num);
snapshots.push(make_snapshot_batch(batch, seq, seq));
snapshots.push(Arc::new(make_snapshot_batch(batch, seq, seq)));
seq_num += 1;
}