refactor: clean up write buffer end2end tests

Remove the ones that are covered by `write_pb.rs` or that are obsolete
(i.e. rely on behavior that only exist in the legacy router code).
Migrate the remaining ones to use an actual router.
pull/24376/head
Marco Neumann 2021-11-25 14:18:25 +01:00
parent 524dfa8ce2
commit c2a8baf824
3 changed files with 44 additions and 194 deletions

View File

@ -369,6 +369,13 @@ impl TestConfig {
server_type,
}
}
// change server type
pub fn with_server_type(mut self, server_type: ServerType) -> Self {
self.server_type = server_type;
self
}
// add a name=value environment variable when starting the server
pub fn with_env(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
self.env.push((name.into(), value.into()));

View File

@ -1,5 +1,4 @@
use std::collections::HashMap;
use std::iter::once;
use std::num::NonZeroU32;
use std::path::Path;
use std::time::Duration;
@ -25,13 +24,9 @@ use tempfile::TempDir;
use test_helpers::assert_contains;
use data_types::{names::org_and_bucket_to_database, DatabaseName};
use database_rules::RoutingRules;
use generated_types::google::protobuf::Empty;
use generated_types::{
influxdata::iox::{
management::v1::{self as management, *},
write_buffer::v1::WriteBufferCreationConfig,
},
influxdata::iox::{management::v1::*, write_buffer::v1::WriteBufferCreationConfig},
ReadSource, TimestampRange,
};
use influxdb_iox_client::{connection::Connection, flight::PerformQuery};
@ -312,7 +307,6 @@ pub struct DatabaseBuilder {
partition_template: PartitionTemplate,
lifecycle_rules: LifecycleRules,
write_buffer: Option<WriteBufferConnection>,
table_whitelist: Option<Vec<String>>,
}
impl DatabaseBuilder {
@ -331,7 +325,6 @@ impl DatabaseBuilder {
..Default::default()
},
write_buffer: None,
table_whitelist: None,
}
}
@ -375,11 +368,6 @@ impl DatabaseBuilder {
self
}
pub fn write_buffer_table_whitelist(mut self, whitelist: Vec<String>) -> Self {
self.table_whitelist = Some(whitelist);
self
}
pub fn worker_backoff_millis(mut self, millis: u64) -> Self {
self.lifecycle_rules.worker_backoff_millis = millis;
self
@ -389,58 +377,13 @@ impl DatabaseBuilder {
pub async fn try_build(self, channel: Connection) -> Result<(), CreateDatabaseError> {
let mut management_client = influxdb_iox_client::management::Client::new(channel);
let routing_rules = if self.write_buffer.is_some() {
const KAFKA_PRODUCER_SINK_ID: u32 = 0;
let kafka_producer_sink = management::Sink {
sink: Some(management::sink::Sink::Kafka(KafkaProducer {})),
};
const DEV_NULL_SINK_ID: u32 = 1;
let dev_null_sink = management::Sink {
sink: Some(management::sink::Sink::DevNull(DevNull {})),
};
let to_shard = |shard: u32| {
Box::new(move |i: String| MatcherToShard {
matcher: Some(Matcher {
table_name_regex: format!("^{}$", i),
}),
shard,
})
};
if let Some(table_whitelist) = self.table_whitelist {
Some(RoutingRules::ShardConfig(ShardConfig {
specific_targets: table_whitelist
.into_iter()
.map(to_shard(KAFKA_PRODUCER_SINK_ID))
.chain(once(to_shard(DEV_NULL_SINK_ID)(".*".to_string())))
.collect(),
shards: vec![
(KAFKA_PRODUCER_SINK_ID, kafka_producer_sink),
(DEV_NULL_SINK_ID, dev_null_sink),
]
.into_iter()
.collect(),
..Default::default()
}))
} else {
Some(RoutingRules::RoutingConfig(RoutingConfig {
sink: Some(management::Sink {
sink: Some(management::sink::Sink::Kafka(KafkaProducer {})),
}),
}))
}
} else {
None
};
management_client
.create_database(DatabaseRules {
name: self.name,
partition_template: Some(self.partition_template),
lifecycle_rules: Some(self.lifecycle_rules),
worker_cleanup_avg_sleep: None,
routing_rules,
routing_rules: None,
write_buffer_connection: self.write_buffer,
})
.await?;
@ -769,27 +712,25 @@ pub async fn fixture_replay_broken(db_name: &str, write_buffer_path: &Path) -> S
fixture
}
pub async fn create_router_to_write_buffer(
fixture: &ServerFixture,
pub fn wildcard_router_config(
db_name: &str,
) -> (TempDir, Box<dyn WriteBufferReading>) {
write_buffer_path: &Path,
) -> influxdb_iox_client::router::generated_types::Router {
use influxdb_iox_client::router::generated_types::{
write_sink::Sink, Matcher, MatcherToShard, Router, ShardConfig, WriteSink, WriteSinkSet,
};
let write_buffer_dir = TempDir::new().unwrap();
let write_buffer_connection = WriteBufferConnection {
direction: write_buffer_connection::Direction::Write.into(),
r#type: "file".to_string(),
connection: write_buffer_dir.path().display().to_string(),
connection: write_buffer_path.display().to_string(),
creation_config: Some(WriteBufferCreationConfig {
n_sequencers: 1,
..Default::default()
}),
..Default::default()
};
let router_cfg = Router {
Router {
name: db_name.to_string(),
write_sharder: Some(ShardConfig {
specific_targets: vec![MatcherToShard {
@ -810,7 +751,16 @@ pub async fn create_router_to_write_buffer(
},
)]),
query_sinks: Default::default(),
};
}
}
pub async fn create_router_to_write_buffer(
fixture: &ServerFixture,
db_name: &str,
) -> (TempDir, Box<dyn WriteBufferReading>) {
let write_buffer_dir = TempDir::new().unwrap();
let router_cfg = wildcard_router_config(db_name, write_buffer_dir.path());
fixture
.router_client()
.update_router(router_cfg)

View File

@ -3,11 +3,9 @@ use crate::{
server_fixture::{ServerFixture, ServerType, TestConfig},
udp_listener::UdpCapture,
},
end_to_end_cases::scenario::{rand_name, DatabaseBuilder},
end_to_end_cases::scenario::{rand_name, wildcard_router_config, DatabaseBuilder},
};
use arrow_util::assert_batches_sorted_eq;
use dml::DmlOperation;
use futures::StreamExt;
use generated_types::influxdata::iox::write_buffer::v1::{
write_buffer_connection::Direction as WriteBufferDirection, WriteBufferConnection,
};
@ -19,112 +17,7 @@ use std::{num::NonZeroU32, sync::Arc};
use tempfile::TempDir;
use test_helpers::assert_contains;
use time::SystemProvider;
use write_buffer::{
core::{WriteBufferReading, WriteBufferWriting},
file::{FileBufferConsumer, FileBufferProducer},
};
#[tokio::test]
async fn writes_go_to_write_buffer() {
let write_buffer_dir = TempDir::new().unwrap();
// set up a database with a write buffer pointing at write buffer
let server = ServerFixture::create_shared(ServerType::Database).await;
let db_name = rand_name();
let write_buffer_connection = WriteBufferConnection {
direction: WriteBufferDirection::Write.into(),
r#type: "file".to_string(),
connection: write_buffer_dir.path().display().to_string(),
creation_config: Some(WriteBufferCreationConfig {
n_sequencers: 1,
..Default::default()
}),
..Default::default()
};
DatabaseBuilder::new(db_name.clone())
.write_buffer(write_buffer_connection.clone())
.build(server.grpc_channel())
.await;
// write some points
let mut write_client = server.write_client();
let lp_lines = [
"cpu,region=west user=23.2 100",
"cpu,region=west user=21.0 150",
"disk,region=east bytes=99i 200",
];
let num_lines_written = write_client
.write_lp(&db_name, lp_lines.join("\n"), 0)
.await
.expect("cannot write");
assert_eq!(num_lines_written, 3);
// check the data is in write buffer
let mut consumer =
FileBufferConsumer::new(write_buffer_dir.path(), &db_name, Default::default(), None)
.await
.unwrap();
let (_, mut stream) = consumer.streams().into_iter().next().unwrap();
match stream.stream.next().await.unwrap().unwrap() {
DmlOperation::Write(write) => assert_eq!(write.table_count(), 2),
a => panic!("unexpected operation: {:?}", a),
}
}
#[tokio::test]
async fn writes_go_to_write_buffer_whitelist() {
let write_buffer_dir = TempDir::new().unwrap();
// set up a database with a write buffer pointing at write buffer
let server = ServerFixture::create_shared(ServerType::Database).await;
let db_name = rand_name();
let write_buffer_connection = WriteBufferConnection {
direction: WriteBufferDirection::Write.into(),
r#type: "file".to_string(),
connection: write_buffer_dir.path().display().to_string(),
creation_config: Some(WriteBufferCreationConfig {
n_sequencers: 1,
..Default::default()
}),
..Default::default()
};
DatabaseBuilder::new(db_name.clone())
.write_buffer(write_buffer_connection)
.write_buffer_table_whitelist(vec!["cpu".to_string()])
.build(server.grpc_channel())
.await;
// write some points
let mut write_client = server.write_client();
let lp_lines = [
"cpu,region=west user=23.2 100",
"cpu,region=west user=21.0 150",
"disk,region=east bytes=99i 200",
"mem,region=east bytes=123 250",
];
let num_lines_written = write_client
.write_lp(&db_name, lp_lines.join("\n"), 0)
.await
.expect("cannot write");
assert_eq!(num_lines_written, 4);
// check the data is in write buffer
let mut consumer =
FileBufferConsumer::new(write_buffer_dir.path(), &db_name, Default::default(), None)
.await
.unwrap();
let (_, mut stream) = consumer.streams().into_iter().next().unwrap();
match stream.stream.next().await.unwrap().unwrap() {
DmlOperation::Write(write) => assert_eq!(write.table_count(), 1),
a => panic!("unexpected operation: {:?}", a),
}
}
use write_buffer::{core::WriteBufferWriting, file::FileBufferProducer};
#[tokio::test]
async fn reads_come_from_write_buffer() {
@ -300,7 +193,7 @@ pub async fn test_cross_write_buffer_tracing() {
// setup tracing
let udp_capture = UdpCapture::new().await;
let test_config = TestConfig::new(ServerType::Database)
let test_config = TestConfig::new(ServerType::Router)
.with_env("TRACES_EXPORTER", "jaeger")
.with_env("TRACES_EXPORTER_JAEGER_AGENT_HOST", udp_capture.ip())
.with_env("TRACES_EXPORTER_JAEGER_AGENT_PORT", udp_capture.port())
@ -316,24 +209,18 @@ pub async fn test_cross_write_buffer_tracing() {
.update_server_id(NonZeroU32::new(1).unwrap())
.await
.unwrap();
server_write.wait_server_initialized().await;
let conn_write = WriteBufferConnection {
direction: WriteBufferDirection::Write.into(),
r#type: "file".to_string(),
connection: write_buffer_dir.path().display().to_string(),
creation_config: Some(WriteBufferCreationConfig {
n_sequencers: 1,
..Default::default()
}),
..Default::default()
};
DatabaseBuilder::new(db_name.clone())
.write_buffer(conn_write.clone())
.build(server_write.grpc_channel())
.await;
let router_cfg = wildcard_router_config(&db_name, write_buffer_dir.path());
server_write
.router_client()
.update_router(router_cfg)
.await
.unwrap();
// create consumer DB
let server_read = ServerFixture::create_single_use_with_config(test_config).await;
let server_read = ServerFixture::create_single_use_with_config(
test_config.with_server_type(ServerType::Database),
)
.await;
server_read
.deployment_client()
.update_server_id(NonZeroU32::new(2).unwrap())
@ -342,7 +229,13 @@ pub async fn test_cross_write_buffer_tracing() {
server_read.wait_server_initialized().await;
let conn_read = WriteBufferConnection {
direction: WriteBufferDirection::Read.into(),
..conn_write
r#type: "file".to_string(),
connection: write_buffer_dir.path().display().to_string(),
creation_config: Some(WriteBufferCreationConfig {
n_sequencers: 1,
..Default::default()
}),
..Default::default()
};
DatabaseBuilder::new(db_name.clone())
.write_buffer(conn_read)