Merge branch 'main' into dependabot/cargo/paste-1.0.6

kodiakhq[bot] 2021-11-08 10:20:56 +00:00 committed by GitHub
commit 6e8517cb1f
8 changed files with 420 additions and 22 deletions

Cargo.lock (generated)

@@ -3506,16 +3506,23 @@ dependencies = [
name = "router"
version = "0.1.0"
dependencies = [
"async-trait",
"cache_loader_async",
"data_types",
"hashbrown",
"influxdb_iox_client",
"metric",
"mutable_batch",
"mutable_batch_lp",
"mutable_batch_pb",
"observability_deps",
"parking_lot",
"regex",
"snafu",
"time 0.1.0",
"tokio",
"trace",
"write_buffer",
]
[[package]]


@@ -1,7 +1,7 @@
-use std::{collections::HashMap, num::NonZeroU32};
+use std::{collections::BTreeMap, num::NonZeroU32};
/// If the buffer is used for reading or writing.
-#[derive(Debug, Eq, PartialEq, Clone)]
+#[derive(Debug, Eq, PartialEq, Clone, Hash)]
pub enum WriteBufferDirection {
/// Writes into the buffer aka "producer".
Write,
@@ -13,7 +13,7 @@ pub enum WriteBufferDirection {
pub const DEFAULT_N_SEQUENCERS: u32 = 1;
/// Configures the use of a write buffer.
-#[derive(Debug, Eq, PartialEq, Clone)]
+#[derive(Debug, Eq, PartialEq, Clone, Hash)]
pub struct WriteBufferConnection {
/// If the buffer is used for reading or writing.
pub direction: WriteBufferDirection,
@@ -27,7 +27,9 @@ pub struct WriteBufferConnection {
/// Special configs to be applied when establishing the connection.
///
/// This depends on [`type_`](Self::type_) and can configure aspects like timeouts.
-pub connection_config: HashMap<String, String>,
+///
+/// Note: This config should be a [`BTreeMap`] to ensure a stable hash.
+pub connection_config: BTreeMap<String, String>,
/// Specifies if the sequencers (e.g. for Kafka in form of a topic) should be automatically created if they do not
/// exist prior to reading or writing.
@@ -50,7 +52,7 @@ impl Default for WriteBufferConnection {
///
/// What that means depends on the used write buffer, e.g. for Kafka this will create a new topic w/
/// [`n_sequencers`](Self::n_sequencers) partitions.
-#[derive(Debug, Eq, PartialEq, Clone)]
+#[derive(Debug, Eq, PartialEq, Clone, Hash)]
pub struct WriteBufferCreationConfig {
/// Number of sequencers.
///
@@ -61,7 +63,9 @@ pub struct WriteBufferCreationConfig {
/// Special configs to be applied when sequencers are created.
///
/// This depends on [type](WriteBufferConnection::type_) and can setup parameters like retention policy.
-pub options: HashMap<String, String>,
+///
+/// Note: This config should be a [`BTreeMap`] to ensure a stable hash.
+pub options: BTreeMap<String, String>,
}
impl Default for WriteBufferCreationConfig {
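The HashMap-to-BTreeMap switch is what makes the added `Hash` derives legal: `std::collections::HashMap` does not implement `Hash` (its iteration order is unspecified), while `BTreeMap` always iterates entries in key order and therefore hashes deterministically. A minimal standalone sketch of that property (illustrative only, not part of the diff):

use std::collections::hash_map::DefaultHasher;
use std::collections::BTreeMap;
use std::hash::{Hash, Hasher};

// This derive would not compile with a HashMap field; BTreeMap implements Hash
// because its entries are visited in sorted key order.
#[derive(Debug, PartialEq, Eq, Hash)]
struct Config {
    options: BTreeMap<String, String>,
}

fn fingerprint<T: Hash>(value: &T) -> u64 {
    let mut hasher = DefaultHasher::new();
    value.hash(&mut hasher);
    hasher.finish()
}

fn main() {
    let a = Config {
        options: BTreeMap::from([("k1".to_string(), "v1".to_string()), ("k2".to_string(), "v2".to_string())]),
    };
    let b = Config {
        options: BTreeMap::from([("k2".to_string(), "v2".to_string()), ("k1".to_string(), "v1".to_string())]),
    };
    // Insertion order does not matter: both maps hash identically.
    assert_eq!(fingerprint(&a), fingerprint(&b));
}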


@@ -17,7 +17,7 @@ impl From<WriteBufferConnection> for write_buffer::WriteBufferConnection {
direction: direction.into(),
r#type: v.type_,
connection: v.connection,
-connection_config: v.connection_config,
+connection_config: v.connection_config.into_iter().collect(),
creation_config: v.creation_config.map(|x| x.into()),
}
}
@@ -36,7 +36,7 @@ impl From<WriteBufferCreationConfig> for write_buffer::WriteBufferCreationConfig
fn from(v: WriteBufferCreationConfig) -> Self {
Self {
n_sequencers: v.n_sequencers.get(),
-options: v.options,
+options: v.options.into_iter().collect(),
}
}
}
@@ -57,7 +57,7 @@ impl TryFrom<write_buffer::WriteBufferConnection> for WriteBufferConnection {
direction: direction.try_into()?,
type_: proto.r#type,
connection: proto.connection,
-connection_config: proto.connection_config,
+connection_config: proto.connection_config.into_iter().collect(),
creation_config: proto.creation_config.optional("creation_config")?,
})
}
@@ -86,7 +86,7 @@ impl TryFrom<write_buffer::WriteBufferCreationConfig> for WriteBufferCreationCon
Ok(Self {
n_sequencers: NonZeroU32::try_from(proto.n_sequencers)
.unwrap_or_else(|_| NonZeroU32::try_from(DEFAULT_N_SEQUENCERS).unwrap()),
-options: proto.options,
+options: proto.options.into_iter().collect(),
})
}
}
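On the protobuf boundary the map fields live in the generated structs (presumably a `HashMap`, prost's default for proto map fields), so the conversions above simply re-collect: `into_iter().collect()` rebuilds one map type from the other because both implement `IntoIterator` and `FromIterator`. A standalone sketch of the same pattern, using only the standard library:

use std::collections::{BTreeMap, HashMap};

fn main() {
    // Domain side: ordered, hashable map.
    let domain: BTreeMap<String, String> =
        BTreeMap::from([("timeout".to_string(), "5s".to_string())]);

    // Wire side: re-collect into the map type the generated proto struct holds,
    // and back again when decoding.
    let wire: HashMap<String, String> = domain.into_iter().collect();
    let decoded: BTreeMap<String, String> = wire.into_iter().collect();

    assert_eq!(decoded.get("timeout").map(String::as_str), Some("5s"));
}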


@@ -4,15 +4,22 @@ version = "0.1.0"
edition = "2021"
[dependencies]
async-trait = "0.1"
cache_loader_async = "0.1.2"
data_types = { path = "../data_types" }
hashbrown = "0.11"
influxdb_iox_client = { path = "../influxdb_iox_client" }
metric = { path = "../metric" }
mutable_batch = { path = "../mutable_batch" }
mutable_batch_pb = { path = "../mutable_batch_pb" }
observability_deps = { path = "../observability_deps" }
trace = { path = "../trace" }
parking_lot = "0.11.2"
snafu = "0.6"
time = { path = "../time" }
write_buffer = { path = "../write_buffer" }
[dev-dependencies]
mutable_batch_lp = { path = "../mutable_batch_lp" }
regex = "1.4"
time = { path = "../time" }
tokio = { version = "1.13", features = ["macros", "time"] }


@@ -0,0 +1,152 @@
use std::sync::Arc;
use cache_loader_async::cache_api::LoadingCache;
use data_types::write_buffer::WriteBufferConnection;
use observability_deps::tracing::debug;
use write_buffer::{
config::WriteBufferConfigFactory,
core::{WriteBufferError, WriteBufferWriting},
};
use crate::grpc_client::GrpcClient;
type KeyWriteBufferProducer = (String, WriteBufferConnection);
pub type ConnectionError = Arc<dyn std::error::Error + Send + Sync + 'static>;
/// Stupid hack to fit the `Box<dyn ...>` in `WriteBufferError` into an `Arc`
struct EWrapper(WriteBufferError);
impl std::fmt::Debug for EWrapper {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.0.fmt(f)
}
}
impl std::fmt::Display for EWrapper {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.0.fmt(f)
}
}
impl std::error::Error for EWrapper {}
/// Connection pool for the entire routing server.
///
/// This avoids:
/// 1. That every [`Router`](crate::router::Router) uses its own connections
/// 2. That we open too many connections in total.
#[derive(Debug)]
pub struct ConnectionPool {
grpc_clients: LoadingCache<String, Arc<dyn GrpcClient>, ConnectionError>,
write_buffer_producers:
LoadingCache<KeyWriteBufferProducer, Arc<dyn WriteBufferWriting>, ConnectionError>,
}
impl ConnectionPool {
/// Create new connection pool.
///
/// If `use_mock_grpc` is set, only mock gRPC clients are created.
pub async fn new(use_mock_grpc: bool, wb_factory: WriteBufferConfigFactory) -> Self {
// Note: this function is async even though it does not contain any `.await` calls because `LoadingCache::new`
// requires tokio to be running, and even if documented, people will forget about this.
let grpc_clients = if use_mock_grpc {
LoadingCache::new(|_connection_string: String| async move {
use crate::grpc_client::MockClient;
Ok(Arc::new(MockClient::default()) as Arc<dyn GrpcClient>)
})
} else {
LoadingCache::new(|connection_string: String| async move {
use crate::grpc_client::RealClient;
use influxdb_iox_client::connection::Builder;
let connection = Builder::default()
.build(&connection_string)
.await
.map_err(|e| Arc::new(e) as ConnectionError)?;
Ok(Arc::new(RealClient::new(connection)) as Arc<dyn GrpcClient>)
})
};
let wb_factory = Arc::new(wb_factory);
let write_buffer_producers = LoadingCache::new(move |key: KeyWriteBufferProducer| {
let wb_factory = Arc::clone(&wb_factory);
async move {
wb_factory
.new_config_write(&key.0, &key.1)
.await
.map_err(|e| Arc::new(EWrapper(e)) as ConnectionError)
}
});
Self {
grpc_clients,
write_buffer_producers,
}
}
/// Create new connection factory for testing purposes.
#[cfg(test)]
pub async fn new_testing() -> Self {
use time::SystemProvider;
let time_provider = Arc::new(SystemProvider::new());
Self::new(true, WriteBufferConfigFactory::new(time_provider)).await
}
/// Get gRPC client given a connection string.
pub async fn grpc_client(
&self,
connection_string: &str,
) -> Result<Arc<dyn GrpcClient>, ConnectionError> {
let res = self
.grpc_clients
.get_with_meta(connection_string.to_string())
.await
.map_err(|e| Arc::new(e) as ConnectionError)?;
debug!(was_cached=%res.cached, %connection_string, "getting IOx client");
Ok(res.result)
}
/// Get write buffer producer given a DB name and config.
pub async fn write_buffer_producer(
&self,
db_name: &str,
cfg: &WriteBufferConnection,
) -> Result<Arc<dyn WriteBufferWriting>, ConnectionError> {
let res = self
.write_buffer_producers
.get_with_meta((db_name.to_string(), cfg.clone()))
.await
.map_err(|e| Arc::new(e) as ConnectionError)?;
debug!(was_cached=%res.cached, %db_name, "getting write buffer");
Ok(res.result)
}
}
#[cfg(test)]
mod tests {
use time::{SystemProvider, TimeProvider};
use crate::grpc_client::MockClient;
use super::*;
#[tokio::test]
async fn test_grpc_mocking() {
let time_provider: Arc<dyn TimeProvider> = Arc::new(SystemProvider::new());
let pool1 = ConnectionPool::new(
false,
WriteBufferConfigFactory::new(Arc::clone(&time_provider)),
)
.await;
// connection will fail
pool1.grpc_client("foo").await.unwrap_err();
let pool2 = ConnectionPool::new(true, WriteBufferConfigFactory::new(time_provider)).await;
let client2 = pool2.grpc_client("foo").await.unwrap();
client2.as_any().downcast_ref::<MockClient>().unwrap();
}
}
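Because both caches sit behind `cache_loader_async::LoadingCache`, a connection string or write-buffer key is resolved at most once and later lookups get clones of the same cached `Arc`. A hedged usage sketch against the mock mode (test-style, runs only inside this workspace since `time` and `write_buffer` are path dependencies; the connection string is made up):

use std::sync::Arc;
use time::SystemProvider;
use write_buffer::config::WriteBufferConfigFactory;

use router::connection_pool::ConnectionPool;

#[tokio::test]
async fn pool_reuses_cached_clients() {
    // Mock mode: every connection string maps to a MockClient, no network needed.
    let time_provider = Arc::new(SystemProvider::new());
    let pool = ConnectionPool::new(true, WriteBufferConfigFactory::new(time_provider)).await;

    let a = pool.grpc_client("http://iox-peer:8082").await.unwrap();
    let b = pool.grpc_client("http://iox-peer:8082").await.unwrap();

    // The second lookup is served from the cache, so both handles point at the same client.
    assert!(Arc::ptr_eq(&a, &b));
}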

router/src/grpc_client.rs (new file)

@@ -0,0 +1,226 @@
//! gRPC client abstraction.
//!
//! This abstraction was created for easier testing.
use std::{
any::Any,
sync::atomic::{AtomicBool, Ordering},
};
use async_trait::async_trait;
use mutable_batch::DbWrite;
use parking_lot::RwLock;
/// Generic write error.
pub type WriteError = Box<dyn std::error::Error + Send + Sync>;
/// An abstract IOx gRPC client.
#[async_trait]
pub trait GrpcClient: Sync + Send + std::fmt::Debug + 'static {
/// Write data to the given database.
async fn write(&self, db_name: &str, write: &DbWrite) -> Result<(), WriteError>;
/// Cast client to [`Any`], useful for downcasting.
fn as_any(&self) -> &dyn Any;
}
/// A real, network-driven gRPC client.
#[derive(Debug)]
pub struct RealClient {
/// Write client for IOx.
write_client: influxdb_iox_client::write::Client,
}
impl RealClient {
/// Create new client from established connection.
pub fn new(connection: influxdb_iox_client::connection::Connection) -> Self {
Self {
write_client: influxdb_iox_client::write::Client::new(connection),
}
}
}
#[async_trait]
impl GrpcClient for RealClient {
async fn write(&self, db_name: &str, write: &DbWrite) -> Result<(), WriteError> {
use influxdb_iox_client::write::generated_types::WriteRequest;
use mutable_batch_pb::encode::encode_write;
let write_request = WriteRequest {
database_batch: Some(encode_write(db_name, write)),
};
// cheap, see https://docs.rs/tonic/0.4.2/tonic/client/index.html#concurrent-usage
let mut client = self.write_client.clone();
client
.write_pb(write_request)
.await
.map_err(|e| Box::new(e) as _)
}
fn as_any(&self) -> &dyn Any {
self
}
}
/// Mock client for testing.
#[derive(Debug, Default)]
pub struct MockClient {
/// All writes recorded by this client.
writes: RwLock<Vec<(String, DbWrite)>>,
/// Poison pill.
///
/// If set to `true` all writes will fail.
poisoned: AtomicBool,
}
impl MockClient {
/// Take poison pill.
///
/// All subsequent writes will fail.
pub fn poison(&self) {
self.poisoned.store(true, Ordering::SeqCst)
}
/// Get a copy of all recorded writes.
pub fn writes(&self) -> Vec<(String, DbWrite)> {
self.writes.read().clone()
}
/// Assert that writes are as expected.
pub fn assert_writes(&self, expected: &[(String, DbWrite)]) {
use mutable_batch::test_util::assert_writes_eq;
let actual = self.writes();
assert_eq!(
actual.len(),
expected.len(),
"number of writes differ ({} VS {})",
actual.len(),
expected.len()
);
for ((actual_db, actual_write), (expected_db, expected_write)) in
actual.iter().zip(expected)
{
assert_eq!(
actual_db, expected_db,
"database names differ (\"{}\" VS \"{}\")",
actual_db, expected_db
);
assert_writes_eq(actual_write, expected_write);
}
}
}
#[async_trait]
impl GrpcClient for MockClient {
async fn write(&self, db_name: &str, write: &DbWrite) -> Result<(), WriteError> {
if self.poisoned.load(Ordering::SeqCst) {
return Err("poisened".to_string().into());
}
self.writes
.write()
.push((db_name.to_string(), write.clone()));
Ok(())
}
fn as_any(&self) -> &dyn Any {
self
}
}
#[cfg(test)]
mod tests {
use mutable_batch_lp::lines_to_batches;
use super::*;
#[tokio::test]
async fn test_mock() {
let client = MockClient::default();
let write1 = DbWrite::new(
lines_to_batches("foo x=1 1", 0).unwrap(),
Default::default(),
);
let write2 = DbWrite::new(
lines_to_batches("foo x=2 2", 0).unwrap(),
Default::default(),
);
let write3 = DbWrite::new(
lines_to_batches("foo x=3 3", 0).unwrap(),
Default::default(),
);
client.write("db1", &write1).await.unwrap();
client.write("db2", &write1).await.unwrap();
client.write("db1", &write2).await.unwrap();
let expected_writes = vec![
(String::from("db1"), write1.clone()),
(String::from("db2"), write1.clone()),
(String::from("db1"), write2.clone()),
];
client.assert_writes(&expected_writes);
client.poison();
client.write("db1", &write3).await.unwrap_err();
client.assert_writes(&expected_writes);
}
#[tokio::test]
#[should_panic(expected = "number of writes differ (1 VS 0)")]
async fn test_assert_writes_fail_count() {
let client = MockClient::default();
let write1 = DbWrite::new(
lines_to_batches("foo x=1 1", 0).unwrap(),
Default::default(),
);
client.write("db1", &write1).await.unwrap();
let expected_writes = [];
client.assert_writes(&expected_writes);
}
#[tokio::test]
#[should_panic(expected = "database names differ (\"db1\" VS \"db2\")")]
async fn test_assert_writes_fail_db_name() {
let client = MockClient::default();
let write = DbWrite::new(
lines_to_batches("foo x=1 1", 0).unwrap(),
Default::default(),
);
client.write("db1", &write).await.unwrap();
let expected_writes = vec![(String::from("db2"), write)];
client.assert_writes(&expected_writes);
}
#[tokio::test]
#[should_panic(expected = "batches for table \"foo\" differ")]
async fn test_assert_writes_fail_batch() {
let client = MockClient::default();
let write1 = DbWrite::new(
lines_to_batches("foo x=1 1", 0).unwrap(),
Default::default(),
);
let write2 = DbWrite::new(
lines_to_batches("foo x=2 2", 0).unwrap(),
Default::default(),
);
client.write("db1", &write1).await.unwrap();
let expected_writes = vec![(String::from("db1"), write2)];
client.assert_writes(&expected_writes);
}
}
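The trait exists so that routing code can depend on `&dyn GrpcClient` and be unit-tested with `MockClient` instead of a live server. A small sketch of that dependency-injection pattern, assuming the items above are in scope (the `forward_to_peer` helper is hypothetical, not something this PR adds):

use mutable_batch::DbWrite;
use mutable_batch_lp::lines_to_batches;

// Hypothetical routing helper: it only ever sees the trait object.
async fn forward_to_peer(
    client: &dyn GrpcClient,
    db_name: &str,
    write: &DbWrite,
) -> Result<(), WriteError> {
    client.write(db_name, write).await
}

#[tokio::test]
async fn forward_records_write_on_mock() {
    let client = MockClient::default();
    let write = DbWrite::new(lines_to_batches("cpu usage=1 1", 0).unwrap(), Default::default());

    forward_to_peer(&client, "db1", &write).await.unwrap();

    // The mock records every call, so behavior is assertable without a network.
    client.assert_writes(&[(String::from("db1"), write)]);
}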


@@ -8,6 +8,8 @@
clippy::clone_on_ref_ptr
)]
+pub mod connection_pool;
+pub mod grpc_client;
pub mod resolver;
pub mod router;
pub mod server;


@@ -1,5 +1,5 @@
use std::{
-collections::{BTreeMap, BTreeSet, HashMap},
+collections::{BTreeMap, BTreeSet},
convert::{TryFrom, TryInto},
num::NonZeroU32,
sync::Arc,
@@ -129,7 +129,7 @@ impl KafkaBufferProducer {
pub async fn new(
conn: impl Into<String> + Send,
database_name: impl Into<String> + Send,
-connection_config: &HashMap<String, String>,
+connection_config: &BTreeMap<String, String>,
creation_config: Option<&WriteBufferCreationConfig>,
time_provider: Arc<dyn TimeProvider>,
) -> Result<Self, WriteBufferError> {
@@ -313,7 +313,7 @@ impl KafkaBufferConsumer {
conn: impl Into<String> + Send + Sync,
server_id: ServerId,
database_name: impl Into<String> + Send + Sync,
-connection_config: &HashMap<String, String>,
+connection_config: &BTreeMap<String, String>,
creation_config: Option<&WriteBufferCreationConfig>,
// `trace_collector` has to be a reference due to https://github.com/rust-lang/rust/issues/63033
trace_collector: Option<&Arc<dyn TraceCollector>>,
@@ -426,7 +426,7 @@ async fn create_kafka_topic(
kafka_connection: &str,
database_name: &str,
n_sequencers: NonZeroU32,
-cfg: &HashMap<String, String>,
+cfg: &BTreeMap<String, String>,
) -> Result<(), WriteBufferError> {
let admin = admin_client(kafka_connection)?;
@@ -489,7 +489,7 @@ async fn maybe_auto_create_topics(
}
pub mod test_utils {
-use std::{collections::HashMap, time::Duration};
+use std::{collections::BTreeMap, time::Duration};
use rdkafka::admin::{AdminOptions, AlterConfig, ResourceSpecifier};
use uuid::Uuid;
@@ -544,12 +544,12 @@
}
/// Create topic creation config that is ideal for testing and works with [`purge_kafka_topic`]
-pub fn kafka_sequencer_options() -> HashMap<String, String> {
-let mut cfg: HashMap<String, String> = Default::default();
-cfg.insert("cleanup.policy".to_string(), "delete".to_string());
-cfg.insert("retention.ms".to_string(), "-1".to_string());
-cfg.insert("segment.ms".to_string(), "10".to_string());
-cfg
+pub fn kafka_sequencer_options() -> BTreeMap<String, String> {
+BTreeMap::from([
+("cleanup.policy".to_string(), "delete".to_string()),
+("retention.ms".to_string(), "-1".to_string()),
+("segment.ms".to_string(), "10".to_string()),
+])
}
/// Purge all records from given topic.