diff --git a/ingester/src/data.rs b/ingester/src/data.rs
index 5afaae4e5e..bb87be67fc 100644
--- a/ingester/src/data.rs
+++ b/ingester/src/data.rs
@@ -1,13 +1,15 @@
 //! Data for the lifecycle of the Ingester
 
 use crate::{
-    compact::compact_persisting_batch, lifecycle::LifecycleHandle, persist::persist,
+    compact::compact_persisting_batch,
+    lifecycle::LifecycleHandle,
+    partioning::{Partitioner, PartitionerError},
+    persist::persist,
     querier_handler::query,
 };
 use arrow::record_batch::RecordBatch;
 use async_trait::async_trait;
 use backoff::{Backoff, BackoffConfig};
-use chrono::{format::StrftimeItems, TimeZone, Utc};
 use data_types2::{
     DeletePredicate, KafkaPartition, NamespaceId, PartitionId, PartitionInfo, SequenceNumber,
     SequencerId, TableId, Timestamp, Tombstone,
@@ -17,13 +19,13 @@ use dml::DmlOperation;
 use iox_catalog::interface::Catalog;
 use iox_time::SystemProvider;
 use metric::U64Counter;
-use mutable_batch::{column::ColumnData, MutableBatch};
+use mutable_batch::MutableBatch;
 use object_store::DynObjectStore;
 use observability_deps::tracing::warn;
 use parking_lot::RwLock;
 use predicate::Predicate;
 use query::exec::Executor;
-use schema::{selection::Selection, Schema, TIME_COLUMN_NAME};
+use schema::{selection::Selection, Schema};
 use snafu::{OptionExt, ResultExt, Snafu};
 use std::{
     collections::{btree_map::Entry, BTreeMap},
@@ -80,8 +82,8 @@ pub enum Error {
     #[snafu(display("The given batch does not match any in the Persisting list. Nothing is removed from the Persisting list"))]
     PersistingNotMatch,
 
-    #[snafu(display("Time column not present"))]
-    TimeColumnNotPresent,
+    #[snafu(display("Cannot partition data: {}", source))]
+    Partitioning { source: PartitionerError },
 
     #[snafu(display("Snapshot error: {}", source))]
     Snapshot { source: mutable_batch::Error },
@@ -106,20 +108,61 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
 #[derive(Debug)]
 pub struct IngesterData {
     /// Object store for persistence of parquet files
-    pub(crate) object_store: Arc<DynObjectStore>,
+    object_store: Arc<DynObjectStore>,
+
     /// The global catalog for schema, parquet files and tombstones
-    pub(crate) catalog: Arc<dyn Catalog>,
+    catalog: Arc<dyn Catalog>,
+
     /// This map gets set up on initialization of the ingester so it won't ever be modified.
     /// The content of each SequenceData will get changed when more namespaces and tables
     /// get ingested.
-    pub(crate) sequencers: BTreeMap<SequencerId, SequencerData>,
+    sequencers: BTreeMap<SequencerId, SequencerData>,
+
+    /// Partitioner.
+    partitioner: Arc<dyn Partitioner>,
+
     /// Executor for running queries and compacting and persisting
-    pub(crate) exec: Arc<Executor>,
+    exec: Arc<Executor>,
+
     /// Backoff config
-    pub(crate) backoff_config: BackoffConfig,
+    backoff_config: BackoffConfig,
 }
 
 impl IngesterData {
+    /// Create new instance.
+    pub fn new(
+        object_store: Arc<DynObjectStore>,
+        catalog: Arc<dyn Catalog>,
+        sequencers: BTreeMap<SequencerId, SequencerData>,
+        partitioner: Arc<dyn Partitioner>,
+        exec: Arc<Executor>,
+        backoff_config: BackoffConfig,
+    ) -> Self {
+        Self {
+            object_store,
+            catalog,
+            sequencers,
+            partitioner,
+            exec,
+            backoff_config,
+        }
+    }
+
+    /// Executor for running queries and compacting and persisting
+    pub(crate) fn exec(&self) -> &Arc<Executor> {
+        &self.exec
+    }
+
+    /// Get sequencer data for specific sequencer.
+    pub(crate) fn sequencer(&self, sequencer_id: SequencerId) -> Option<&SequencerData> {
+        self.sequencers.get(&sequencer_id)
+    }
+
+    /// Get iterator over sequencers (ID and data).
+    pub(crate) fn sequencers(&self) -> impl Iterator<Item = (&SequencerId, &SequencerData)> {
+        self.sequencers.iter()
+    }
+
     /// Store the write or delete in the in memory buffer. Deletes will
     /// be written into the catalog before getting stored in the buffer.
     /// Any writes that create new IOx partitions will have those records
@@ -130,7 +173,7 @@ impl IngesterData {
         &self,
         sequencer_id: SequencerId,
         dml_operation: DmlOperation,
-        lifecycle_handle: &LifecycleHandle,
+        lifecycle_handle: &dyn LifecycleHandle,
     ) -> Result<bool> {
         let sequencer_data = self
             .sequencers
@@ -142,6 +185,7 @@ impl IngesterData {
                 sequencer_id,
                 self.catalog.as_ref(),
                 lifecycle_handle,
+                self.partitioner.as_ref(),
                 &self.exec,
             )
             .await
@@ -384,7 +428,8 @@ impl SequencerData {
         dml_operation: DmlOperation,
         sequencer_id: SequencerId,
         catalog: &dyn Catalog,
-        lifecycle_handle: &LifecycleHandle,
+        lifecycle_handle: &dyn LifecycleHandle,
+        partitioner: &dyn Partitioner,
         executor: &Executor,
     ) -> Result<bool> {
         let namespace_data = match self.namespace(dml_operation.namespace()) {
@@ -401,6 +446,7 @@ impl SequencerData {
                 sequencer_id,
                 catalog,
                 lifecycle_handle,
+                partitioner,
                 executor,
             )
             .await
@@ -500,7 +546,8 @@ impl NamespaceData {
         dml_operation: DmlOperation,
         sequencer_id: SequencerId,
         catalog: &dyn Catalog,
-        lifecycle_handle: &LifecycleHandle,
+        lifecycle_handle: &dyn LifecycleHandle,
+        partitioner: &dyn Partitioner,
         executor: &Executor,
     ) -> Result<bool> {
         let sequence_number = dml_operation
@@ -529,6 +576,7 @@ impl NamespaceData {
                             sequencer_id,
                             catalog,
                             lifecycle_handle,
+                            partitioner,
                         )
                         .await?;
 
@@ -742,22 +790,12 @@ impl TableData {
         batch: MutableBatch,
         sequencer_id: SequencerId,
         catalog: &dyn Catalog,
-        lifecycle_handle: &LifecycleHandle,
+        lifecycle_handle: &dyn LifecycleHandle,
+        partitioner: &dyn Partitioner,
     ) -> Result<bool> {
-        let (_, col) = batch
-            .columns()
-            .find(|(name, _)| *name == TIME_COLUMN_NAME)
-            .unwrap();
-        let timestamp = match col.data() {
-            ColumnData::I64(_, s) => s.min.unwrap(),
-            _ => return Err(Error::TimeColumnNotPresent),
-        };
-
-        let partition_key = format!(
-            "{}",
-            Utc.timestamp_nanos(timestamp)
-                .format_with_items(StrftimeItems::new("%Y-%m-%d"))
-        );
+        let partition_key = partitioner
+            .partition_key(&batch)
+            .context(PartitioningSnafu)?;
 
         let partition_data = match self.partition_data.get_mut(&partition_key) {
             Some(p) => p,
@@ -1456,6 +1494,7 @@ mod tests {
     use super::*;
     use crate::{
         lifecycle::{LifecycleConfig, LifecycleManager},
+        partioning::DefaultPartitioner,
         test_util::create_tombstone,
     };
     use arrow_util::assert_batches_sorted_eq;
@@ -1572,13 +1611,14 @@
         let object_store: Arc<DynObjectStore> = Arc::new(ObjectStoreImpl::new_in_memory());
 
-        let data = Arc::new(IngesterData {
-            object_store: Arc::clone(&object_store),
-            catalog: Arc::clone(&catalog),
+        let data = Arc::new(IngesterData::new(
+            Arc::clone(&object_store),
+            Arc::clone(&catalog),
             sequencers,
-            exec: Arc::new(Executor::new(1)),
-            backoff_config: BackoffConfig::default(),
-        });
+            Arc::new(DefaultPartitioner::default()),
+            Arc::new(Executor::new(1)),
+            BackoffConfig::default(),
+        ));
 
         let schema = NamespaceSchema::new(namespace.id, kafka_topic.id, query_pool.id);
@@ -1659,13 +1699,14 @@
         let object_store: Arc<DynObjectStore> = Arc::new(ObjectStoreImpl::new_in_memory());
 
-        let data = Arc::new(IngesterData {
-            object_store: Arc::clone(&object_store),
-            catalog: Arc::clone(&catalog),
+        let data = Arc::new(IngesterData::new(
+            Arc::clone(&object_store),
+            Arc::clone(&catalog),
             sequencers,
-            exec: Arc::new(Executor::new(1)),
-            backoff_config: BackoffConfig::default(),
-        });
+            Arc::new(DefaultPartitioner::default()),
+            Arc::new(Executor::new(1)),
+            BackoffConfig::default(),
+        ));
 
         let schema = NamespaceSchema::new(namespace.id, kafka_topic.id, query_pool.id);
@@ -2161,6 +2202,7 @@ mod tests {
             Arc::clone(&metrics),
             Arc::new(SystemProvider::new()),
         );
+        let partitioner = DefaultPartitioner::default();
         let exec = Executor::new(1);
 
         let data = NamespaceData::new(namespace.id, &*metrics);
@@ -2172,6 +2214,7 @@ mod tests {
                 sequencer.id,
                 catalog.as_ref(),
                 &manager.handle(),
+                &partitioner,
                 &exec,
             )
             .await
@@ -2194,6 +2237,7 @@ mod tests {
                 sequencer.id,
                 catalog.as_ref(),
                 &manager.handle(),
+                &partitioner,
                 &exec,
             )
             .await
diff --git a/ingester/src/handler.rs b/ingester/src/handler.rs
index 00af2bd1c4..e6a25dc1ef 100644
--- a/ingester/src/handler.rs
+++ b/ingester/src/handler.rs
@@ -3,6 +3,7 @@
 use crate::{
     data::{IngesterData, IngesterQueryResponse, SequencerData},
     lifecycle::{run_lifecycle_manager, LifecycleConfig, LifecycleManager},
+    partioning::DefaultPartitioner,
     poison::PoisonCabinet,
     querier_handler::prepare_data_to_querier,
     stream_handler::{
@@ -129,13 +130,14 @@ impl IngestHandlerImpl {
                 SequencerData::new(s.kafka_partition, Arc::clone(&metric_registry)),
             );
         }
-        let data = Arc::new(IngesterData {
+        let data = Arc::new(IngesterData::new(
             object_store,
             catalog,
             sequencers,
+            Arc::new(DefaultPartitioner::default()),
             exec,
-            backoff_config: BackoffConfig::default(),
-        });
+            BackoffConfig::default(),
+        ));
 
         let ingester_data = Arc::clone(&data);
         let kafka_topic_name = topic.name.clone();
@@ -263,12 +265,12 @@ impl IngestHandler for IngestHandlerImpl {
             }
         }
 
-        self.data.exec.join().await;
+        self.data.exec().join().await;
     }
 
     fn shutdown(&self) {
         self.shutdown.cancel();
-        self.data.exec.shutdown();
+        self.data.exec().shutdown();
     }
 
     /// Return the ingestion progress from each sequencer
@@ -363,12 +365,7 @@ mod tests {
         loop {
             let mut has_measurement = false;
 
-            if let Some(data) = ingester
-                .ingester
-                .data
-                .sequencers
-                .get(&ingester.sequencer.id)
-            {
+            if let Some(data) = ingester.ingester.data.sequencer(ingester.sequencer.id) {
                 if let Some(data) = data.namespace(&ingester.namespace.name) {
                     // verify there's data in the buffer
                     if let Some((b, _)) = data.snapshot("a", "1970-01-01").await {
@@ -600,8 +597,7 @@ mod tests {
 
             if let Some(data) = ingester
                 .data
-                .sequencers
-                .get(&sequencer.id)
+                .sequencer(sequencer.id)
             {
                 if let Some(data) = data.namespace(&namespace.name) {
                     // verify there's data in the buffer
diff --git a/ingester/src/lib.rs b/ingester/src/lib.rs
index d839525911..97824b36bd 100644
--- a/ingester/src/lib.rs
+++ b/ingester/src/lib.rs
@@ -19,6 +19,7 @@ pub mod data;
 pub mod handler;
 mod job;
 pub mod lifecycle;
+pub mod partioning;
 pub mod persist;
 mod poison;
 pub mod querier_handler;
diff --git a/ingester/src/lifecycle.rs b/ingester/src/lifecycle.rs
index 084036c7f8..dfcc80828a 100644
--- a/ingester/src/lifecycle.rs
+++ b/ingester/src/lifecycle.rs
@@ -19,13 +19,31 @@ use std::{collections::BTreeMap, sync::Arc, time::Duration};
 use tokio_util::sync::CancellationToken;
 use tracker::TrackedFutureExt;
 
+/// API suitable for ingester tasks to query and update the [`LifecycleManager`] state.
+pub trait LifecycleHandle: Send + Sync + 'static {
+    /// Logs bytes written into a partition so that it can be tracked for the manager to
+    /// trigger persistence. Returns true if the ingester should pause consuming from the
+    /// write buffer so that persistence can catch up and free up memory.
+    fn log_write(
+        &self,
+        partition_id: PartitionId,
+        sequencer_id: SequencerId,
+        sequence_number: SequenceNumber,
+        bytes_written: usize,
+    ) -> bool;
+
+    /// Returns true if the `total_bytes` tracked by the manager is less than the pause amount.
+    /// As persistence runs, the `total_bytes` go down.
+    fn can_resume_ingest(&self) -> bool;
+}
+
 /// A handle for sequencer consumers to interact with the global
 /// [`LifecycleManager`] instance.
 ///
 /// This handle presents an API suitable for ingester tasks to query and update
 /// the [`LifecycleManager`] state.
 #[derive(Debug, Clone)]
-pub struct LifecycleHandle {
+pub struct LifecycleHandleImpl {
     time_provider: Arc<dyn TimeProvider>,
 
     config: Arc<LifecycleConfig>,
 
@@ -34,11 +52,8 @@
     state: Arc<Mutex<LifecycleState>>,
 }
 
-impl LifecycleHandle {
-    /// Logs bytes written into a partition so that it can be tracked for the manager to
-    /// trigger persistence. Returns true if the ingester should pause consuming from the
-    /// write buffer so that persistence can catch up and free up memory.
-    pub fn log_write(
+impl LifecycleHandle for LifecycleHandleImpl {
+    fn log_write(
         &self,
         partition_id: PartitionId,
         sequencer_id: SequencerId,
@@ -67,9 +82,7 @@
         s.total_bytes > self.config.pause_ingest_size
     }
 
-    /// Returns true if the `total_bytes` tracked by the manager is less than the pause amount.
-    /// As persistence runs, the `total_bytes` go down.
-    pub fn can_resume_ingest(&self) -> bool {
+    fn can_resume_ingest(&self) -> bool {
         let s = self.state.lock();
         s.total_bytes < self.config.pause_ingest_size
     }
@@ -239,8 +252,8 @@ impl LifecycleManager {
     }
 
     /// Acquire a shareable [`LifecycleHandle`] for this manager instance.
-    pub fn handle(&self) -> LifecycleHandle {
-        LifecycleHandle {
+    pub fn handle(&self) -> LifecycleHandleImpl {
+        LifecycleHandleImpl {
             time_provider: Arc::clone(&self.time_provider),
             config: Arc::clone(&self.config),
             state: Arc::clone(&self.state),
diff --git a/ingester/src/partioning.rs b/ingester/src/partioning.rs
new file mode 100644
index 0000000000..a0d950140f
--- /dev/null
+++ b/ingester/src/partioning.rs
@@ -0,0 +1,39 @@
+//! Temporary partitioning implementation until partitions are properly transmitted by the router.
+use chrono::{format::StrftimeItems, TimeZone, Utc};
+use mutable_batch::{column::ColumnData, MutableBatch};
+use schema::TIME_COLUMN_NAME;
+
+/// Error for [`Partitioner`]
+pub type PartitionerError = Box<dyn std::error::Error + Send + Sync + 'static>;
+
+/// Interface to partition data within the ingester.
+///
+/// This is a temporary solution until the partition keys/IDs are properly transmitted by the router.
+pub trait Partitioner: std::fmt::Debug + Send + Sync + 'static {
+    /// Calculate partition key for given mutable batch.
+    fn partition_key(&self, batch: &MutableBatch) -> Result<String, PartitionerError>;
+}
+
+/// Default partitioner that matches the current router implementation.
+#[derive(Debug, Default)]
+#[allow(missing_copy_implementations)]
+pub struct DefaultPartitioner {}
+
+impl Partitioner for DefaultPartitioner {
+    fn partition_key(&self, batch: &MutableBatch) -> Result<String, PartitionerError> {
+        let (_, col) = batch
+            .columns()
+            .find(|(name, _)| *name == TIME_COLUMN_NAME)
+            .unwrap();
+        let timestamp = match col.data() {
+            ColumnData::I64(_, s) => s.min.unwrap(),
+            _ => return Err(String::from("Time column not present").into()),
+        };
+
+        Ok(format!(
+            "{}",
+            Utc.timestamp_nanos(timestamp)
+                .format_with_items(StrftimeItems::new("%Y-%m-%d"))
+        ))
+    }
+}
diff --git a/ingester/src/querier_handler.rs b/ingester/src/querier_handler.rs
index 3d31b38c33..16f5bd8bb7 100644
--- a/ingester/src/querier_handler.rs
+++ b/ingester/src/querier_handler.rs
@@ -93,7 +93,7 @@ pub async fn prepare_data_to_querier(
     let mut found_namespace = false;
     let mut batches = vec![];
     let mut batch_partition_ids = vec![];
-    for sequencer_data in ingest_data.sequencers.values() {
+    for (_sequencer_id, sequencer_data) in ingest_data.sequencers() {
         let namespace_data = match sequencer_data.namespace(&request.namespace) {
             Some(namespace_data) => {
                 found_namespace = true;
@@ -125,7 +125,7 @@ pub async fn prepare_data_to_querier(
         // extract payload
         let partition_id = partition.partition_id;
         let (schema, batch) =
-            prepare_data_to_querier_for_partition(&ingest_data.exec, partition, request)
+            prepare_data_to_querier_for_partition(ingest_data.exec(), partition, request)
                 .await?;
         schema_merger = schema_merger
             .merge(schema.as_ref())
diff --git a/ingester/src/stream_handler/handler.rs b/ingester/src/stream_handler/handler.rs
index 173eb7346f..fdde490f7c 100644
--- a/ingester/src/stream_handler/handler.rs
+++ b/ingester/src/stream_handler/handler.rs
@@ -9,7 +9,7 @@ use observability_deps::tracing::*;
 use tokio_util::sync::CancellationToken;
 use write_buffer::core::{WriteBufferError, WriteBufferErrorKind};
 
-use crate::lifecycle::LifecycleHandle;
+use crate::lifecycle::{LifecycleHandle, LifecycleHandleImpl};
 
 use super::DmlSink;
 
@@ -44,7 +44,7 @@ pub struct SequencedStreamHandler<I, O, T = SystemProvider> {
     /// request ingest be paused to control memory pressure.
     ///
     /// [`LifecycleManager`]: crate::lifecycle::LifecycleManager
-    lifecycle_handle: LifecycleHandle,
+    lifecycle_handle: LifecycleHandleImpl,
 
     // Metrics
     time_provider: T,
@@ -72,7 +72,7 @@ impl<I, O> SequencedStreamHandler<I, O> {
     pub fn new(
         stream: I,
         sink: O,
-        lifecycle_handle: LifecycleHandle,
+        lifecycle_handle: LifecycleHandleImpl,
         kafka_topic_name: String,
         kafka_partition: KafkaPartition,
         metrics: &metric::Registry,
@@ -694,7 +694,7 @@ mod tests {
            Ok(DmlOperation::Write(make_write("good_op", 2)))
        ],
        sink_rets = [
-            Err(crate::data::Error::TimeColumnNotPresent),
+            Err(crate::data::Error::Partitioning{source: String::from("Time column not present").into()}),
            Ok(true),
        ],
        want_ttbr = 2,
diff --git a/ingester/src/stream_handler/sink_adaptor.rs b/ingester/src/stream_handler/sink_adaptor.rs
index b0f7cf838b..6cfbca5270 100644
--- a/ingester/src/stream_handler/sink_adaptor.rs
+++ b/ingester/src/stream_handler/sink_adaptor.rs
@@ -6,7 +6,7 @@ use async_trait::async_trait;
 use data_types2::SequencerId;
 use dml::DmlOperation;
 
-use crate::{data::IngesterData, lifecycle::LifecycleHandle};
+use crate::{data::IngesterData, lifecycle::LifecycleHandleImpl};
 
 use super::DmlSink;
 
@@ -14,7 +14,7 @@ use super::DmlSink;
 #[derive(Debug)]
 pub struct IngestSinkAdaptor {
     ingest_data: Arc<IngesterData>,
-    lifecycle_handle: LifecycleHandle,
+    lifecycle_handle: LifecycleHandleImpl,
     sequencer_id: SequencerId,
 }
 
@@ -23,7 +23,7 @@ impl IngestSinkAdaptor {
     /// implementation.
     pub fn new(
         ingest_data: Arc<IngesterData>,
-        lifecycle_handle: LifecycleHandle,
+        lifecycle_handle: LifecycleHandleImpl,
         sequencer_id: SequencerId,
     ) -> Self {
         Self {
diff --git a/ingester/src/test_util.rs b/ingester/src/test_util.rs
index 4bc59cfc59..b799ea3284 100644
--- a/ingester/src/test_util.rs
+++ b/ingester/src/test_util.rs
@@ -1,9 +1,12 @@
 //! Test setups and data for ingester crate
 #![allow(missing_docs)]
 
-use crate::data::{
-    IngesterData, NamespaceData, PartitionData, PersistingBatch, QueryableBatch, SequencerData,
-    SnapshotBatch, TableData,
+use crate::{
+    data::{
+        IngesterData, NamespaceData, PartitionData, PersistingBatch, QueryableBatch, SequencerData,
+        SnapshotBatch, TableData,
+    },
+    partioning::DefaultPartitioner,
 };
 use arrow::record_batch::RecordBatch;
 use arrow_util::assert_batches_eq;
@@ -710,13 +713,14 @@ pub fn make_ingester_data(two_partitions: bool, loc: DataLocation) -> IngesterDa
     sequencers.insert(seq_id, seq_data);
 
     // Ingester data that includes one sequencer/shard
-    IngesterData {
+    IngesterData::new(
         object_store,
         catalog,
         sequencers,
+        Arc::new(DefaultPartitioner::default()),
         exec,
-        backoff_config: backoff::BackoffConfig::default(),
-    }
+        backoff::BackoffConfig::default(),
+    )
 }
 
 pub async fn make_ingester_data_with_tombstones(loc: DataLocation) -> IngesterData {
@@ -754,13 +758,14 @@ pub async fn make_ingester_data_with_tombstones(loc: DataLocation) -> IngesterDa
     sequencers.insert(seq_id, seq_data);
 
     // Ingester data that includes one sequencer/shard
-    IngesterData {
+    IngesterData::new(
         object_store,
         catalog,
         sequencers,
+        Arc::new(DefaultPartitioner::default()),
         exec,
-        backoff_config: backoff::BackoffConfig::default(),
-    }
+        backoff::BackoffConfig::default(),
+    )
 }
 
 /// Make data for one or two partitions per requested
diff --git a/iox_tests/src/util.rs b/iox_tests/src/util.rs
index 840d99bd88..b7e0b6d3ad 100644
--- a/iox_tests/src/util.rs
+++ b/iox_tests/src/util.rs
@@ -219,6 +219,7 @@ impl TestCatalog {
 }
 
 /// A test namespace
+#[derive(Debug)]
 #[allow(missing_docs)]
 pub struct TestNamespace {
     pub catalog: Arc<TestCatalog>,
@@ -264,6 +265,7 @@ impl TestNamespace {
 }
 
 /// A test sequencer with its namespace in the catalog
+#[derive(Debug)]
 #[allow(missing_docs)]
 pub struct TestSequencer {
     pub catalog: Arc<TestCatalog>,
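
The extension point this diff introduces is the `Partitioner` trait: `IngesterData` no longer hard-codes the `%Y-%m-%d` date bucketing, it just calls whatever `Arc<dyn Partitioner>` it was constructed with. As a rough sketch of what a non-default implementation could look like, a test-oriented partitioner might pin every batch to one key. The `FixedKeyPartitioner` below is hypothetical and not part of this change; it only assumes the trait exactly as declared in `ingester/src/partioning.rs` above.

use ingester::partioning::{Partitioner, PartitionerError};
use mutable_batch::MutableBatch;

/// Hypothetical partitioner that places every batch into a single fixed
/// partition, e.g. to make test setups deterministic.
#[derive(Debug)]
pub struct FixedKeyPartitioner {
    pub key: String,
}

impl Partitioner for FixedKeyPartitioner {
    fn partition_key(&self, _batch: &MutableBatch) -> Result<String, PartitionerError> {
        // Ignore the batch contents entirely; every write lands in `self.key`.
        Ok(self.key.clone())
    }
}

Swapping it in is then just a different constructor argument, e.g. `IngesterData::new(object_store, catalog, sequencers, Arc::new(FixedKeyPartitioner { key: "1970-01-01".into() }), exec, BackoffConfig::default())`, which is the same wiring the diff uses for `DefaultPartitioner` in `handler.rs` and `test_util.rs`.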