fix: Remove router

pull/24376/head
Carol (Nichols || Goulding) 2022-05-04 09:05:29 -04:00
parent ba8191c1eb
commit e0bc1801ac
No known key found for this signature in database
GPG Key ID: E907EE5A736F87D4
12 changed files with 0 additions and 1773 deletions

27
Cargo.lock generated
View File

@ -2218,7 +2218,6 @@ dependencies = [
"querier",
"query",
"read_buffer",
"router",
"router2",
"rustyline",
"schema",
@ -4694,31 +4693,6 @@ dependencies = [
"winapi",
]
[[package]]
name = "router"
version = "0.1.0"
dependencies = [
"async-trait",
"cache_loader_async",
"data_types",
"dml",
"hashbrown 0.12.0",
"influxdb_iox_client",
"iox_time",
"metric",
"mutable_batch",
"mutable_batch_lp",
"mutable_batch_pb",
"observability_deps",
"parking_lot 0.12.0",
"regex",
"snafu",
"tokio",
"trace",
"workspace-hack",
"write_buffer",
]
[[package]]
name = "router2"
version = "0.1.0"
@ -5267,7 +5241,6 @@ dependencies = [
"query",
"rand",
"regex",
"router",
"service_common",
"snafu",
"test_helpers",

View File

@ -56,7 +56,6 @@ members = [
"query_functions",
"query_tests",
"read_buffer",
"router",
"router2",
"schema",
"server",

View File

@ -42,7 +42,6 @@ predicate = { path = "../predicate" }
querier = { path = "../querier" }
query = { path = "../query" }
read_buffer = { path = "../read_buffer" }
router = { path = "../router" }
router2 = { path = "../router2" }
schema = { path = "../schema" }
server = { path = "../server" }

View File

@ -1,27 +0,0 @@
[package]
name = "router"
version = "0.1.0"
edition = "2021"
[dependencies]
async-trait = "0.1"
cache_loader_async = { version = "0.2.0", features = ["ttl-cache"] }
data_types = { path = "../data_types" }
dml = { path = "../dml" }
hashbrown = "0.12"
influxdb_iox_client = { path = "../influxdb_iox_client" }
metric = { path = "../metric" }
mutable_batch = { path = "../mutable_batch" }
mutable_batch_pb = { path = "../mutable_batch_pb" }
observability_deps = { path = "../observability_deps" }
trace = { path = "../trace" }
parking_lot = "0.12"
snafu = "0.7"
iox_time = { path = "../iox_time" }
write_buffer = { path = "../write_buffer" }
workspace-hack = { path = "../workspace-hack"}
[dev-dependencies]
mutable_batch_lp = { path = "../mutable_batch_lp" }
regex = "1"
tokio = { version = "1.18", features = ["macros", "parking_lot"] }

View File

@ -1,210 +0,0 @@
use std::sync::Arc;
use cache_loader_async::cache_api::LoadingCache;
use data_types::write_buffer::WriteBufferConnection;
use observability_deps::tracing::debug;
use trace::TraceCollector;
use write_buffer::{
config::WriteBufferConfigFactory,
core::{WriteBufferError, WriteBufferWriting},
};
use crate::grpc_client::GrpcClient;
type KeyWriteBufferProducer = (String, WriteBufferConnection);
pub type ConnectionError = Arc<dyn std::error::Error + Send + Sync + 'static>;
/// Stupid hack to fit the `Box<dyn ...>` in `WriteBufferError` into an `Arc`
struct EWrapper(WriteBufferError);
impl std::fmt::Debug for EWrapper {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.0.fmt(f)
}
}
impl std::fmt::Display for EWrapper {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.0.fmt(f)
}
}
impl std::error::Error for EWrapper {}
// ==================== Hacks ====================
// We need to hide a few types from rustc because otherwise we end up with a bunch of "implementation of ... is not
// general enough" errors downstream. This is very likely a bug:
//
// - https://github.com/rust-lang/rust/issues/41078
// - https://github.com/rust-lang/rust/issues/71723
// - https://github.com/rust-lang/rust/issues/87425
#[derive(Debug, Clone)]
struct ArcDynGrpcClient(Arc<dyn GrpcClient>);
#[derive(Debug, Clone)]
struct ArcDynWriteBufferWriting(Arc<dyn WriteBufferWriting>);
#[derive(Debug, Clone)]
struct ConnectionErrorWrapped(ConnectionError);
// ===============================================
type CacheEntry<T> = cache_loader_async::cache_api::CacheEntry<T, ConnectionErrorWrapped>;
type HashMapBacking<K, V> = cache_loader_async::backing::HashMapBacking<K, CacheEntry<V>>;
type LoadingCacheHash<K, V> = LoadingCache<K, V, ConnectionErrorWrapped, HashMapBacking<K, V>>;
/// Connection pool for the entire routing server.
///
/// This avoids:
/// 1. That every [`Router`](crate::router::Router) uses their own connections
/// 2. That we open too many connections in total.
pub struct ConnectionPool {
grpc_clients: LoadingCacheHash<String, ArcDynGrpcClient>,
write_buffer_producers: LoadingCacheHash<KeyWriteBufferProducer, ArcDynWriteBufferWriting>,
}
impl std::fmt::Debug for ConnectionPool {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ConnectionPool").finish_non_exhaustive()
}
}
impl ConnectionPool {
/// Create new connection pool.
///
/// If `use_mock_grpc` is set only mock gRPC clients are created.
pub async fn new(
use_mock_grpc: bool,
wb_factory: Arc<WriteBufferConfigFactory>,
trace_collector: Option<Arc<dyn TraceCollector>>,
) -> Self {
// Note: this function is async even though it does not contain any `.await` calls because `LoadingCache::new`
// requires tokio to be running and even if documented people will forget about this.
// let grpc_clients = Arc::new(LoadingCache::new(foo));
let grpc_clients = if use_mock_grpc {
LoadingCache::new(|_connection_string: String| async move {
use crate::grpc_client::MockClient;
Ok(ArcDynGrpcClient(
Arc::new(MockClient::default()) as Arc<dyn GrpcClient>
))
})
} else {
LoadingCache::new(|connection_string: String| async move {
use crate::grpc_client::RealClient;
use influxdb_iox_client::connection::Builder;
let connection = Builder::default()
.build(&connection_string)
.await
.map_err(|e| ConnectionErrorWrapped(Arc::new(e) as ConnectionError))?;
Ok(ArcDynGrpcClient(
Arc::new(RealClient::new(connection)) as Arc<dyn GrpcClient>
))
})
};
let write_buffer_producers = LoadingCache::new(move |key: KeyWriteBufferProducer| {
let wb_factory = Arc::clone(&wb_factory);
let trace_collector = trace_collector.clone();
async move {
wb_factory
.new_config_write(&key.0, trace_collector.as_ref(), &key.1)
.await
.map(ArcDynWriteBufferWriting)
.map_err(|e| ConnectionErrorWrapped(Arc::new(EWrapper(e)) as ConnectionError))
}
});
Self {
grpc_clients,
write_buffer_producers,
}
}
/// Create new connection factory for testing purposes.
#[cfg(test)]
pub async fn new_testing() -> Self {
use iox_time::SystemProvider;
let time_provider = Arc::new(SystemProvider::new());
let metric_registry = Arc::new(metric::Registry::new());
Self::new(
true,
Arc::new(WriteBufferConfigFactory::new(
time_provider,
metric_registry,
)),
None,
)
.await
}
/// Get gRPC client given a connection string.
pub async fn grpc_client(
&self,
connection_string: &str,
) -> Result<Arc<dyn GrpcClient>, ConnectionError> {
let res = self
.grpc_clients
.get_with_meta(connection_string.to_string())
.await
.map_err(|e| Arc::new(e) as ConnectionError)?;
debug!(was_cached=%res.cached, %connection_string, "getting IOx client");
Ok(res.result.0)
}
/// Get write buffer producer given a DB name and config.
pub async fn write_buffer_producer(
&self,
db_name: &str,
cfg: &WriteBufferConnection,
) -> Result<Arc<dyn WriteBufferWriting>, ConnectionError> {
let res = self
.write_buffer_producers
.get_with_meta((db_name.to_string(), cfg.clone()))
.await
.map_err(|e| Arc::new(e) as ConnectionError)?;
debug!(was_cached=%res.cached, %db_name, "getting write buffer");
Ok(res.result.0)
}
}
#[cfg(test)]
mod tests {
use iox_time::{SystemProvider, TimeProvider};
use crate::grpc_client::MockClient;
use super::*;
#[tokio::test]
async fn test_grpc_mocking() {
let time_provider: Arc<dyn TimeProvider> = Arc::new(SystemProvider::new());
let metric_registry = Arc::new(metric::Registry::new());
let pool1 = ConnectionPool::new(
false,
Arc::new(WriteBufferConfigFactory::new(
Arc::clone(&time_provider),
Arc::clone(&metric_registry),
)),
None,
)
.await;
// connection will fail
pool1.grpc_client("foo").await.unwrap_err();
let pool2 = ConnectionPool::new(
true,
Arc::new(WriteBufferConfigFactory::new(
time_provider,
metric_registry,
)),
None,
)
.await;
let client2 = pool2.grpc_client("foo").await.unwrap();
client2.as_any().downcast_ref::<MockClient>().unwrap();
}
}

View File

@ -1,259 +0,0 @@
//! gRPC clients abastraction.
//!
//! This abstraction was created for easier testing.
use std::{
any::Any,
sync::atomic::{AtomicBool, Ordering},
};
use async_trait::async_trait;
use dml::DmlOperation;
use parking_lot::RwLock;
/// Generic write error.
pub type WriteError = Box<dyn std::error::Error + Send + Sync>;
/// An abstract IOx gRPC client.
#[async_trait]
pub trait GrpcClient: Sync + Send + std::fmt::Debug + 'static {
/// Send DML operation to the given database.
async fn write(&self, db_name: &str, write: &DmlOperation) -> Result<(), WriteError>;
/// Cast client to [`Any`], useful for downcasting.
fn as_any(&self) -> &dyn Any;
}
/// A real, network-driven gRPC client.
#[derive(Debug)]
pub struct RealClient {
/// Delete client for IOx.
delete_client: influxdb_iox_client::delete::Client,
/// Write client for IOx.
write_client: influxdb_iox_client::write::Client,
}
impl RealClient {
/// Create new client from established connection.
pub fn new(connection: influxdb_iox_client::connection::Connection) -> Self {
Self {
delete_client: influxdb_iox_client::delete::Client::new(connection.clone()),
write_client: influxdb_iox_client::write::Client::new(connection),
}
}
}
#[async_trait]
impl GrpcClient for RealClient {
async fn write(&self, db_name: &str, write: &DmlOperation) -> Result<(), WriteError> {
use influxdb_iox_client::write::generated_types::WriteRequest;
use mutable_batch_pb::encode::encode_write;
match write {
DmlOperation::Write(write) => {
let write_request = WriteRequest {
database_batch: Some(encode_write(db_name, write)),
};
// cheap, see https://docs.rs/tonic/0.4.2/tonic/client/index.html#concurrent-usage
let mut client = self.write_client.clone();
client
.write_pb(write_request)
.await
.map_err(|e| Box::new(e) as WriteError)?;
Ok(())
}
DmlOperation::Delete(delete) => {
// cheap, see https://docs.rs/tonic/0.4.2/tonic/client/index.html#concurrent-usage
let mut client = self.delete_client.clone();
client
.delete(
db_name.to_owned(),
delete
.table_name()
.map(|s| s.to_owned())
.unwrap_or_default(),
delete.predicate().clone().into(),
)
.await
.map_err(|e| Box::new(e) as _)
}
}
}
fn as_any(&self) -> &dyn Any {
self
}
}
/// Mock client for testing.
#[derive(Debug, Default)]
pub struct MockClient {
/// All writes recorded by this client.
writes: RwLock<Vec<(String, DmlOperation)>>,
/// Poisen pill.
///
/// If set to `true` all writes will fail.
poisoned: AtomicBool,
}
impl MockClient {
/// Take poison pill.
///
/// All subsequent writes will fail.
pub fn poison(&self) {
self.poisoned.store(true, Ordering::SeqCst)
}
/// Get a copy of all recorded writes.
pub fn writes(&self) -> Vec<(String, DmlOperation)> {
self.writes.read().clone()
}
/// Assert that writes are as expected.
pub fn assert_writes(&self, expected: &[(String, DmlOperation)]) {
use dml::test_util::assert_op_eq;
let actual = self.writes();
assert_eq!(
actual.len(),
expected.len(),
"number of writes differ ({} VS {})",
actual.len(),
expected.len()
);
for ((actual_db, actual_write), (expected_db, expected_write)) in
actual.iter().zip(expected)
{
assert_eq!(
actual_db, expected_db,
"database names differ (\"{}\" VS \"{}\")",
actual_db, expected_db
);
assert_op_eq(actual_write, expected_write);
}
}
}
#[async_trait]
impl GrpcClient for MockClient {
async fn write(&self, db_name: &str, write: &DmlOperation) -> Result<(), WriteError> {
if self.poisoned.load(Ordering::SeqCst) {
return Err("poisened".to_string().into());
}
self.writes
.write()
.push((db_name.to_string(), write.clone()));
Ok(())
}
fn as_any(&self) -> &dyn Any {
self
}
}
#[cfg(test)]
mod tests {
use dml::DmlWrite;
use mutable_batch_lp::lines_to_batches;
use super::*;
#[tokio::test]
async fn test_mock() {
let client = MockClient::default();
let write1 = DmlOperation::Write(DmlWrite::new(
"test_db",
lines_to_batches("foo x=1 1", 0).unwrap(),
Default::default(),
));
let write2 = DmlOperation::Write(DmlWrite::new(
"test_db",
lines_to_batches("foo x=2 2", 0).unwrap(),
Default::default(),
));
let write3 = DmlOperation::Write(DmlWrite::new(
"test_db",
lines_to_batches("foo x=3 3", 0).unwrap(),
Default::default(),
));
client.write("db1", &write1).await.unwrap();
client.write("db2", &write1).await.unwrap();
client.write("db1", &write2).await.unwrap();
let expected_writes = vec![
(String::from("db1"), write1.clone()),
(String::from("db2"), write1.clone()),
(String::from("db1"), write2.clone()),
];
client.assert_writes(&expected_writes);
client.poison();
client.write("db1", &write3).await.unwrap_err();
client.assert_writes(&expected_writes);
}
#[tokio::test]
#[should_panic(expected = "number of writes differ (1 VS 0)")]
async fn test_assert_writes_fail_count() {
let client = MockClient::default();
let write1 = DmlOperation::Write(DmlWrite::new(
"test_db",
lines_to_batches("foo x=1 1", 0).unwrap(),
Default::default(),
));
client.write("db1", &write1).await.unwrap();
let expected_writes = [];
client.assert_writes(&expected_writes);
}
#[tokio::test]
#[should_panic(expected = "database names differ (\"db1\" VS \"db2\")")]
async fn test_assert_writes_fail_db_name() {
let client = MockClient::default();
let write = DmlOperation::Write(DmlWrite::new(
"test_db",
lines_to_batches("foo x=1 1", 0).unwrap(),
Default::default(),
));
client.write("db1", &write).await.unwrap();
let expected_writes = vec![(String::from("db2"), write)];
client.assert_writes(&expected_writes);
}
#[tokio::test]
#[should_panic(expected = "batches for table \"foo\" differ")]
async fn test_assert_writes_fail_batch() {
let client = MockClient::default();
let write1 = DmlOperation::Write(DmlWrite::new(
"test_db",
lines_to_batches("foo x=1 1", 0).unwrap(),
Default::default(),
));
let write2 = DmlOperation::Write(DmlWrite::new(
"test_db",
lines_to_batches("foo x=2 2", 0).unwrap(),
Default::default(),
));
client.write("db1", &write1).await.unwrap();
let expected_writes = vec![(String::from("db1"), write2)];
client.assert_writes(&expected_writes);
}
}

View File

@ -1,16 +0,0 @@
#![deny(rustdoc::broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)]
#![warn(
missing_copy_implementations,
missing_debug_implementations,
clippy::explicit_iter_loop,
clippy::future_not_send,
clippy::use_self,
clippy::clone_on_ref_ptr
)]
pub mod connection_pool;
pub mod grpc_client;
pub mod resolver;
pub mod router;
pub mod server;
pub mod write_sink;

View File

@ -1,156 +0,0 @@
use data_types::server_id::ServerId;
use parking_lot::RwLock;
use std::collections::BTreeMap;
/// A RemoteTemplate string is a remote connection template string.
/// Occurrences of the substring "{id}" in the template will be replaced
/// by the server ID.
#[derive(Debug)]
pub struct RemoteTemplate {
template: String,
}
impl RemoteTemplate {
pub fn new(template: impl Into<String>) -> Self {
let template = template.into();
Self { template }
}
fn get(&self, id: &ServerId) -> String {
self.template.replace("{id}", &format!("{}", id.get_u32()))
}
}
/// The Resolver provides a mapping between ServerId and GRpcConnectionString
#[derive(Debug)]
pub struct Resolver {
/// Map between remote IOx server IDs and management API connection strings.
remotes: RwLock<BTreeMap<ServerId, String>>,
/// Static map between remote server IDs and hostnames based on a template
remote_template: Option<RemoteTemplate>,
}
impl Resolver {
pub fn new(remote_template: Option<RemoteTemplate>) -> Self {
Self {
remotes: Default::default(),
remote_template,
}
}
/// Get all registered remote servers, sorted by server ID.
pub fn remotes(&self) -> Vec<(ServerId, String)> {
self.remotes
.read()
.iter()
.map(|(&a, b)| (a, b.clone()))
.collect()
}
/// Update given remote server.
pub fn update_remote(&self, id: ServerId, addr: String) -> bool {
self.remotes.write().insert(id, addr).is_some()
}
/// Delete remote server by ID.
pub fn delete_remote(&self, id: ServerId) -> bool {
self.remotes.write().remove(&id).is_some()
}
/// Get remote server by ID.
pub fn resolve_remote(&self, id: ServerId) -> Option<String> {
self.remotes
.read()
.get(&id)
.cloned()
.or_else(|| self.remote_template.as_ref().map(|t| t.get(&id)))
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_remote_crud() {
let resolver = Resolver::new(None);
let id1 = ServerId::try_from(1).unwrap();
let id2 = ServerId::try_from(2).unwrap();
// no remotes
assert_eq!(resolver.remotes().len(), 0);
assert!(!resolver.delete_remote(id1));
// add remotes
assert!(!resolver.update_remote(id2, String::from("bar")));
assert!(!resolver.update_remote(id1, String::from("foo")));
let remotes = resolver.remotes();
assert_eq!(remotes.len(), 2);
assert_eq!(&remotes[0], &(id1, String::from("foo")));
assert_eq!(&remotes[1], &(id2, String::from("bar")));
// update remote
assert!(resolver.update_remote(id1, String::from(":)")));
let remotes = resolver.remotes();
assert_eq!(remotes.len(), 2);
assert_eq!(&remotes[0], &(id1, String::from(":)")));
assert_eq!(&remotes[1], &(id2, String::from("bar")));
// delete remotes
assert!(resolver.delete_remote(id1));
let remotes = resolver.remotes();
assert_eq!(remotes.len(), 1);
assert_eq!(&remotes[0], &(id2, String::from("bar")));
assert!(!resolver.delete_remote(id1));
}
#[test]
fn resolve_remote() {
let resolver = Resolver::new(Some(RemoteTemplate::new("http://iox-query-{id}:8082")));
resolver.update_remote(
ServerId::try_from(1).unwrap(),
String::from("http://iox-foo:1234"),
);
resolver.update_remote(
ServerId::try_from(2).unwrap(),
String::from("http://iox-bar:5678"),
);
assert_eq!(
resolver.resolve_remote(ServerId::try_from(1).unwrap()),
Some(String::from("http://iox-foo:1234"))
);
assert_eq!(
resolver.resolve_remote(ServerId::try_from(2).unwrap()),
Some(String::from("http://iox-bar:5678"))
);
assert_eq!(
resolver.resolve_remote(ServerId::try_from(42).unwrap()),
Some(String::from("http://iox-query-42:8082"))
);
assert_eq!(
resolver.resolve_remote(ServerId::try_from(24).unwrap()),
Some(String::from("http://iox-query-24:8082"))
);
}
#[test]
fn resolve_remote_without_template() {
let resolver = Resolver::new(None);
resolver.update_remote(
ServerId::try_from(1).unwrap(),
String::from("http://iox-foo:1234"),
);
assert_eq!(
resolver.resolve_remote(ServerId::try_from(1).unwrap()),
Some(String::from("http://iox-foo:1234"))
);
assert_eq!(
resolver.resolve_remote(ServerId::try_from(42).unwrap()),
None,
);
}
}

View File

@ -1,432 +0,0 @@
use std::{
collections::{BTreeMap, HashMap},
fmt::Write,
sync::Arc,
};
use data_types::router::{Router as RouterConfig, ShardId};
use dml::DmlOperation;
use snafu::{ResultExt, Snafu};
use crate::{connection_pool::ConnectionPool, resolver::Resolver, write_sink::WriteSinkSet};
#[derive(Debug, Snafu)]
pub enum WriteErrorShard {
#[snafu(display("Did not find sink set for shard ID {}", shard_id.get()))]
NoSinkSetFound { shard_id: ShardId },
#[snafu(display("Write to sink set failed: {}", source))]
SinkSetFailure { source: crate::write_sink::Error },
}
#[derive(Debug, Snafu)]
pub enum WriteError {
#[snafu(display("One or more writes failed: {}", fmt_write_errors(errors)))]
MultiWriteFailure {
errors: BTreeMap<ShardId, WriteErrorShard>,
},
}
fn fmt_write_errors(errors: &BTreeMap<ShardId, WriteErrorShard>) -> String {
const MAX_ERRORS: usize = 2;
let mut out = String::new();
for (shard_id, error) in errors.iter().take(MAX_ERRORS) {
if !out.is_empty() {
write!(&mut out, ", ").expect("write to string failed?!");
}
write!(&mut out, "{} => \"{}\"", shard_id, error).expect("write to string failed?!");
}
if errors.len() > MAX_ERRORS {
write!(&mut out, "...").expect("write to string failed?!");
}
out
}
/// Router for a single database.
#[derive(Debug)]
pub struct Router {
/// Router config.
config: RouterConfig,
/// We use a [`HashMap`] here for `O(1)` lookups. Do not rely on the iteration order.
write_sink_sets: HashMap<ShardId, WriteSinkSet>,
}
impl Router {
/// Create new router from config.
pub fn new(
config: RouterConfig,
resolver: Arc<Resolver>,
connection_pool: Arc<ConnectionPool>,
) -> Self {
let write_sink_sets = config
.write_sinks
.iter()
.map(|(shard_id, set_config)| {
(
*shard_id,
WriteSinkSet::new(
&config.name,
set_config.clone(),
Arc::clone(&resolver),
Arc::clone(&connection_pool),
),
)
})
.collect();
Self {
config,
write_sink_sets,
}
}
/// Router config.
pub fn config(&self) -> &RouterConfig {
&self.config
}
/// Router name.
///
/// This is the same as the database that this router acts for.
pub fn name(&self) -> &str {
&self.config.name
}
/// Shard and write data.
pub async fn write(&self, operation: DmlOperation) -> Result<(), WriteError> {
let mut errors: BTreeMap<ShardId, WriteErrorShard> = Default::default();
// The iteration order is stable here, so we ensure deterministic behavior and error order.
for (shard_id, operation) in operation.shard(&self.config.write_sharder) {
if let Err(e) = self.write_shard(shard_id, &operation).await {
errors.insert(shard_id, e);
}
}
if errors.is_empty() {
Ok(())
} else {
Err(WriteError::MultiWriteFailure { errors })
}
}
/// Write operation to the specified shard.
async fn write_shard(
&self,
shard_id: ShardId,
operation: &DmlOperation,
) -> Result<(), WriteErrorShard> {
match self.write_sink_sets.get(&shard_id) {
Some(sink_set) => sink_set.write(operation).await.context(SinkSetFailureSnafu),
None => Err(WriteErrorShard::NoSinkSetFound { shard_id }),
}
}
}
#[cfg(test)]
mod tests {
use crate::{grpc_client::MockClient, resolver::RemoteTemplate};
use super::*;
use data_types::{
delete_predicate::DeletePredicate,
non_empty::NonEmptyString,
router::{
Matcher, MatcherToShard, ShardConfig, WriteSink as WriteSinkConfig,
WriteSinkSet as WriteSinkSetConfig, WriteSinkVariant as WriteSinkVariantConfig,
},
sequence::Sequence,
server_id::ServerId,
timestamp::TimestampRange,
};
use dml::{DmlDelete, DmlMeta, DmlWrite};
use iox_time::Time;
use mutable_batch_lp::lines_to_batches;
use regex::Regex;
#[tokio::test]
async fn test_getters() {
let resolver = Arc::new(Resolver::new(None));
let connection_pool = Arc::new(ConnectionPool::new_testing().await);
let cfg = RouterConfig {
name: String::from("my_router"),
write_sharder: Default::default(),
write_sinks: Default::default(),
query_sinks: Default::default(),
};
let router = Router::new(cfg.clone(), resolver, connection_pool);
assert_eq!(router.config(), &cfg);
assert_eq!(router.name(), "my_router");
}
#[tokio::test]
async fn test_write() {
let server_id_1 = ServerId::try_from(1).unwrap();
let server_id_2 = ServerId::try_from(2).unwrap();
let server_id_3 = ServerId::try_from(3).unwrap();
let resolver = Arc::new(Resolver::new(Some(RemoteTemplate::new("{id}"))));
let connection_pool = Arc::new(ConnectionPool::new_testing().await);
let client_1 = connection_pool.grpc_client("1").await.unwrap();
let client_2 = connection_pool.grpc_client("2").await.unwrap();
let client_3 = connection_pool.grpc_client("3").await.unwrap();
let client_1 = client_1.as_any().downcast_ref::<MockClient>().unwrap();
let client_2 = client_2.as_any().downcast_ref::<MockClient>().unwrap();
let client_3 = client_3.as_any().downcast_ref::<MockClient>().unwrap();
let cfg = RouterConfig {
name: String::from("my_router"),
write_sharder: ShardConfig {
specific_targets: vec![
MatcherToShard {
matcher: Matcher {
table_name_regex: Some(Regex::new("foo_bar").unwrap()),
},
shard: ShardId::new(10),
},
MatcherToShard {
matcher: Matcher {
table_name_regex: Some(Regex::new("foo_three").unwrap()),
},
shard: ShardId::new(30),
},
MatcherToShard {
matcher: Matcher {
table_name_regex: Some(Regex::new("foo_.*").unwrap()),
},
shard: ShardId::new(20),
},
MatcherToShard {
matcher: Matcher {
table_name_regex: Some(Regex::new("doom").unwrap()),
},
shard: ShardId::new(40),
},
MatcherToShard {
matcher: Matcher {
table_name_regex: Some(Regex::new("nooo").unwrap()),
},
shard: ShardId::new(50),
},
MatcherToShard {
matcher: Matcher {
table_name_regex: Some(Regex::new(".*").unwrap()),
},
shard: ShardId::new(20),
},
],
hash_ring: None,
},
write_sinks: BTreeMap::from([
(
ShardId::new(10),
WriteSinkSetConfig {
sinks: vec![WriteSinkConfig {
sink: WriteSinkVariantConfig::GrpcRemote(server_id_1),
ignore_errors: false,
}],
},
),
(
ShardId::new(20),
WriteSinkSetConfig {
sinks: vec![WriteSinkConfig {
sink: WriteSinkVariantConfig::GrpcRemote(server_id_2),
ignore_errors: false,
}],
},
),
(
ShardId::new(30),
WriteSinkSetConfig {
sinks: vec![WriteSinkConfig {
sink: WriteSinkVariantConfig::GrpcRemote(server_id_3),
ignore_errors: false,
}],
},
),
]),
query_sinks: Default::default(),
};
let router = Router::new(cfg.clone(), resolver, connection_pool);
// clean write
let meta_1 = DmlMeta::sequenced(
Sequence::new(1, 2),
Time::from_timestamp_nanos(1337),
None,
10,
);
let write_1 = db_write(
&["foo_x x=2 2", "foo_bar x=1 1", "foo_y x=3 3", "www x=4 4"],
&meta_1,
);
router.write(write_1).await.unwrap();
client_1.assert_writes(&[(
String::from("my_router"),
db_write(&["foo_bar x=1 1"], &meta_1),
)]);
client_2.assert_writes(&[(
String::from("my_router"),
db_write(&["foo_x x=2 2", "foo_y x=3 3", "www x=4 4"], &meta_1),
)]);
// write w/ errors
client_2.poison();
let meta_2 = DmlMeta::sequenced(
Sequence::new(3, 4),
Time::from_timestamp_nanos(42),
None,
20,
);
let write_2 = db_write(
&[
"foo_bar x=5 5",
"doom x=6 6",
"foo_bar x=7 7",
"www x=8 8",
"foo_bar x=9 9",
"nooo x=10 10",
"foo_bar x=11 11",
"foo_three x=12 12",
"doom x=13 13",
"foo_three x=14 14",
"www x=15 15",
"foo_three x=16 16",
"nooo x=17 17",
"foo_three x=18 18",
],
&meta_2,
);
let err = router.write(write_2).await.unwrap_err();
assert_eq!(err.to_string(), "One or more writes failed: ShardId(20) => \"Write to sink set failed: Cannot write: poisened\", ShardId(40) => \"Did not find sink set for shard ID 40\"...");
client_1.assert_writes(&[
(
String::from("my_router"),
db_write(&["foo_bar x=1 1"], &meta_1),
),
(
String::from("my_router"),
db_write(
&[
"foo_bar x=5 5",
"foo_bar x=7 7",
"foo_bar x=9 9",
"foo_bar x=11 11",
],
&meta_2,
),
),
]);
client_2.assert_writes(&[(
String::from("my_router"),
db_write(&["foo_x x=2 2", "foo_y x=3 3", "www x=4 4"], &meta_1),
)]);
client_3.assert_writes(&[(
String::from("my_router"),
db_write(
&[
"foo_three x=12 12",
"foo_three x=14 14",
"foo_three x=16 16",
"foo_three x=18 18",
],
&meta_2,
),
)]);
}
#[tokio::test]
async fn test_delete() {
let server_id_1 = ServerId::try_from(1).unwrap();
let server_id_2 = ServerId::try_from(2).unwrap();
let resolver = Arc::new(Resolver::new(Some(RemoteTemplate::new("{id}"))));
let connection_pool = Arc::new(ConnectionPool::new_testing().await);
let client_1 = connection_pool.grpc_client("1").await.unwrap();
let client_2 = connection_pool.grpc_client("2").await.unwrap();
let client_1 = client_1.as_any().downcast_ref::<MockClient>().unwrap();
let client_2 = client_2.as_any().downcast_ref::<MockClient>().unwrap();
let cfg = RouterConfig {
name: String::from("my_router"),
write_sharder: ShardConfig {
specific_targets: vec![
MatcherToShard {
matcher: Matcher {
table_name_regex: Some(Regex::new("foo_bar").unwrap()),
},
shard: ShardId::new(10),
},
MatcherToShard {
matcher: Matcher {
table_name_regex: Some(Regex::new("foo_.*").unwrap()),
},
shard: ShardId::new(20),
},
],
hash_ring: None,
},
write_sinks: BTreeMap::from([
(
ShardId::new(10),
WriteSinkSetConfig {
sinks: vec![WriteSinkConfig {
sink: WriteSinkVariantConfig::GrpcRemote(server_id_1),
ignore_errors: false,
}],
},
),
(
ShardId::new(20),
WriteSinkSetConfig {
sinks: vec![WriteSinkConfig {
sink: WriteSinkVariantConfig::GrpcRemote(server_id_2),
ignore_errors: false,
}],
},
),
]),
query_sinks: Default::default(),
};
let router = Router::new(cfg.clone(), resolver, connection_pool);
// clean write
let meta = DmlMeta::sequenced(
Sequence::new(1, 2),
Time::from_timestamp_nanos(1337),
None,
10,
);
let delete = DmlOperation::Delete(DmlDelete::new(
"test_db",
DeletePredicate {
range: TimestampRange::new(1, 2),
exprs: vec![],
},
Some(NonEmptyString::new("foo_foo").unwrap()),
meta,
));
router.write(delete.clone()).await.unwrap();
client_1.assert_writes(&[]);
client_2.assert_writes(&[(String::from("my_router"), delete)]);
}
fn db_write(lines: &[&str], meta: &DmlMeta) -> DmlOperation {
DmlOperation::Write(DmlWrite::new(
"test_db",
lines_to_batches(&lines.join("\n"), 0).unwrap(),
meta.clone(),
))
}
}

View File

@ -1,266 +0,0 @@
use std::{collections::BTreeMap, sync::Arc};
use data_types::{router::Router as RouterConfig, server_id::ServerId};
use iox_time::TimeProvider;
use metric::Registry as MetricRegistry;
use parking_lot::RwLock;
use snafu::Snafu;
use trace::TraceCollector;
use write_buffer::config::WriteBufferConfigFactory;
use crate::{
connection_pool::ConnectionPool,
resolver::{RemoteTemplate, Resolver},
router::Router,
};
#[derive(Debug, Snafu)]
#[allow(missing_copy_implementations)]
pub enum SetServerIdError {
#[snafu(display("id already set: {}", server_id))]
AlreadySet { server_id: ServerId },
}
/// Main entry point to manage a router node.
#[derive(Debug)]
pub struct RouterServer {
server_id: RwLock<Option<ServerId>>,
metric_registry: Arc<MetricRegistry>,
trace_collector: Option<Arc<dyn TraceCollector>>,
routers: RwLock<BTreeMap<String, Arc<Router>>>,
resolver: Arc<Resolver>,
connection_pool: Arc<ConnectionPool>,
}
impl RouterServer {
pub async fn new(
remote_template: Option<RemoteTemplate>,
trace_collector: Option<Arc<dyn TraceCollector>>,
time_provider: Arc<dyn TimeProvider>,
) -> Self {
Self::new_inner(remote_template, trace_collector, time_provider, None, false).await
}
pub async fn for_testing(
remote_template: Option<RemoteTemplate>,
trace_collector: Option<Arc<dyn TraceCollector>>,
time_provider: Arc<dyn TimeProvider>,
wb_factory: Option<Arc<WriteBufferConfigFactory>>,
) -> Self {
Self::new_inner(
remote_template,
trace_collector,
time_provider,
wb_factory,
true,
)
.await
}
async fn new_inner(
remote_template: Option<RemoteTemplate>,
trace_collector: Option<Arc<dyn TraceCollector>>,
time_provider: Arc<dyn TimeProvider>,
wb_factory: Option<Arc<WriteBufferConfigFactory>>,
use_mock_grpc: bool,
) -> Self {
let metric_registry = Arc::new(metric::Registry::new());
let wb_factory = wb_factory.unwrap_or_else(|| {
Arc::new(WriteBufferConfigFactory::new(
time_provider,
Arc::clone(&metric_registry),
))
});
let connection_pool =
Arc::new(ConnectionPool::new(use_mock_grpc, wb_factory, trace_collector.clone()).await);
Self {
server_id: RwLock::new(None),
metric_registry,
trace_collector,
routers: Default::default(),
resolver: Arc::new(Resolver::new(remote_template)),
connection_pool,
}
}
/// Get server ID, if any.
pub fn server_id(&self) -> Option<ServerId> {
*self.server_id.read()
}
/// Set server ID.
///
/// # Error
/// This will fail when an ID is already set to a different ID.
pub fn set_server_id(&self, server_id: ServerId) -> Result<(), SetServerIdError> {
let mut guard = self.server_id.write();
match *guard {
Some(existing) if existing == server_id => Ok(()),
Some(existing) => Err(SetServerIdError::AlreadySet {
server_id: existing,
}),
None => {
*guard = Some(server_id);
Ok(())
}
}
}
/// Metric registry associated with this server.
pub fn metric_registry(&self) -> &Arc<MetricRegistry> {
&self.metric_registry
}
/// Trace collector associated with this server.
pub fn trace_collector(&self) -> &Option<Arc<dyn TraceCollector>> {
&self.trace_collector
}
/// List all routers, sorted by name,
pub fn routers(&self) -> Vec<Arc<Router>> {
self.routers.read().values().cloned().collect()
}
/// Update or create router.
///
/// Returns `true` if the router already existed.
pub fn update_router(&self, config: RouterConfig) -> bool {
let router = Router::new(
config,
Arc::clone(&self.resolver),
Arc::clone(&self.connection_pool),
);
self.routers
.write()
.insert(router.name().to_string(), Arc::new(router))
.is_some()
}
/// Delete router.
///
/// Returns `true` if the router existed.
pub fn delete_router(&self, router_name: &str) -> bool {
self.routers.write().remove(router_name).is_some()
}
/// Get registered router, if any.
///
/// The router name is identical to the database for which this router handles data.
pub fn router(&self, router_name: &str) -> Option<Arc<Router>> {
self.routers.read().get(router_name).cloned()
}
/// Resolver associated with this server.
pub fn resolver(&self) -> &Arc<Resolver> {
&self.resolver
}
/// Connection pool associated with this server.
pub fn connection_pool(&self) -> &Arc<ConnectionPool> {
&self.connection_pool
}
}
pub mod test_utils {
use std::sync::Arc;
use iox_time::SystemProvider;
use super::RouterServer;
pub async fn make_router_server() -> RouterServer {
RouterServer::new(None, None, Arc::new(SystemProvider::new())).await
}
}
#[cfg(test)]
mod tests {
use data_types::router::QuerySinks;
use crate::server::test_utils::make_router_server;
use super::*;
#[tokio::test]
async fn test_server_id() {
let id13 = ServerId::try_from(13).unwrap();
let id42 = ServerId::try_from(42).unwrap();
// server starts w/o any ID
let server = make_router_server().await;
assert_eq!(server.server_id(), None);
// setting ID
server.set_server_id(id13).unwrap();
assert_eq!(server.server_id(), Some(id13));
// setting a 2nd time to the same value should work
server.set_server_id(id13).unwrap();
assert_eq!(server.server_id(), Some(id13));
// chaning the ID fails
let err = server.set_server_id(id42).unwrap_err();
assert!(matches!(err, SetServerIdError::AlreadySet { .. }));
}
#[tokio::test]
async fn test_router_crud() {
let server = make_router_server().await;
let cfg_foo_1 = RouterConfig {
name: String::from("foo"),
write_sharder: Default::default(),
write_sinks: Default::default(),
query_sinks: Default::default(),
};
let cfg_foo_2 = RouterConfig {
query_sinks: QuerySinks {
grpc_remotes: vec![ServerId::try_from(1).unwrap()],
},
..cfg_foo_1.clone()
};
assert_ne!(cfg_foo_1, cfg_foo_2);
let cfg_bar = RouterConfig {
name: String::from("bar"),
write_sharder: Default::default(),
write_sinks: Default::default(),
query_sinks: Default::default(),
};
// no routers
assert_eq!(server.routers().len(), 0);
assert!(!server.delete_router("foo"));
// add routers
assert!(!server.update_router(cfg_foo_1.clone()));
assert!(!server.update_router(cfg_bar.clone()));
let routers = server.routers();
assert_eq!(routers.len(), 2);
assert_eq!(routers[0].config(), &cfg_bar);
assert_eq!(routers[1].config(), &cfg_foo_1);
assert_eq!(server.router("bar").unwrap().config(), &cfg_bar);
assert_eq!(server.router("foo").unwrap().config(), &cfg_foo_1);
// update router
assert!(server.update_router(cfg_foo_2.clone()));
let routers = server.routers();
assert_eq!(routers.len(), 2);
assert_eq!(routers[0].config(), &cfg_bar);
assert_eq!(routers[1].config(), &cfg_foo_2);
assert_eq!(server.router("bar").unwrap().config(), &cfg_bar);
assert_eq!(server.router("foo").unwrap().config(), &cfg_foo_2);
// delete routers
assert!(server.delete_router("foo"));
let routers = server.routers();
assert_eq!(routers.len(), 1);
assert_eq!(routers[0].config(), &cfg_bar);
assert_eq!(server.router("bar").unwrap().config(), &cfg_bar);
assert!(server.router("foo").is_none());
// deleting router a 2nd time works
assert!(!server.delete_router("foo"));
}
}

View File

@ -1,377 +0,0 @@
use std::sync::Arc;
use data_types::{
router::{
WriteSink as WriteSinkConfig, WriteSinkSet as WriteSinkSetConfig,
WriteSinkVariant as WriteSinkVariantConfig,
},
server_id::ServerId,
write_buffer::WriteBufferConnection,
};
use dml::DmlOperation;
use snafu::{OptionExt, ResultExt, Snafu};
use crate::{
connection_pool::{ConnectionError, ConnectionPool},
resolver::Resolver,
};
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("No remote for server ID {}", server_id))]
NoRemote { server_id: ServerId },
#[snafu(display("Cannot connect: {}", source))]
ConnectionFailure { source: ConnectionError },
#[snafu(display("Cannot write: {}", source))]
WriteFailure {
source: Box<dyn std::error::Error + Send + Sync>,
},
}
#[derive(Debug)]
struct VariantGrpcRemote {
db_name: String,
server_id: ServerId,
resolver: Arc<Resolver>,
connection_pool: Arc<ConnectionPool>,
}
impl VariantGrpcRemote {
fn new(
db_name: String,
server_id: ServerId,
resolver: Arc<Resolver>,
connection_pool: Arc<ConnectionPool>,
) -> Self {
Self {
db_name,
server_id,
resolver,
connection_pool,
}
}
async fn write(&self, write: &DmlOperation) -> Result<(), Error> {
let connection_string =
self.resolver
.resolve_remote(self.server_id)
.context(NoRemoteSnafu {
server_id: self.server_id,
})?;
let client = self
.connection_pool
.grpc_client(&connection_string)
.await
.context(ConnectionFailureSnafu)?;
client
.write(&self.db_name, write)
.await
.context(WriteFailureSnafu)
}
}
#[derive(Debug)]
struct VariantWriteBuffer {
db_name: String,
write_buffer_cfg: WriteBufferConnection,
connection_pool: Arc<ConnectionPool>,
}
impl VariantWriteBuffer {
fn new(
db_name: String,
write_buffer_cfg: WriteBufferConnection,
connection_pool: Arc<ConnectionPool>,
) -> Self {
Self {
db_name,
write_buffer_cfg,
connection_pool,
}
}
async fn write(&self, operation: &DmlOperation) -> Result<(), Error> {
let write_buffer = self
.connection_pool
.write_buffer_producer(&self.db_name, &self.write_buffer_cfg)
.await
.context(ConnectionFailureSnafu)?;
// TODO(marco): use multiple sequencers
write_buffer
.store_operation(0, operation)
.await
.map_err(|e| Error::WriteFailure {
source: Box::new(e),
})?;
Ok(())
}
}
#[derive(Debug)]
enum WriteSinkVariant {
/// Send write to a remote server via gRPC
GrpcRemote(VariantGrpcRemote),
/// Send write to a write buffer (which may be backed by kafka, local disk, etc)
WriteBuffer(VariantWriteBuffer),
}
/// Write sink abstraction.
#[derive(Debug)]
pub struct WriteSink {
ignore_errors: bool,
variant: WriteSinkVariant,
}
impl WriteSink {
pub fn new(
db_name: &str,
config: WriteSinkConfig,
resolver: Arc<Resolver>,
connection_pool: Arc<ConnectionPool>,
) -> Self {
let variant = match config.sink {
WriteSinkVariantConfig::GrpcRemote(server_id) => WriteSinkVariant::GrpcRemote(
VariantGrpcRemote::new(db_name.to_string(), server_id, resolver, connection_pool),
),
WriteSinkVariantConfig::WriteBuffer(write_buffer_cfg) => WriteSinkVariant::WriteBuffer(
VariantWriteBuffer::new(db_name.to_string(), write_buffer_cfg, connection_pool),
),
};
Self {
ignore_errors: config.ignore_errors,
variant,
}
}
pub async fn write(&self, write: &DmlOperation) -> Result<(), Error> {
let res = match &self.variant {
WriteSinkVariant::GrpcRemote(v) => v.write(write).await,
WriteSinkVariant::WriteBuffer(v) => v.write(write).await,
};
match res {
Ok(()) => Ok(()),
Err(_) if self.ignore_errors => Ok(()),
e => e,
}
}
}
/// A set of [`WriteSink`]s.
#[derive(Debug)]
pub struct WriteSinkSet {
sinks: Vec<WriteSink>,
}
impl WriteSinkSet {
/// Create new set from config.
pub fn new(
db_name: &str,
config: WriteSinkSetConfig,
resolver: Arc<Resolver>,
connection_pool: Arc<ConnectionPool>,
) -> Self {
Self {
sinks: config
.sinks
.into_iter()
.map(|sink_config| {
WriteSink::new(
db_name,
sink_config,
Arc::clone(&resolver),
Arc::clone(&connection_pool),
)
})
.collect(),
}
}
/// Write to sinks. Fails on first error.
pub async fn write(&self, operation: &DmlOperation) -> Result<(), Error> {
for sink in &self.sinks {
sink.write(operation).await?;
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use dml::DmlWrite;
use iox_time::SystemProvider;
use mutable_batch_lp::lines_to_batches;
use write_buffer::config::WriteBufferConfigFactory;
use crate::grpc_client::MockClient;
use super::*;
#[tokio::test]
async fn test_write_sink_error_handling() {
let server_id = ServerId::try_from(1).unwrap();
let resolver = Arc::new(Resolver::new(None));
resolver.update_remote(server_id, String::from("1.2.3.4"));
let time_provider = Arc::new(SystemProvider::new());
let metric_registry = Arc::new(metric::Registry::new());
let wb_factory = Arc::new(WriteBufferConfigFactory::new(
time_provider,
metric_registry,
));
wb_factory.register_always_fail_mock(String::from("failing_wb"));
let connection_pool = Arc::new(ConnectionPool::new(true, wb_factory, None).await);
let client_grpc = connection_pool.grpc_client("1.2.3.4").await.unwrap();
let client_grpc = client_grpc.as_any().downcast_ref::<MockClient>().unwrap();
client_grpc.poison();
let write = DmlOperation::Write(DmlWrite::new(
"my_db",
lines_to_batches("foo x=1 1", 0).unwrap(),
Default::default(),
));
// gRPC, do NOT ignore errors
let config = WriteSinkConfig {
sink: WriteSinkVariantConfig::GrpcRemote(server_id),
ignore_errors: false,
};
let sink = WriteSink::new(
"my_db",
config,
Arc::clone(&resolver),
Arc::clone(&connection_pool),
);
sink.write(&write).await.unwrap_err();
// gRPC, ignore errors
let config = WriteSinkConfig {
sink: WriteSinkVariantConfig::GrpcRemote(server_id),
ignore_errors: true,
};
let sink = WriteSink::new(
"my_db",
config,
Arc::clone(&resolver),
Arc::clone(&connection_pool),
);
sink.write(&write).await.unwrap();
// write buffer, do NOT ignore errors
let write_buffer_cfg = WriteBufferConnection {
type_: String::from("mock"),
connection: String::from("failing_wb"),
..Default::default()
};
let config = WriteSinkConfig {
sink: WriteSinkVariantConfig::WriteBuffer(write_buffer_cfg.clone()),
ignore_errors: false,
};
let sink = WriteSink::new(
"my_db",
config,
Arc::clone(&resolver),
Arc::clone(&connection_pool),
);
sink.write(&write).await.unwrap_err();
// write buffer, ignore errors
let config = WriteSinkConfig {
sink: WriteSinkVariantConfig::WriteBuffer(write_buffer_cfg),
ignore_errors: true,
};
let sink = WriteSink::new(
"my_db",
config,
Arc::clone(&resolver),
Arc::clone(&connection_pool),
);
sink.write(&write).await.unwrap();
}
#[tokio::test]
async fn test_write_sink_set() {
let server_id_1 = ServerId::try_from(1).unwrap();
let server_id_2 = ServerId::try_from(2).unwrap();
let server_id_3 = ServerId::try_from(3).unwrap();
let resolver = Arc::new(Resolver::new(None));
resolver.update_remote(server_id_1, String::from("1"));
resolver.update_remote(server_id_2, String::from("2"));
resolver.update_remote(server_id_3, String::from("3"));
let connection_pool = Arc::new(ConnectionPool::new_testing().await);
let client_1 = connection_pool.grpc_client("1").await.unwrap();
let client_2 = connection_pool.grpc_client("2").await.unwrap();
let client_3 = connection_pool.grpc_client("3").await.unwrap();
let client_1 = client_1.as_any().downcast_ref::<MockClient>().unwrap();
let client_2 = client_2.as_any().downcast_ref::<MockClient>().unwrap();
let client_3 = client_3.as_any().downcast_ref::<MockClient>().unwrap();
let sink_set = WriteSinkSet::new(
"my_db",
WriteSinkSetConfig {
sinks: vec![
WriteSinkConfig {
sink: WriteSinkVariantConfig::GrpcRemote(server_id_1),
ignore_errors: false,
},
WriteSinkConfig {
sink: WriteSinkVariantConfig::GrpcRemote(server_id_2),
ignore_errors: false,
},
WriteSinkConfig {
sink: WriteSinkVariantConfig::GrpcRemote(server_id_3),
ignore_errors: false,
},
],
},
resolver,
connection_pool,
);
let write_1 = DmlOperation::Write(DmlWrite::new(
"my_db",
lines_to_batches("foo x=1 1", 0).unwrap(),
Default::default(),
));
sink_set.write(&write_1).await.unwrap();
let writes_1 = [(String::from("my_db"), write_1.clone())];
client_1.assert_writes(&writes_1);
client_2.assert_writes(&writes_1);
client_3.assert_writes(&writes_1);
client_2.poison();
let write_2 = DmlOperation::Write(DmlWrite::new(
"my_db",
lines_to_batches("foo x=2 2", 0).unwrap(),
Default::default(),
));
sink_set.write(&write_2).await.unwrap_err();
// The sink set stops on first non-ignored error. So
// - client 1 got the new data
// - client 2 failed, but still has the data from the first write
// - client 3 got skipped due to the failure, but still has the data from the first write
let writes_2 = [
(String::from("my_db"), write_1.clone()),
(String::from("my_db"), write_2.clone()),
];
client_1.assert_writes(&writes_2);
client_2.assert_writes(&writes_1);
client_3.assert_writes(&writes_1);
}
}

View File

@ -40,5 +40,4 @@ workspace-hack = { path = "../workspace-hack"}
dml = { path = "../dml" }
futures-util = { version = "0.3" }
regex = "1"
router = { path = "../router" }
test_helpers = { path = "../test_helpers" }