test: migrate `DistibutedTest` to use proper router

pull/24376/head
Marco Neumann 2021-11-23 11:07:12 +01:00
parent df19d67532
commit f9110eecad
3 changed files with 53 additions and 33 deletions

2
Cargo.lock generated
View File

@ -3910,6 +3910,8 @@ dependencies = [
"rand",
"rand_distr",
"read_buffer",
"regex",
"router",
"schema",
"serde",
"serde_json",

View File

@ -59,6 +59,8 @@ write_buffer = { path = "../write_buffer" }
workspace-hack = { path = "../workspace-hack"}
[dev-dependencies] # In alphabetical order
regex = "1.4"
router = { path = "../router" }
test_helpers = { path = "../test_helpers" }
[features]

View File

@ -1,20 +1,28 @@
use std::collections::BTreeMap;
use std::num::NonZeroU32;
use std::sync::Arc;
use std::time::{Duration, Instant};
use arrow_util::assert_batches_eq;
use data_types::delete_predicate::{DeleteExpr, DeletePredicate, Op, Scalar};
use data_types::router::{
Matcher, MatcherToShard, QuerySinks, Router as RouterConfig, ShardConfig, ShardId, WriteSink,
WriteSinkSet, WriteSinkVariant,
};
use data_types::server_id::ServerId;
use data_types::timestamp::TimestampRange;
use data_types::DatabaseName;
use dml::{DmlDelete, DmlOperation, DmlWrite};
use generated_types::influxdata::iox::{
management::v1::{DatabaseRules, LifecycleRules},
management::v1::DatabaseRules,
write_buffer::v1::{write_buffer_connection::Direction, WriteBufferConnection},
};
use mutable_batch_lp::lines_to_batches;
use query::exec::ExecutionContextProvider;
use query::frontend::sql::SqlQueryPlanner;
use regex::Regex;
use router::router::Router;
use router::server::RouterServer;
use server::connection::test_helpers::TestConnectionManager;
use server::rules::ProvidedDatabaseRules;
use server::test_utils::{make_application, make_initialized_server};
@ -34,10 +42,7 @@ use write_buffer::mock::MockBufferSharedState;
/// It primarily exists to test the routing logic.
///
struct DistributedTest {
db_name: DatabaseName<'static>,
router: Arc<Server<TestConnectionManager>>,
// TODO: Replace with router (#2980)
router_db: Arc<Db>,
router: Arc<Router>,
consumer: Arc<Server<TestConnectionManager>>,
consumer_db: Arc<Db>,
@ -63,26 +68,41 @@ impl DistributedTest {
};
// Create a router
let router_server = RouterServer::for_testing(
None,
None,
Arc::clone(application.time_provider()),
Some(Arc::clone(application.write_buffer_factory())),
)
.await;
let router_id = ServerId::new(NonZeroU32::new(1).unwrap());
let router = make_initialized_server(router_id, Arc::clone(&application)).await;
router_server.set_server_id(router_id).unwrap();
let router_db = router
.create_database(
ProvidedDatabaseRules::new_rules(DatabaseRules {
name: db_name.to_string(),
write_buffer_connection: Some(write_buffer_connection.clone()),
lifecycle_rules: Some(LifecycleRules {
immutable: true,
..Default::default()
}),
..Default::default()
})
.unwrap(),
)
.await
.unwrap()
.initialized_db()
.unwrap();
router_server.update_router(RouterConfig {
name: db_name.to_string(),
write_sharder: ShardConfig {
specific_targets: vec![MatcherToShard {
matcher: Matcher {
table_name_regex: Some(Regex::new(".*").unwrap()),
},
shard: ShardId::new(1),
}],
hash_ring: None,
},
write_sinks: BTreeMap::from([(
ShardId::new(1),
WriteSinkSet {
sinks: vec![WriteSink {
sink: WriteSinkVariant::WriteBuffer(
write_buffer_connection.clone().try_into().unwrap(),
),
ignore_errors: false,
}],
},
)]),
query_sinks: QuerySinks::default(),
});
let router = router_server.router(db_name).unwrap();
// Create a consumer
let consumer_id = ServerId::new(NonZeroU32::new(2).unwrap());
@ -105,9 +125,7 @@ impl DistributedTest {
.unwrap();
Self {
db_name: db_name.clone(),
router,
router_db,
consumer,
consumer_db,
}
@ -136,18 +154,18 @@ impl DistributedTest {
/// Write line protocol
pub async fn write(&self, lp: &str) {
self.router
.write(
&self.db_name,
DmlWrite::new(lines_to_batches(lp, 0).unwrap(), Default::default()),
)
.write(DmlOperation::Write(DmlWrite::new(
lines_to_batches(lp, 0).unwrap(),
Default::default(),
)))
.await
.unwrap();
}
pub async fn delete(&self, delete: DmlDelete) {
// TODO: Write to router not Db (#2980)
self.router_db
.route_operation(&DmlOperation::Delete(delete))
self.router
.write(DmlOperation::Delete(delete))
.await
.unwrap();
}
@ -164,9 +182,7 @@ impl DistributedTest {
/// Shuts down the fixture and waits for the servers to exit
pub async fn drain(&self) {
self.router.shutdown();
self.consumer.shutdown();
self.router.join().await.unwrap();
self.consumer.join().await.unwrap();
}
}