From c2a8baf824c49707c7c93d184a97e4fe1d242f8c Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Thu, 25 Nov 2021 14:18:25 +0100 Subject: [PATCH] refactor: clean up write buffer end2end tests Remove the ones that are covered by `write_pb.rs` or that are obsolete (i.e. rely on behavior that only exist in the legacy router code). Migrate the remaining ones to use an actual router. --- influxdb_iox/tests/common/server_fixture.rs | 7 + .../tests/end_to_end_cases/scenario.rs | 84 ++-------- .../tests/end_to_end_cases/write_buffer.rs | 147 +++--------------- 3 files changed, 44 insertions(+), 194 deletions(-) diff --git a/influxdb_iox/tests/common/server_fixture.rs b/influxdb_iox/tests/common/server_fixture.rs index 4ffd88ba3e..34aedf1b6f 100644 --- a/influxdb_iox/tests/common/server_fixture.rs +++ b/influxdb_iox/tests/common/server_fixture.rs @@ -369,6 +369,13 @@ impl TestConfig { server_type, } } + + // change server type + pub fn with_server_type(mut self, server_type: ServerType) -> Self { + self.server_type = server_type; + self + } + // add a name=value environment variable when starting the server pub fn with_env(mut self, name: impl Into, value: impl Into) -> Self { self.env.push((name.into(), value.into())); diff --git a/influxdb_iox/tests/end_to_end_cases/scenario.rs b/influxdb_iox/tests/end_to_end_cases/scenario.rs index c371b6c0a4..6622cb058c 100644 --- a/influxdb_iox/tests/end_to_end_cases/scenario.rs +++ b/influxdb_iox/tests/end_to_end_cases/scenario.rs @@ -1,5 +1,4 @@ use std::collections::HashMap; -use std::iter::once; use std::num::NonZeroU32; use std::path::Path; use std::time::Duration; @@ -25,13 +24,9 @@ use tempfile::TempDir; use test_helpers::assert_contains; use data_types::{names::org_and_bucket_to_database, DatabaseName}; -use database_rules::RoutingRules; use generated_types::google::protobuf::Empty; use generated_types::{ - influxdata::iox::{ - management::v1::{self as management, *}, - write_buffer::v1::WriteBufferCreationConfig, - }, + influxdata::iox::{management::v1::*, write_buffer::v1::WriteBufferCreationConfig}, ReadSource, TimestampRange, }; use influxdb_iox_client::{connection::Connection, flight::PerformQuery}; @@ -312,7 +307,6 @@ pub struct DatabaseBuilder { partition_template: PartitionTemplate, lifecycle_rules: LifecycleRules, write_buffer: Option, - table_whitelist: Option>, } impl DatabaseBuilder { @@ -331,7 +325,6 @@ impl DatabaseBuilder { ..Default::default() }, write_buffer: None, - table_whitelist: None, } } @@ -375,11 +368,6 @@ impl DatabaseBuilder { self } - pub fn write_buffer_table_whitelist(mut self, whitelist: Vec) -> Self { - self.table_whitelist = Some(whitelist); - self - } - pub fn worker_backoff_millis(mut self, millis: u64) -> Self { self.lifecycle_rules.worker_backoff_millis = millis; self @@ -389,58 +377,13 @@ impl DatabaseBuilder { pub async fn try_build(self, channel: Connection) -> Result<(), CreateDatabaseError> { let mut management_client = influxdb_iox_client::management::Client::new(channel); - let routing_rules = if self.write_buffer.is_some() { - const KAFKA_PRODUCER_SINK_ID: u32 = 0; - let kafka_producer_sink = management::Sink { - sink: Some(management::sink::Sink::Kafka(KafkaProducer {})), - }; - const DEV_NULL_SINK_ID: u32 = 1; - let dev_null_sink = management::Sink { - sink: Some(management::sink::Sink::DevNull(DevNull {})), - }; - - let to_shard = |shard: u32| { - Box::new(move |i: String| MatcherToShard { - matcher: Some(Matcher { - table_name_regex: format!("^{}$", i), - }), - shard, - }) - }; - - if let Some(table_whitelist) = self.table_whitelist { - Some(RoutingRules::ShardConfig(ShardConfig { - specific_targets: table_whitelist - .into_iter() - .map(to_shard(KAFKA_PRODUCER_SINK_ID)) - .chain(once(to_shard(DEV_NULL_SINK_ID)(".*".to_string()))) - .collect(), - shards: vec![ - (KAFKA_PRODUCER_SINK_ID, kafka_producer_sink), - (DEV_NULL_SINK_ID, dev_null_sink), - ] - .into_iter() - .collect(), - ..Default::default() - })) - } else { - Some(RoutingRules::RoutingConfig(RoutingConfig { - sink: Some(management::Sink { - sink: Some(management::sink::Sink::Kafka(KafkaProducer {})), - }), - })) - } - } else { - None - }; - management_client .create_database(DatabaseRules { name: self.name, partition_template: Some(self.partition_template), lifecycle_rules: Some(self.lifecycle_rules), worker_cleanup_avg_sleep: None, - routing_rules, + routing_rules: None, write_buffer_connection: self.write_buffer, }) .await?; @@ -769,27 +712,25 @@ pub async fn fixture_replay_broken(db_name: &str, write_buffer_path: &Path) -> S fixture } -pub async fn create_router_to_write_buffer( - fixture: &ServerFixture, +pub fn wildcard_router_config( db_name: &str, -) -> (TempDir, Box) { + write_buffer_path: &Path, +) -> influxdb_iox_client::router::generated_types::Router { use influxdb_iox_client::router::generated_types::{ write_sink::Sink, Matcher, MatcherToShard, Router, ShardConfig, WriteSink, WriteSinkSet, }; - let write_buffer_dir = TempDir::new().unwrap(); - let write_buffer_connection = WriteBufferConnection { direction: write_buffer_connection::Direction::Write.into(), r#type: "file".to_string(), - connection: write_buffer_dir.path().display().to_string(), + connection: write_buffer_path.display().to_string(), creation_config: Some(WriteBufferCreationConfig { n_sequencers: 1, ..Default::default() }), ..Default::default() }; - let router_cfg = Router { + Router { name: db_name.to_string(), write_sharder: Some(ShardConfig { specific_targets: vec![MatcherToShard { @@ -810,7 +751,16 @@ pub async fn create_router_to_write_buffer( }, )]), query_sinks: Default::default(), - }; + } +} + +pub async fn create_router_to_write_buffer( + fixture: &ServerFixture, + db_name: &str, +) -> (TempDir, Box) { + let write_buffer_dir = TempDir::new().unwrap(); + + let router_cfg = wildcard_router_config(db_name, write_buffer_dir.path()); fixture .router_client() .update_router(router_cfg) diff --git a/influxdb_iox/tests/end_to_end_cases/write_buffer.rs b/influxdb_iox/tests/end_to_end_cases/write_buffer.rs index 7ab245951c..8e5a39586b 100644 --- a/influxdb_iox/tests/end_to_end_cases/write_buffer.rs +++ b/influxdb_iox/tests/end_to_end_cases/write_buffer.rs @@ -3,11 +3,9 @@ use crate::{ server_fixture::{ServerFixture, ServerType, TestConfig}, udp_listener::UdpCapture, }, - end_to_end_cases::scenario::{rand_name, DatabaseBuilder}, + end_to_end_cases::scenario::{rand_name, wildcard_router_config, DatabaseBuilder}, }; use arrow_util::assert_batches_sorted_eq; -use dml::DmlOperation; -use futures::StreamExt; use generated_types::influxdata::iox::write_buffer::v1::{ write_buffer_connection::Direction as WriteBufferDirection, WriteBufferConnection, }; @@ -19,112 +17,7 @@ use std::{num::NonZeroU32, sync::Arc}; use tempfile::TempDir; use test_helpers::assert_contains; use time::SystemProvider; -use write_buffer::{ - core::{WriteBufferReading, WriteBufferWriting}, - file::{FileBufferConsumer, FileBufferProducer}, -}; - -#[tokio::test] -async fn writes_go_to_write_buffer() { - let write_buffer_dir = TempDir::new().unwrap(); - - // set up a database with a write buffer pointing at write buffer - let server = ServerFixture::create_shared(ServerType::Database).await; - let db_name = rand_name(); - let write_buffer_connection = WriteBufferConnection { - direction: WriteBufferDirection::Write.into(), - r#type: "file".to_string(), - connection: write_buffer_dir.path().display().to_string(), - creation_config: Some(WriteBufferCreationConfig { - n_sequencers: 1, - ..Default::default() - }), - ..Default::default() - }; - - DatabaseBuilder::new(db_name.clone()) - .write_buffer(write_buffer_connection.clone()) - .build(server.grpc_channel()) - .await; - - // write some points - let mut write_client = server.write_client(); - - let lp_lines = [ - "cpu,region=west user=23.2 100", - "cpu,region=west user=21.0 150", - "disk,region=east bytes=99i 200", - ]; - - let num_lines_written = write_client - .write_lp(&db_name, lp_lines.join("\n"), 0) - .await - .expect("cannot write"); - assert_eq!(num_lines_written, 3); - - // check the data is in write buffer - let mut consumer = - FileBufferConsumer::new(write_buffer_dir.path(), &db_name, Default::default(), None) - .await - .unwrap(); - let (_, mut stream) = consumer.streams().into_iter().next().unwrap(); - match stream.stream.next().await.unwrap().unwrap() { - DmlOperation::Write(write) => assert_eq!(write.table_count(), 2), - a => panic!("unexpected operation: {:?}", a), - } -} - -#[tokio::test] -async fn writes_go_to_write_buffer_whitelist() { - let write_buffer_dir = TempDir::new().unwrap(); - - // set up a database with a write buffer pointing at write buffer - let server = ServerFixture::create_shared(ServerType::Database).await; - let db_name = rand_name(); - let write_buffer_connection = WriteBufferConnection { - direction: WriteBufferDirection::Write.into(), - r#type: "file".to_string(), - connection: write_buffer_dir.path().display().to_string(), - creation_config: Some(WriteBufferCreationConfig { - n_sequencers: 1, - ..Default::default() - }), - ..Default::default() - }; - - DatabaseBuilder::new(db_name.clone()) - .write_buffer(write_buffer_connection) - .write_buffer_table_whitelist(vec!["cpu".to_string()]) - .build(server.grpc_channel()) - .await; - - // write some points - let mut write_client = server.write_client(); - - let lp_lines = [ - "cpu,region=west user=23.2 100", - "cpu,region=west user=21.0 150", - "disk,region=east bytes=99i 200", - "mem,region=east bytes=123 250", - ]; - - let num_lines_written = write_client - .write_lp(&db_name, lp_lines.join("\n"), 0) - .await - .expect("cannot write"); - assert_eq!(num_lines_written, 4); - - // check the data is in write buffer - let mut consumer = - FileBufferConsumer::new(write_buffer_dir.path(), &db_name, Default::default(), None) - .await - .unwrap(); - let (_, mut stream) = consumer.streams().into_iter().next().unwrap(); - match stream.stream.next().await.unwrap().unwrap() { - DmlOperation::Write(write) => assert_eq!(write.table_count(), 1), - a => panic!("unexpected operation: {:?}", a), - } -} +use write_buffer::{core::WriteBufferWriting, file::FileBufferProducer}; #[tokio::test] async fn reads_come_from_write_buffer() { @@ -300,7 +193,7 @@ pub async fn test_cross_write_buffer_tracing() { // setup tracing let udp_capture = UdpCapture::new().await; - let test_config = TestConfig::new(ServerType::Database) + let test_config = TestConfig::new(ServerType::Router) .with_env("TRACES_EXPORTER", "jaeger") .with_env("TRACES_EXPORTER_JAEGER_AGENT_HOST", udp_capture.ip()) .with_env("TRACES_EXPORTER_JAEGER_AGENT_PORT", udp_capture.port()) @@ -316,24 +209,18 @@ pub async fn test_cross_write_buffer_tracing() { .update_server_id(NonZeroU32::new(1).unwrap()) .await .unwrap(); - server_write.wait_server_initialized().await; - let conn_write = WriteBufferConnection { - direction: WriteBufferDirection::Write.into(), - r#type: "file".to_string(), - connection: write_buffer_dir.path().display().to_string(), - creation_config: Some(WriteBufferCreationConfig { - n_sequencers: 1, - ..Default::default() - }), - ..Default::default() - }; - DatabaseBuilder::new(db_name.clone()) - .write_buffer(conn_write.clone()) - .build(server_write.grpc_channel()) - .await; + let router_cfg = wildcard_router_config(&db_name, write_buffer_dir.path()); + server_write + .router_client() + .update_router(router_cfg) + .await + .unwrap(); // create consumer DB - let server_read = ServerFixture::create_single_use_with_config(test_config).await; + let server_read = ServerFixture::create_single_use_with_config( + test_config.with_server_type(ServerType::Database), + ) + .await; server_read .deployment_client() .update_server_id(NonZeroU32::new(2).unwrap()) @@ -342,7 +229,13 @@ pub async fn test_cross_write_buffer_tracing() { server_read.wait_server_initialized().await; let conn_read = WriteBufferConnection { direction: WriteBufferDirection::Read.into(), - ..conn_write + r#type: "file".to_string(), + connection: write_buffer_dir.path().display().to_string(), + creation_config: Some(WriteBufferCreationConfig { + n_sequencers: 1, + ..Default::default() + }), + ..Default::default() }; DatabaseBuilder::new(db_name.clone()) .write_buffer(conn_read)