diff --git a/clap_blocks/src/write_buffer.rs b/clap_blocks/src/write_buffer.rs
index 248002cb02..3a8d753462 100644
--- a/clap_blocks/src/write_buffer.rs
+++ b/clap_blocks/src/write_buffer.rs
@@ -1,7 +1,7 @@
 //! Config for [`write_buffer`].
 use iox_time::SystemProvider;
 use observability_deps::tracing::*;
-use std::{collections::BTreeMap, num::NonZeroU32, path::PathBuf, sync::Arc};
+use std::{collections::BTreeMap, num::NonZeroU32, ops::Range, path::PathBuf, sync::Arc};
 use tempfile::TempDir;
 use trace::TraceCollector;
 use write_buffer::{
@@ -94,12 +94,13 @@ impl WriteBufferConfig {
     pub async fn writing(
         &self,
         metrics: Arc<metric::Registry>,
+        partitions: Option<Range<i32>>,
         trace_collector: Option<Arc<dyn TraceCollector>>,
     ) -> Result<Arc<dyn WriteBufferWriting>, WriteBufferError> {
         let conn = self.conn();
         let factory = Self::factory(metrics);
         factory
-            .new_config_write(&self.topic, trace_collector.as_ref(), &conn)
+            .new_config_write(&self.topic, partitions, trace_collector.as_ref(), &conn)
             .await
     }
 
@@ -107,12 +108,13 @@ impl WriteBufferConfig {
     pub async fn reading(
         &self,
         metrics: Arc<metric::Registry>,
+        partitions: Option<Range<i32>>,
         trace_collector: Option<Arc<dyn TraceCollector>>,
     ) -> Result<Arc<dyn WriteBufferReading>, WriteBufferError> {
         let conn = self.conn();
         let factory = Self::factory(metrics);
         factory
-            .new_config_read(&self.topic, trace_collector.as_ref(), &conn)
+            .new_config_read(&self.topic, partitions, trace_collector.as_ref(), &conn)
             .await
     }
 
diff --git a/ioxd_ingester/src/lib.rs b/ioxd_ingester/src/lib.rs
index a85f09bdf3..fe635f22af 100644
--- a/ioxd_ingester/src/lib.rs
+++ b/ioxd_ingester/src/lib.rs
@@ -156,10 +156,9 @@ pub async fn create_ingester_server_type(
         return Err(Error::ShardIndexRange);
     }
 
-    let shard_indexes: Vec<_> = (ingester_config.shard_index_range_start
-        ..=ingester_config.shard_index_range_end)
-        .map(ShardIndex::new)
-        .collect();
+    let shard_range =
+        ingester_config.shard_index_range_start..(ingester_config.shard_index_range_end + 1);
+    let shard_indexes: Vec<_> = shard_range.clone().map(ShardIndex::new).collect();
 
     let mut shards = BTreeMap::new();
     for shard_index in shard_indexes {
@@ -171,7 +170,11 @@ pub async fn create_ingester_server_type(
     let trace_collector = common_state.trace_collector();
 
     let write_buffer = write_buffer_config
-        .reading(Arc::clone(&metric_registry), trace_collector.clone())
+        .reading(
+            Arc::clone(&metric_registry),
+            Some(shard_range),
+            trace_collector.clone(),
+        )
         .await?;
 
     let lifecycle_config = LifecycleConfig::new(
diff --git a/ioxd_router/src/lib.rs b/ioxd_router/src/lib.rs
index b0475fcec7..7856254ea4 100644
--- a/ioxd_router/src/lib.rs
+++ b/ioxd_router/src/lib.rs
@@ -320,7 +320,7 @@ async fn init_write_buffer(
 )> {
     let write_buffer = Arc::new(
         write_buffer_config
-            .writing(Arc::clone(&metrics), trace_collector)
+            .writing(Arc::clone(&metrics), None, trace_collector)
            .await?,
     );
 
diff --git a/write_buffer/src/config.rs b/write_buffer/src/config.rs
index e22e91f816..1d02e60cc9 100644
--- a/write_buffer/src/config.rs
+++ b/write_buffer/src/config.rs
@@ -12,6 +12,7 @@ use parking_lot::RwLock;
 use std::{
     collections::{btree_map::Entry, BTreeMap},
     num::NonZeroU32,
+    ops::Range,
     path::PathBuf,
     sync::Arc,
 };
@@ -151,6 +152,7 @@ impl WriteBufferConfigFactory {
     pub async fn new_config_write(
         &self,
         db_name: &str,
+        partitions: Option<Range<i32>>,
         trace_collector: Option<&Arc<dyn TraceCollector>>,
         cfg: &WriteBufferConnection,
     ) -> Result<Arc<dyn WriteBufferWriting>, WriteBufferError> {
@@ -173,6 +175,7 @@ impl WriteBufferConfigFactory {
                     &cfg.connection_config,
                     Arc::clone(&self.time_provider),
                     cfg.creation_config.as_ref(),
+                    partitions,
                     trace_collector.map(Arc::clone),
                     &*self.metric_registry,
                 )
@@ -205,6 +208,7 @@ impl WriteBufferConfigFactory {
     pub async fn new_config_read(
         &self,
         db_name: &str,
+        partitions: Option<Range<i32>>,
         trace_collector: Option<&Arc<dyn TraceCollector>>,
         cfg: &WriteBufferConnection,
     ) -> Result<Arc<dyn WriteBufferReading>, WriteBufferError> {
@@ -226,6 +230,7 @@ impl WriteBufferConfigFactory {
                     db_name.to_owned(),
                     &cfg.connection_config,
                     cfg.creation_config.as_ref(),
+                    partitions,
                     trace_collector.map(Arc::clone),
                 )
                 .await?;
@@ -275,7 +280,7 @@ mod tests {
         };
 
         let conn = factory
-            .new_config_write(db_name.as_str(), None, &cfg)
+            .new_config_write(db_name.as_str(), None, None, &cfg)
             .await
             .unwrap();
         assert_eq!(conn.type_name(), "file");
@@ -294,7 +299,7 @@ mod tests {
         };
 
         let conn = factory
-            .new_config_read(db_name.as_str(), None, &cfg)
+            .new_config_read(db_name.as_str(), None, None, &cfg)
             .await
             .unwrap();
         assert_eq!(conn.type_name(), "file");
@@ -316,7 +321,7 @@ mod tests {
         };
 
         let conn = factory
-            .new_config_write(db_name.as_str(), None, &cfg)
+            .new_config_write(db_name.as_str(), None, None, &cfg)
             .await
             .unwrap();
         assert_eq!(conn.type_name(), "mock");
@@ -328,7 +333,7 @@ mod tests {
             ..Default::default()
         };
         let err = factory
-            .new_config_write(db_name.as_str(), None, &cfg)
+            .new_config_write(db_name.as_str(), None, None, &cfg)
             .await
             .unwrap_err();
         assert!(err.to_string().contains("Unknown mock ID:"));
@@ -350,7 +355,7 @@ mod tests {
         };
 
         let conn = factory
-            .new_config_read(db_name.as_str(), None, &cfg)
+            .new_config_read(db_name.as_str(), None, None, &cfg)
             .await
             .unwrap();
         assert_eq!(conn.type_name(), "mock");
@@ -362,7 +367,7 @@ mod tests {
             ..Default::default()
         };
         let err = factory
-            .new_config_read(db_name.as_str(), None, &cfg)
+            .new_config_read(db_name.as_str(), None, None, &cfg)
             .await
             .unwrap_err();
         assert!(err.to_string().contains("Unknown mock ID:"));
@@ -383,7 +388,7 @@ mod tests {
         };
 
         let conn = factory
-            .new_config_write(db_name.as_str(), None, &cfg)
+            .new_config_write(db_name.as_str(), None, None, &cfg)
             .await
             .unwrap();
         assert_eq!(conn.type_name(), "mock_failing");
@@ -395,7 +400,7 @@ mod tests {
             ..Default::default()
         };
         let err = factory
-            .new_config_write(db_name.as_str(), None, &cfg)
+            .new_config_write(db_name.as_str(), None, None, &cfg)
             .await
             .unwrap_err();
         assert!(err.to_string().contains("Unknown mock ID:"));
@@ -416,7 +421,7 @@ mod tests {
         };
 
         let conn = factory
-            .new_config_read(db_name.as_str(), None, &cfg)
+            .new_config_read(db_name.as_str(), None, None, &cfg)
             .await
             .unwrap();
         assert_eq!(conn.type_name(), "mock_failing");
@@ -428,7 +433,7 @@ mod tests {
             ..Default::default()
         };
         let err = factory
-            .new_config_read(db_name.as_str(), None, &cfg)
+            .new_config_read(db_name.as_str(), None, None, &cfg)
             .await
             .unwrap_err();
         assert!(err.to_string().contains("Unknown mock ID:"));
@@ -464,7 +469,7 @@ mod tests {
         };
 
         let conn = factory
-            .new_config_write(db_name.as_str(), None, &cfg)
+            .new_config_write(db_name.as_str(), None, None, &cfg)
             .await
             .unwrap();
         assert_eq!(conn.type_name(), "kafka");
@@ -484,7 +489,7 @@ mod tests {
         };
 
         let conn = factory
-            .new_config_read(db_name.as_str(), None, &cfg)
+            .new_config_read(db_name.as_str(), None, None, &cfg)
             .await
             .unwrap();
         assert_eq!(conn.type_name(), "kafka");
diff --git a/write_buffer/src/kafka/mod.rs b/write_buffer/src/kafka/mod.rs
index 788788548a..0d58eb9932 100644
--- a/write_buffer/src/kafka/mod.rs
+++ b/write_buffer/src/kafka/mod.rs
@@ -33,6 +33,7 @@ use rskafka::{
 };
 use std::{
     collections::{BTreeMap, BTreeSet},
+    ops::Range,
     sync::{
         atomic::{AtomicBool, Ordering},
         Arc,
@@ -55,17 +56,25 @@ pub struct RSKafkaProducer {
 }
 
 impl RSKafkaProducer {
+    #[allow(clippy::too_many_arguments)]
     pub async fn new<'a>(
         conn: String,
         topic_name: String,
         connection_config: &'a BTreeMap<String, String>,
         time_provider: Arc<dyn TimeProvider>,
         creation_config: Option<&'a WriteBufferCreationConfig>,
+        partitions: Option<Range<i32>>,
         _trace_collector: Option<Arc<dyn TraceCollector>>,
         metric_registry: &'a metric::Registry,
     ) -> Result<Self> {
-        let partition_clients =
-            setup_topic(conn, topic_name.clone(), connection_config, creation_config).await?;
+        let partition_clients = setup_topic(
+            conn,
+            topic_name.clone(),
+            connection_config,
+            creation_config,
+            partitions,
+        )
+        .await?;
 
         let producer_config = ProducerConfig::try_from(connection_config)?;
 
@@ -358,10 +367,17 @@ impl RSKafkaConsumer {
         topic_name: String,
         connection_config: &BTreeMap<String, String>,
         creation_config: Option<&WriteBufferCreationConfig>,
+        partitions: Option<Range<i32>>,
         trace_collector: Option<Arc<dyn TraceCollector>>,
     ) -> Result<Self> {
-        let partition_clients =
-            setup_topic(conn, topic_name.clone(), connection_config, creation_config).await?;
+        let partition_clients = setup_topic(
+            conn,
+            topic_name.clone(),
+            connection_config,
+            creation_config,
+            partitions,
+        )
+        .await?;
 
         let partition_clients = partition_clients
             .into_iter()
@@ -428,6 +444,7 @@ async fn setup_topic(
     topic_name: String,
     connection_config: &BTreeMap<String, String>,
     creation_config: Option<&WriteBufferCreationConfig>,
+    partitions: Option<Range<i32>>,
 ) -> Result<BTreeMap<ShardIndex, PartitionClient>> {
     let client_config = ClientConfig::try_from(connection_config)?;
     let mut client_builder = ClientBuilder::new(vec![conn]);
@@ -447,17 +464,37 @@ async fn setup_topic(
             // Instantiate 10 partition clients concurrently until all are ready
             // speed up server init.
             let client_ref = &client;
-            return stream::iter(topic.partitions.into_iter().map(|p| {
-                let topic_name = topic_name.clone();
-                async move {
-                    let shard_index = ShardIndex::new(p);
-                    let c = client_ref.partition_client(&topic_name, p).await?;
-                    Ok((shard_index, c))
-                }
-            }))
+            let clients = stream::iter(
+                topic
+                    .partitions
+                    .into_iter()
+                    .filter(|p| {
+                        partitions
+                            .as_ref()
+                            .map(|want| want.contains(p))
+                            .unwrap_or(true)
+                    })
+                    .map(|p| {
+                        let topic_name = topic_name.clone();
+                        async move {
+                            let shard_index = ShardIndex::new(p);
+                            let c = client_ref.partition_client(&topic_name, p).await?;
+                            Result::<_, WriteBufferError>::Ok((shard_index, c))
+                        }
+                    }),
+            )
             .buffer_unordered(10)
             .try_collect::<BTreeMap<_, _>>()
-            .await;
+            .await?;
+
+            if let Some(p) = partitions {
+                assert_eq!(
+                    p.len(),
+                    clients.len(),
+                    "requested partition clients not initialised"
+                );
+            }
+            return Ok(clients);
         }
 
         // create topic
@@ -576,6 +613,7 @@ mod tests {
                 &BTreeMap::default(),
                 Arc::clone(&self.time_provider),
                 self.creation_config(creation_config).as_ref(),
+                None,
                 Some(self.trace_collector() as Arc<_>),
                 &self.metrics,
             )
@@ -588,6 +626,7 @@ mod tests {
                 self.topic_name.clone(),
                 &BTreeMap::default(),
                 self.creation_config(creation_config).as_ref(),
+                None,
                 Some(self.trace_collector() as Arc<_>),
             )
             .await
@@ -625,6 +664,7 @@ mod tests {
                 n_shards,
                 ..Default::default()
             }),
+            None,
        )
        .await
        .unwrap();
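Reviewer note: the behavioural core of this diff is the `.filter(...)` inserted into `setup_topic`. The sketch below is a minimal, self-contained restatement of that rule, not code from this PR; the helper name `filter_partitions` is hypothetical, while the `Option<Range<i32>>` shape and the half-open-range semantics match the diff.

use std::ops::Range;

// Hypothetical helper mirroring the filter closure added to `setup_topic`:
// keep a partition ID when no range was requested (`None` = all partitions,
// as the router passes), or when the requested half-open range contains it
// (as the ingester passes for its shard range).
fn filter_partitions(all: Vec<i32>, partitions: Option<Range<i32>>) -> Vec<i32> {
    all.into_iter()
        .filter(|p| {
            partitions
                .as_ref()
                .map(|want| want.contains(p))
                .unwrap_or(true)
        })
        .collect()
}

fn main() {
    // Shard indexes 2, 3 and 4 selected out of a ten-partition topic; the
    // range is half-open, matching the ingester's `start..(end + 1)`.
    assert_eq!(filter_partitions((0..10).collect(), Some(2..5)), vec![2, 3, 4]);

    // `None` keeps the previous behaviour: clients for every partition.
    assert_eq!(filter_partitions((0..3).collect(), None), vec![0, 1, 2]);
}

Switching the ingester from the original inclusive `..=` range to the half-open `start..(end + 1)` is presumably what lets `setup_topic` call both `contains` and `len` on the same value: `Range<i32>` implements `ExactSizeIterator` (used by the `assert_eq!` guarding that every requested partition client was initialised), whereas `RangeInclusive<i32>` does not.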