diff --git a/Cargo.lock b/Cargo.lock index 9ee3e80efd..7f0235a06b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3910,6 +3910,8 @@ dependencies = [ "rand", "rand_distr", "read_buffer", + "regex", + "router", "schema", "serde", "serde_json", diff --git a/server/Cargo.toml b/server/Cargo.toml index 07451e2c12..21349b7358 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -59,6 +59,8 @@ write_buffer = { path = "../write_buffer" } workspace-hack = { path = "../workspace-hack"} [dev-dependencies] # In alphabetical order +regex = "1.4" +router = { path = "../router" } test_helpers = { path = "../test_helpers" } [features] diff --git a/server/tests/write_buffer_delete.rs b/server/tests/write_buffer_delete.rs index 9f6bcf85d2..c09be30957 100644 --- a/server/tests/write_buffer_delete.rs +++ b/server/tests/write_buffer_delete.rs @@ -1,20 +1,28 @@ +use std::collections::BTreeMap; use std::num::NonZeroU32; use std::sync::Arc; use std::time::{Duration, Instant}; use arrow_util::assert_batches_eq; use data_types::delete_predicate::{DeleteExpr, DeletePredicate, Op, Scalar}; +use data_types::router::{ + Matcher, MatcherToShard, QuerySinks, Router as RouterConfig, ShardConfig, ShardId, WriteSink, + WriteSinkSet, WriteSinkVariant, +}; use data_types::server_id::ServerId; use data_types::timestamp::TimestampRange; use data_types::DatabaseName; use dml::{DmlDelete, DmlOperation, DmlWrite}; use generated_types::influxdata::iox::{ - management::v1::{DatabaseRules, LifecycleRules}, + management::v1::DatabaseRules, write_buffer::v1::{write_buffer_connection::Direction, WriteBufferConnection}, }; use mutable_batch_lp::lines_to_batches; use query::exec::ExecutionContextProvider; use query::frontend::sql::SqlQueryPlanner; +use regex::Regex; +use router::router::Router; +use router::server::RouterServer; use server::connection::test_helpers::TestConnectionManager; use server::rules::ProvidedDatabaseRules; use server::test_utils::{make_application, make_initialized_server}; @@ -34,10 +42,7 @@ use write_buffer::mock::MockBufferSharedState; /// It primarily exists to test the routing logic. /// struct DistributedTest { - db_name: DatabaseName<'static>, - router: Arc>, - // TODO: Replace with router (#2980) - router_db: Arc, + router: Arc, consumer: Arc>, consumer_db: Arc, @@ -63,26 +68,41 @@ impl DistributedTest { }; // Create a router + let router_server = RouterServer::for_testing( + None, + None, + Arc::clone(application.time_provider()), + Some(Arc::clone(application.write_buffer_factory())), + ) + .await; let router_id = ServerId::new(NonZeroU32::new(1).unwrap()); - let router = make_initialized_server(router_id, Arc::clone(&application)).await; + router_server.set_server_id(router_id).unwrap(); - let router_db = router - .create_database( - ProvidedDatabaseRules::new_rules(DatabaseRules { - name: db_name.to_string(), - write_buffer_connection: Some(write_buffer_connection.clone()), - lifecycle_rules: Some(LifecycleRules { - immutable: true, - ..Default::default() - }), - ..Default::default() - }) - .unwrap(), - ) - .await - .unwrap() - .initialized_db() - .unwrap(); + router_server.update_router(RouterConfig { + name: db_name.to_string(), + write_sharder: ShardConfig { + specific_targets: vec![MatcherToShard { + matcher: Matcher { + table_name_regex: Some(Regex::new(".*").unwrap()), + }, + shard: ShardId::new(1), + }], + hash_ring: None, + }, + write_sinks: BTreeMap::from([( + ShardId::new(1), + WriteSinkSet { + sinks: vec![WriteSink { + sink: WriteSinkVariant::WriteBuffer( + write_buffer_connection.clone().try_into().unwrap(), + ), + ignore_errors: false, + }], + }, + )]), + query_sinks: QuerySinks::default(), + }); + let router = router_server.router(db_name).unwrap(); // Create a consumer let consumer_id = ServerId::new(NonZeroU32::new(2).unwrap()); @@ -105,9 +125,7 @@ impl DistributedTest { .unwrap(); Self { - db_name: db_name.clone(), router, - router_db, consumer, consumer_db, } @@ -136,18 +154,18 @@ impl DistributedTest { /// Write line protocol pub async fn write(&self, lp: &str) { self.router - .write( - &self.db_name, - DmlWrite::new(lines_to_batches(lp, 0).unwrap(), Default::default()), - ) + .write(DmlOperation::Write(DmlWrite::new( + lines_to_batches(lp, 0).unwrap(), + Default::default(), + ))) .await .unwrap(); } pub async fn delete(&self, delete: DmlDelete) { // TODO: Write to router not Db (#2980) - self.router_db - .route_operation(&DmlOperation::Delete(delete)) + self.router + .write(DmlOperation::Delete(delete)) .await .unwrap(); } @@ -164,9 +182,7 @@ impl DistributedTest { /// Shuts down the fixture and waits for the servers to exit pub async fn drain(&self) { - self.router.shutdown(); self.consumer.shutdown(); - self.router.join().await.unwrap(); self.consumer.join().await.unwrap(); } }