From 11f4a1dee83527e4a151ef1865facde8ab223a99 Mon Sep 17 00:00:00 2001
From: Marco Neumann
Date: Fri, 5 Nov 2021 11:03:48 +0100
Subject: [PATCH 1/4] feat: add connection management for router

---
 Cargo.lock                          |   7 +
 data_types/src/write_buffer.rs      |  12 +-
 generated_types/src/write_buffer.rs |   8 +-
 router/Cargo.toml                   |   9 +-
 router/src/connection_pool.rs       | 148 ++++++++++++++++++
 router/src/grpc_client.rs           | 226 ++++++++++++++++++++++++++++
 router/src/lib.rs                   |   2 +
 write_buffer/src/kafka.rs           |  22 +--
 8 files changed, 412 insertions(+), 22 deletions(-)
 create mode 100644 router/src/connection_pool.rs
 create mode 100644 router/src/grpc_client.rs

diff --git a/Cargo.lock b/Cargo.lock
index 27b5b0f43d..63d8f4f4f1 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3506,16 +3506,23 @@ dependencies = [
 name = "router"
 version = "0.1.0"
 dependencies = [
+ "async-trait",
+ "cache_loader_async",
  "data_types",
  "hashbrown",
+ "influxdb_iox_client",
  "metric",
  "mutable_batch",
  "mutable_batch_lp",
+ "mutable_batch_pb",
+ "observability_deps",
  "parking_lot",
  "regex",
  "snafu",
  "time 0.1.0",
+ "tokio",
  "trace",
+ "write_buffer",
 ]

 [[package]]

diff --git a/data_types/src/write_buffer.rs b/data_types/src/write_buffer.rs
index 0d08f7c172..7a3c56c98d 100644
--- a/data_types/src/write_buffer.rs
+++ b/data_types/src/write_buffer.rs
@@ -1,7 +1,7 @@
-use std::{collections::HashMap, num::NonZeroU32};
+use std::{collections::BTreeMap, num::NonZeroU32};

 /// If the buffer is used for reading or writing.
-#[derive(Debug, Eq, PartialEq, Clone)]
+#[derive(Debug, Eq, PartialEq, Clone, Hash)]
 pub enum WriteBufferDirection {
     /// Writes into the buffer aka "producer".
     Write,
@@ -13,7 +13,7 @@ pub enum WriteBufferDirection {
 pub const DEFAULT_N_SEQUENCERS: u32 = 1;

 /// Configures the use of a write buffer.
-#[derive(Debug, Eq, PartialEq, Clone)]
+#[derive(Debug, Eq, PartialEq, Clone, Hash)]
 pub struct WriteBufferConnection {
     /// If the buffer is used for reading or writing.
     pub direction: WriteBufferDirection,
@@ -27,7 +27,7 @@ pub struct WriteBufferConnection {
     /// Special configs to be applied when establishing the connection.
     ///
     /// This depends on [`type_`](Self::type_) and can configure aspects like timeouts.
-    pub connection_config: HashMap<String, String>,
+    pub connection_config: BTreeMap<String, String>,

     /// Specifies if the sequencers (e.g. for Kafka in form of a topic) should be automatically created if they do not
     /// exist prior to reading or writing.
@@ -50,7 +50,7 @@ impl Default for WriteBufferConnection {
 ///
 /// What that means depends on the used write buffer, e.g. for Kafka this will create a new topic w/
 /// [`n_sequencers`](Self::n_sequencers) partitions.
-#[derive(Debug, Eq, PartialEq, Clone)]
+#[derive(Debug, Eq, PartialEq, Clone, Hash)]
 pub struct WriteBufferCreationConfig {
     /// Number of sequencers.
     ///
@@ -61,7 +61,7 @@ pub struct WriteBufferCreationConfig {
     /// Special configs to be applied when sequencers are created.
     ///
     /// This depends on [type](WriteBufferConnection::type_) and can set up parameters like retention policy.
-    pub options: HashMap<String, String>,
+    pub options: BTreeMap<String, String>,
 }

 impl Default for WriteBufferCreationConfig {
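The new `Hash` derives above are what later allow `WriteBufferConnection` to serve as part of a cache key in the router's connection pool, and they only compile because the fields moved from `HashMap` to `BTreeMap`. A minimal sketch of why (the `Config` type here is hypothetical, not part of this patch):

```rust
use std::collections::BTreeMap;

// `BTreeMap` implements `Hash` because it iterates in a deterministic
// (key-sorted) order; `std::collections::HashMap` does not implement
// `Hash` at all, so the same derive on a `HashMap` field fails to compile.
#[derive(Debug, Eq, PartialEq, Clone, Hash)]
struct Config {
    options: BTreeMap<String, String>,
}

fn main() {
    let cfg = Config {
        options: BTreeMap::from([("retention.ms".to_string(), "-1".to_string())]),
    };
    println!("{:?}", cfg); // `cfg` is now usable as (part of) a cache key
}
```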
diff --git a/generated_types/src/write_buffer.rs b/generated_types/src/write_buffer.rs
index d56d467261..d938e579ce 100644
--- a/generated_types/src/write_buffer.rs
+++ b/generated_types/src/write_buffer.rs
@@ -17,7 +17,7 @@ impl From<WriteBufferConnection> for write_buffer::WriteBufferConnection {
             direction: direction.into(),
             r#type: v.type_,
             connection: v.connection,
-            connection_config: v.connection_config,
+            connection_config: v.connection_config.into_iter().collect(),
             creation_config: v.creation_config.map(|x| x.into()),
         }
     }
@@ -36,7 +36,7 @@ impl From<WriteBufferCreationConfig> for write_buffer::WriteBufferCreationConfig {
     fn from(v: WriteBufferCreationConfig) -> Self {
         Self {
             n_sequencers: v.n_sequencers.get(),
-            options: v.options,
+            options: v.options.into_iter().collect(),
         }
     }
 }
@@ -57,7 +57,7 @@ impl TryFrom<write_buffer::WriteBufferConnection> for WriteBufferConnection {
             direction: direction.try_into()?,
             type_: proto.r#type,
             connection: proto.connection,
-            connection_config: proto.connection_config,
+            connection_config: proto.connection_config.into_iter().collect(),
             creation_config: proto.creation_config.optional("creation_config")?,
         })
     }
@@ -86,7 +86,7 @@ impl TryFrom<write_buffer::WriteBufferCreationConfig> for WriteBufferCreationConfig {
         Ok(Self {
             n_sequencers: NonZeroU32::try_from(proto.n_sequencers)
                 .unwrap_or_else(|_| NonZeroU32::try_from(DEFAULT_N_SEQUENCERS).unwrap()),
-            options: proto.options,
+            options: proto.options.into_iter().collect(),
         })
     }
 }
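The `into_iter().collect()` calls above bridge the internal `BTreeMap` and the map type in the prost-generated protobuf structs (a `HashMap` by default, unless prost is configured otherwise). A sketch of the round-trip, assuming those map types:

```rust
use std::collections::{BTreeMap, HashMap};

fn main() {
    // The internal config type now stores a BTreeMap...
    let internal: BTreeMap<String, String> =
        BTreeMap::from([("timeout".to_string(), "5s".to_string())]);

    // ...while the generated protobuf struct stores a HashMap, so the
    // From/TryFrom impls convert by draining one map into the other.
    let wire: HashMap<String, String> = internal.into_iter().collect();
    let back: BTreeMap<String, String> = wire.into_iter().collect();

    assert_eq!(back.get("timeout").map(String::as_str), Some("5s"));
}
```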
diff --git a/router/Cargo.toml b/router/Cargo.toml
index a1d0af7f55..8122cda697 100644
--- a/router/Cargo.toml
+++ b/router/Cargo.toml
@@ -4,15 +4,22 @@ version = "0.1.0"
 edition = "2021"

 [dependencies]
+async-trait = "0.1"
+cache_loader_async = "0.1.2"
 data_types = { path = "../data_types" }
 hashbrown = "0.11"
+influxdb_iox_client = { path = "../influxdb_iox_client" }
 metric = { path = "../metric" }
 mutable_batch = { path = "../mutable_batch" }
+mutable_batch_pb = { path = "../mutable_batch_pb" }
+observability_deps = { path = "../observability_deps" }
 trace = { path = "../trace" }
 parking_lot = "0.11.2"
 snafu = "0.6"
+time = { path = "../time" }
+write_buffer = { path = "../write_buffer" }

 [dev-dependencies]
 mutable_batch_lp = { path = "../mutable_batch_lp" }
 regex = "1.4"
-time = { path = "../time" }
+tokio = { version = "1.13", features = ["macros", "time"] }

diff --git a/router/src/connection_pool.rs b/router/src/connection_pool.rs
new file mode 100644
index 0000000000..8605a6a0b3
--- /dev/null
+++ b/router/src/connection_pool.rs
@@ -0,0 +1,148 @@
+use std::sync::Arc;
+
+use cache_loader_async::cache_api::LoadingCache;
+use data_types::write_buffer::WriteBufferConnection;
+use observability_deps::tracing::debug;
+use write_buffer::{
+    config::WriteBufferConfigFactory,
+    core::{WriteBufferError, WriteBufferWriting},
+};
+
+use crate::grpc_client::GrpcClient;
+
+type KeyWriteBufferProducer = (String, WriteBufferConnection);
+pub type ConnectionError = Arc<dyn std::error::Error + Send + Sync + 'static>;
+
+/// Stupid hack to fit the `Box<dyn Error>` in [`WriteBufferError`] into an `Arc`
+struct EWrapper(WriteBufferError);
+
+impl std::fmt::Debug for EWrapper {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        self.0.fmt(f)
+    }
+}
+
+impl std::fmt::Display for EWrapper {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        self.0.fmt(f)
+    }
+}
+
+impl std::error::Error for EWrapper {}
+
+/// Connection pool for the entire routing server.
+///
+/// This avoids:
+/// 1. That every [`Router`](crate::router::Router) uses its own connections
+/// 2. That we open too many connections in total.
+#[derive(Debug)]
+pub struct ConnectionPool {
+    grpc_clients: LoadingCache<String, Arc<dyn GrpcClient>, ConnectionError>,
+    write_buffer_producers:
+        LoadingCache<KeyWriteBufferProducer, Arc<dyn WriteBufferWriting>, ConnectionError>,
+}
+
+impl ConnectionPool {
+    /// Create new connection pool.
+    ///
+    /// If `use_mock_grpc` is set only mock gRPC clients are created.
+    pub fn new(use_mock_grpc: bool, wb_factory: WriteBufferConfigFactory) -> Self {
+        let grpc_clients = if use_mock_grpc {
+            LoadingCache::new(|_connection_string: String| async move {
+                use crate::grpc_client::MockClient;
+
+                Ok(Arc::new(MockClient::default()) as Arc<dyn GrpcClient>)
+            })
+        } else {
+            LoadingCache::new(|connection_string: String| async move {
+                use crate::grpc_client::RealClient;
+                use influxdb_iox_client::connection::Builder;
+
+                let connection = Builder::default()
+                    .build(&connection_string)
+                    .await
+                    .map_err(|e| Arc::new(e) as ConnectionError)?;
+                Ok(Arc::new(RealClient::new(connection)) as Arc<dyn GrpcClient>)
+            })
+        };
+
+        let wb_factory = Arc::new(wb_factory);
+        let write_buffer_producers = LoadingCache::new(move |key: KeyWriteBufferProducer| {
+            let wb_factory = Arc::clone(&wb_factory);
+            async move {
+                wb_factory
+                    .new_config_write(&key.0, &key.1)
+                    .await
+                    .map_err(|e| Arc::new(EWrapper(e)) as ConnectionError)
+            }
+        });
+
+        Self {
+            grpc_clients,
+            write_buffer_producers,
+        }
+    }
+
+    /// Create new connection factory for testing purposes.
+    #[cfg(test)]
+    pub fn new_testing() -> Self {
+        use time::SystemProvider;
+
+        let time_provider = Arc::new(SystemProvider::new());
+        Self::new(true, WriteBufferConfigFactory::new(time_provider))
+    }
+
+    /// Get gRPC client given a connection string.
+    pub async fn grpc_client(
+        &self,
+        connection_string: &str,
+    ) -> Result<Arc<dyn GrpcClient>, ConnectionError> {
+        let res = self
+            .grpc_clients
+            .get_with_meta(connection_string.to_string())
+            .await
+            .map_err(|e| Arc::new(e) as ConnectionError)?;
+        debug!(was_cached=%res.cached, %connection_string, "getting IOx write client");
+        Ok(res.result)
+    }
+
+    /// Get write buffer producer given a DB name and config.
+    pub async fn write_buffer_producer(
+        &self,
+        db_name: &str,
+        cfg: &WriteBufferConnection,
+    ) -> Result<Arc<dyn WriteBufferWriting>, ConnectionError> {
+        let res = self
+            .write_buffer_producers
+            .get_with_meta((db_name.to_string(), cfg.clone()))
+            .await
+            .map_err(|e| Arc::new(e) as ConnectionError)?;
+        debug!(was_cached=%res.cached, %db_name, "getting write buffer");
+        Ok(res.result)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use time::{SystemProvider, TimeProvider};
+
+    use crate::grpc_client::MockClient;
+
+    use super::*;
+
+    #[tokio::test]
+    async fn test_grpc_mocking() {
+        let time_provider: Arc<dyn TimeProvider> = Arc::new(SystemProvider::new());
+
+        let pool1 = ConnectionPool::new(
+            false,
+            WriteBufferConfigFactory::new(Arc::clone(&time_provider)),
+        );
+        // connection will fail
+        pool1.grpc_client("foo").await.unwrap_err();
+
+        let pool2 = ConnectionPool::new(true, WriteBufferConfigFactory::new(time_provider));
+        let client2 = pool2.grpc_client("foo").await.unwrap();
+        client2.as_any().downcast_ref::<MockClient>().unwrap();
+    }
+}
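A hypothetical call site for the pool (not part of this patch; the helper name, target URL, and error conversion are illustrative, and a tokio runtime is assumed to be running):

```rust
use mutable_batch::DbWrite;

use crate::{connection_pool::ConnectionPool, grpc_client::WriteError};

// Hypothetical helper: forward one write through a pooled gRPC client.
async fn forward(pool: &ConnectionPool, db_name: &str, write: &DbWrite) -> Result<(), WriteError> {
    // The first call per connection string dials out; later calls hit the
    // LoadingCache and reuse the already-established client.
    let client = pool
        .grpc_client("http://iox-target:8082")
        .await
        .map_err(|e| -> WriteError { e.to_string().into() })?;
    client.write(db_name, write).await
}
```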
diff --git a/router/src/grpc_client.rs b/router/src/grpc_client.rs
new file mode 100644
index 0000000000..5565096a62
--- /dev/null
+++ b/router/src/grpc_client.rs
@@ -0,0 +1,226 @@
+//! gRPC clients abstraction.
+//!
+//! This abstraction was created for easier testing.
+use std::{
+    any::Any,
+    sync::atomic::{AtomicBool, Ordering},
+};
+
+use async_trait::async_trait;
+use mutable_batch::DbWrite;
+use parking_lot::RwLock;
+
+/// Generic write error.
+pub type WriteError = Box<dyn std::error::Error + Send + Sync>;
+
+/// An abstract IOx gRPC client.
+#[async_trait]
+pub trait GrpcClient: Sync + Send + std::fmt::Debug + 'static {
+    /// Write data to the given database.
+    async fn write(&self, db_name: &str, write: &DbWrite) -> Result<(), WriteError>;
+
+    /// Cast client to [`Any`], useful for downcasting.
+    fn as_any(&self) -> &dyn Any;
+}
+
+/// A real, network-driven gRPC client.
+#[derive(Debug)]
+pub struct RealClient {
+    /// Write client for IOx.
+    write_client: influxdb_iox_client::write::Client,
+}
+
+impl RealClient {
+    /// Create new client from established connection.
+    pub fn new(connection: influxdb_iox_client::connection::Connection) -> Self {
+        Self {
+            write_client: influxdb_iox_client::write::Client::new(connection),
+        }
+    }
+}
+
+#[async_trait]
+impl GrpcClient for RealClient {
+    async fn write(&self, db_name: &str, write: &DbWrite) -> Result<(), WriteError> {
+        use influxdb_iox_client::write::generated_types::WriteRequest;
+        use mutable_batch_pb::encode::encode_write;
+
+        let write_request = WriteRequest {
+            database_batch: Some(encode_write(db_name, write)),
+        };
+
+        // cheap, see https://docs.rs/tonic/0.4.2/tonic/client/index.html#concurrent-usage
+        let mut client = self.write_client.clone();
+
+        client
+            .write_pb(write_request)
+            .await
+            .map_err(|e| Box::new(e) as _)
+    }
+
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+}
+
+/// Mock client for testing.
+#[derive(Debug, Default)]
+pub struct MockClient {
+    /// All writes recorded by this client.
+    writes: RwLock<Vec<(String, DbWrite)>>,
+
+    /// Poison pill.
+    ///
+    /// If set to `true` all writes will fail.
+    poisoned: AtomicBool,
+}
+
+impl MockClient {
+    /// Take poison pill.
+    ///
+    /// All subsequent writes will fail.
+    pub fn poison(&self) {
+        self.poisoned.store(true, Ordering::SeqCst)
+    }
+
+    /// Get a copy of all recorded writes.
+    pub fn writes(&self) -> Vec<(String, DbWrite)> {
+        self.writes.read().clone()
+    }
+
+    /// Assert that writes are as expected.
+    pub fn assert_writes(&self, expected: &[(String, DbWrite)]) {
+        use mutable_batch::test_util::assert_writes_eq;
+
+        let actual = self.writes();
+
+        assert_eq!(
+            actual.len(),
+            expected.len(),
+            "number of writes differ ({} VS {})",
+            actual.len(),
+            expected.len()
+        );
+
+        for ((actual_db, actual_write), (expected_db, expected_write)) in
+            actual.iter().zip(expected)
+        {
+            assert_eq!(
+                actual_db, expected_db,
+                "database names differ (\"{}\" VS \"{}\")",
+                actual_db, expected_db
+            );
+            assert_writes_eq(actual_write, expected_write);
+        }
+    }
+}
+
+#[async_trait]
+impl GrpcClient for MockClient {
+    async fn write(&self, db_name: &str, write: &DbWrite) -> Result<(), WriteError> {
+        if self.poisoned.load(Ordering::SeqCst) {
+            return Err("poisoned".to_string().into());
+        }
+
+        self.writes
+            .write()
+            .push((db_name.to_string(), write.clone()));
+        Ok(())
+    }
+
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use mutable_batch_lp::lines_to_batches;
+
+    use super::*;
+
+    #[tokio::test]
+    async fn test_mock() {
+        let client = MockClient::default();
+
+        let write1 = DbWrite::new(
+            lines_to_batches("foo x=1 1", 0).unwrap(),
+            Default::default(),
+        );
+        let write2 = DbWrite::new(
+            lines_to_batches("foo x=2 2", 0).unwrap(),
+            Default::default(),
+        );
+        let write3 = DbWrite::new(
+            lines_to_batches("foo x=3 3", 0).unwrap(),
+            Default::default(),
+        );
+
+        client.write("db1", &write1).await.unwrap();
+        client.write("db2", &write1).await.unwrap();
+        client.write("db1", &write2).await.unwrap();
+
+        let expected_writes = vec![
+            (String::from("db1"), write1.clone()),
+            (String::from("db2"), write1.clone()),
+            (String::from("db1"), write2.clone()),
+        ];
+        client.assert_writes(&expected_writes);
+
+        client.poison();
+        client.write("db1", &write3).await.unwrap_err();
+        client.assert_writes(&expected_writes);
+    }
+
+    #[tokio::test]
+    #[should_panic(expected = "number of writes differ (1 VS 0)")]
+    async fn test_assert_writes_fail_count() {
+        let client = MockClient::default();
+
+        let write1 = DbWrite::new(
+            lines_to_batches("foo x=1 1", 0).unwrap(),
+            Default::default(),
+        );
+
+        client.write("db1", &write1).await.unwrap();
+
+        let expected_writes = [];
+        client.assert_writes(&expected_writes);
+    }
+
+    #[tokio::test]
+    #[should_panic(expected = "database names differ (\"db1\" VS \"db2\")")]
+    async fn test_assert_writes_fail_db_name() {
+        let client = MockClient::default();
+
+        let write = DbWrite::new(
+            lines_to_batches("foo x=1 1", 0).unwrap(),
+            Default::default(),
+        );
+
+        client.write("db1", &write).await.unwrap();
+
+        let expected_writes = vec![(String::from("db2"), write)];
+        client.assert_writes(&expected_writes);
+    }
+
+    #[tokio::test]
+    #[should_panic(expected = "batches for table \"foo\" differ")]
+    async fn test_assert_writes_fail_batch() {
+        let client = MockClient::default();
+
+        let write1 = DbWrite::new(
+            lines_to_batches("foo x=1 1", 0).unwrap(),
+            Default::default(),
+        );
+        let write2 = DbWrite::new(
+            lines_to_batches("foo x=2 2", 0).unwrap(),
+            Default::default(),
+        );
+
+        client.write("db1", &write1).await.unwrap();
+
+        let expected_writes = vec![(String::from("db1"), write2)];
+        client.assert_writes(&expected_writes);
+    }
+}
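The `as_any` method exists because a `dyn GrpcClient` cannot be downcast directly; trait-object upcasting to `Any` was not stable Rust at the time of this patch. A self-contained sketch of the pattern (the `Client`/`Mock` names are hypothetical):

```rust
use std::any::Any;

trait Client: 'static {
    fn as_any(&self) -> &dyn Any;
}

#[derive(Default)]
struct Mock;

impl Client for Mock {
    // Hand out `&dyn Any` so tests can recover the concrete type.
    fn as_any(&self) -> &dyn Any {
        self
    }
}

fn main() {
    let client: Box<dyn Client> = Box::new(Mock::default());
    // Tests downcast to the mock to inspect its recorded state.
    assert!(client.as_any().downcast_ref::<Mock>().is_some());
}
```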
diff --git a/router/src/lib.rs b/router/src/lib.rs
index ee35ddfb29..4e7a0cca78 100644
--- a/router/src/lib.rs
+++ b/router/src/lib.rs
@@ -8,6 +8,8 @@
     clippy::clone_on_ref_ptr
 )]

+pub mod connection_pool;
+pub mod grpc_client;
 pub mod resolver;
 pub mod router;
 pub mod server;
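The `MockClient` poison pill above is a general failure-injection pattern for test doubles: a shared `AtomicBool` flips the double into an error-returning mode. A standalone sketch (hypothetical type, not from this patch):

```rust
use std::sync::atomic::{AtomicBool, Ordering};

#[derive(Debug, Default)]
struct FailureSwitch {
    poisoned: AtomicBool,
}

impl FailureSwitch {
    // Interior mutability: `&self` is enough, so the switch can be flipped
    // through a shared reference (e.g. an `Arc`) while a test is running.
    fn poison(&self) {
        self.poisoned.store(true, Ordering::SeqCst);
    }

    fn check(&self) -> Result<(), String> {
        if self.poisoned.load(Ordering::SeqCst) {
            return Err("poisoned".to_string());
        }
        Ok(())
    }
}

fn main() {
    let switch = FailureSwitch::default();
    assert!(switch.check().is_ok());
    switch.poison();
    assert!(switch.check().is_err());
}
```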
diff --git a/write_buffer/src/kafka.rs b/write_buffer/src/kafka.rs
index 264f44ff26..7001968491 100644
--- a/write_buffer/src/kafka.rs
+++ b/write_buffer/src/kafka.rs
@@ -1,5 +1,5 @@
 use std::{
-    collections::{BTreeMap, BTreeSet, HashMap},
+    collections::{BTreeMap, BTreeSet},
     convert::{TryFrom, TryInto},
     num::NonZeroU32,
     sync::Arc,
@@ -129,7 +129,7 @@ impl KafkaBufferProducer {
     pub async fn new(
         conn: impl Into<String> + Send,
         database_name: impl Into<String> + Send,
-        connection_config: &HashMap<String, String>,
+        connection_config: &BTreeMap<String, String>,
         creation_config: Option<&WriteBufferCreationConfig>,
         time_provider: Arc<dyn TimeProvider>,
     ) -> Result<Self, WriteBufferError> {
@@ -313,7 +313,7 @@ impl KafkaBufferConsumer {
         conn: impl Into<String> + Send + Sync,
         server_id: ServerId,
         database_name: impl Into<String> + Send + Sync,
-        connection_config: &HashMap<String, String>,
+        connection_config: &BTreeMap<String, String>,
         creation_config: Option<&WriteBufferCreationConfig>,
         // `trace_collector` has to be a reference due to https://github.com/rust-lang/rust/issues/63033
         trace_collector: Option<&Arc<dyn TraceCollector>>,
@@ -426,7 +426,7 @@ async fn create_kafka_topic(
     kafka_connection: &str,
     database_name: &str,
     n_sequencers: NonZeroU32,
-    cfg: &HashMap<String, String>,
+    cfg: &BTreeMap<String, String>,
 ) -> Result<(), WriteBufferError> {
     let admin = admin_client(kafka_connection)?;

@@ -489,7 +489,7 @@ async fn maybe_auto_create_topics(
 }

 pub mod test_utils {
-    use std::{collections::HashMap, time::Duration};
+    use std::{collections::BTreeMap, time::Duration};

     use rdkafka::admin::{AdminOptions, AlterConfig, ResourceSpecifier};
     use uuid::Uuid;
@@ -544,12 +544,12 @@ pub mod test_utils {
     }

     /// Create topic creation config that is ideal for testing and works with [`purge_kafka_topic`]
-    pub fn kafka_sequencer_options() -> HashMap<String, String> {
-        let mut cfg: HashMap<String, String> = Default::default();
-        cfg.insert("cleanup.policy".to_string(), "delete".to_string());
-        cfg.insert("retention.ms".to_string(), "-1".to_string());
-        cfg.insert("segment.ms".to_string(), "10".to_string());
-        cfg
+    pub fn kafka_sequencer_options() -> BTreeMap<String, String> {
+        BTreeMap::from([
+            ("cleanup.policy".to_string(), "delete".to_string()),
+            ("retention.ms".to_string(), "-1".to_string()),
+            ("segment.ms".to_string(), "10".to_string()),
+        ])
     }

     /// Purge all records from given topic.
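The rewritten `kafka_sequencer_options` above builds the map from an array of key/value pairs; `From<[(K, V); N]>` for `BTreeMap` is available since Rust 1.56, the same toolchain the `edition = "2021"` manifests require. A sketch of the construction on its own:

```rust
use std::collections::BTreeMap;

fn main() {
    // Array-of-tuples construction: no `mut` binding and no repeated
    // `insert` calls, unlike the removed HashMap version.
    let cfg = BTreeMap::from([
        ("cleanup.policy".to_string(), "delete".to_string()),
        ("retention.ms".to_string(), "-1".to_string()),
        ("segment.ms".to_string(), "10".to_string()),
    ]);
    assert_eq!(cfg.get("segment.ms").map(String::as_str), Some("10"));
}
```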
From 7f6280305d2c5cf6bbb735d3b699656f6a8ebef5 Mon Sep 17 00:00:00 2001
From: Marco Neumann
Date: Fri, 5 Nov 2021 11:22:43 +0100
Subject: [PATCH 2/4] fix: `ConnectionPool::new` requires tokio

---
 router/src/connection_pool.rs | 14 +++++++++-----
 1 file changed, 9 insertions(+), 5 deletions(-)

diff --git a/router/src/connection_pool.rs b/router/src/connection_pool.rs
index 8605a6a0b3..0e83838d6d 100644
--- a/router/src/connection_pool.rs
+++ b/router/src/connection_pool.rs
@@ -46,7 +46,10 @@ impl ConnectionPool {
     /// Create new connection pool.
     ///
     /// If `use_mock_grpc` is set only mock gRPC clients are created.
-    pub fn new(use_mock_grpc: bool, wb_factory: WriteBufferConfigFactory) -> Self {
+    pub async fn new(use_mock_grpc: bool, wb_factory: WriteBufferConfigFactory) -> Self {
+        // Note: this function is async even though it does not contain any `.await` calls, because
+        // `LoadingCache::new` requires tokio to be running; even if that were documented, people would forget it.
+
         let grpc_clients = if use_mock_grpc {
             LoadingCache::new(|_connection_string: String| async move {
                 use crate::grpc_client::MockClient;
@@ -85,11 +88,11 @@ impl ConnectionPool {

     /// Create new connection factory for testing purposes.
     #[cfg(test)]
-    pub fn new_testing() -> Self {
+    pub async fn new_testing() -> Self {
         use time::SystemProvider;

         let time_provider = Arc::new(SystemProvider::new());
-        Self::new(true, WriteBufferConfigFactory::new(time_provider))
+        Self::new(true, WriteBufferConfigFactory::new(time_provider)).await
     }

     /// Get gRPC client given a connection string.
@@ -137,11 +140,12 @@ mod tests {
         let pool1 = ConnectionPool::new(
             false,
             WriteBufferConfigFactory::new(Arc::clone(&time_provider)),
-        );
+        )
+        .await;
         // connection will fail
         pool1.grpc_client("foo").await.unwrap_err();

-        let pool2 = ConnectionPool::new(true, WriteBufferConfigFactory::new(time_provider));
+        let pool2 = ConnectionPool::new(true, WriteBufferConfigFactory::new(time_provider)).await;
         let client2 = pool2.grpc_client("foo").await.unwrap();
         client2.as_any().downcast_ref::<MockClient>().unwrap();
     }

From 9984e4d3860179265d418be6202c3ae581663a67 Mon Sep 17 00:00:00 2001
From: Marco Neumann
Date: Mon, 8 Nov 2021 09:45:02 +0100
Subject: [PATCH 3/4] fix: debug msg

Co-authored-by: Andrew Lamb
---
 router/src/connection_pool.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/router/src/connection_pool.rs b/router/src/connection_pool.rs
index 0e83838d6d..64eeb38117 100644
--- a/router/src/connection_pool.rs
+++ b/router/src/connection_pool.rs
@@ -105,7 +105,7 @@ impl ConnectionPool {
             .get_with_meta(connection_string.to_string())
             .await
             .map_err(|e| Arc::new(e) as ConnectionError)?;
-        debug!(was_cached=%res.cached, %connection_string, "getting IOx write client");
+        debug!(was_cached=%res.cached, %connection_string, "getting IOx client");
         Ok(res.result)
     }

From 5b01f9b3a502dcdbc14d7b98397244bfa1fc750d Mon Sep 17 00:00:00 2001
From: Marco Neumann
Date: Mon, 8 Nov 2021 09:48:12 +0100
Subject: [PATCH 4/4] docs: explain btree map decision

---
 data_types/src/write_buffer.rs | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/data_types/src/write_buffer.rs b/data_types/src/write_buffer.rs
index 7a3c56c98d..c223828ea8 100644
--- a/data_types/src/write_buffer.rs
+++ b/data_types/src/write_buffer.rs
@@ -27,6 +27,8 @@ pub struct WriteBufferConnection {
     /// Special configs to be applied when establishing the connection.
     ///
     /// This depends on [`type_`](Self::type_) and can configure aspects like timeouts.
+    ///
+    /// Note: This config should be a [`BTreeMap`] to ensure a stable hash.
     pub connection_config: BTreeMap<String, String>,

     /// Specifies if the sequencers (e.g. for Kafka in form of a topic) should be automatically created if they do not
@@ -61,6 +63,8 @@ pub struct WriteBufferCreationConfig {
     /// Special configs to be applied when sequencers are created.
     ///
     /// This depends on [type](WriteBufferConnection::type_) and can set up parameters like retention policy.
+    ///
+    /// Note: This config should be a [`BTreeMap`] to ensure a stable hash.
     pub options: BTreeMap<String, String>,
 }
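The "stable hash" notes added in this last patch can be demonstrated directly: two `BTreeMap`s with the same entries hash identically regardless of insertion order, which is what makes these configs safe inside the connection pool's cache keys. A small sketch:

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::BTreeMap;
use std::hash::{Hash, Hasher};

fn hash_of<T: Hash>(value: &T) -> u64 {
    let mut hasher = DefaultHasher::new();
    value.hash(&mut hasher);
    hasher.finish()
}

fn main() {
    let mut a = BTreeMap::new();
    a.insert("retention.ms", "-1");
    a.insert("cleanup.policy", "delete");

    let mut b = BTreeMap::new();
    b.insert("cleanup.policy", "delete");
    b.insert("retention.ms", "-1");

    // BTreeMap hashes its entries in key order, so insertion order is
    // irrelevant — exactly the property a cache key needs.
    assert_eq!(hash_of(&a), hash_of(&b));
}
```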