use crate::{
    common::server_fixture::ServerFixture,
    end_to_end_cases::scenario::{rand_name, DatabaseBuilder},
};
use arrow_util::assert_batches_sorted_eq;
use entry::{test_helpers::lp_to_entry, Entry};
use generated_types::influxdata::iox::management::v1::database_rules::WriteBufferConnection;
use influxdb_iox_client::write::WriteError;
use rdkafka::{
    admin::{AdminClient, AdminOptions, NewTopic, TopicReplication},
    client::DefaultClientContext,
    consumer::{Consumer, StreamConsumer},
    producer::{FutureProducer, FutureRecord},
    ClientConfig, Message, Offset, TopicPartitionList,
};
use std::convert::TryFrom;
use test_helpers::assert_contains;
use write_buffer::maybe_skip_kafka_integration;

#[tokio::test]
async fn writes_go_to_kafka() {
    let kafka_connection = maybe_skip_kafka_integration!();

    // set up a database with a write buffer pointing at kafka
    let server = ServerFixture::create_shared().await;
    let db_name = rand_name();
    let write_buffer_connection = WriteBufferConnection::Writing(kafka_connection.to_string());

    DatabaseBuilder::new(db_name.clone())
        .write_buffer(write_buffer_connection)
        .build(server.grpc_channel())
        .await;

    // write some points
    let mut write_client = server.write_client();

    let lp_lines = [
        "cpu,region=west user=23.2 100",
        "cpu,region=west user=21.0 150",
        "disk,region=east bytes=99i 200",
    ];

    let num_lines_written = write_client
        .write(&db_name, lp_lines.join("\n"))
        .await
        .expect("cannot write");
    assert_eq!(num_lines_written, 3);

    // check the data is in kafka
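    // Consumer config used only to verify the write: auto-commit is disabled and the partition
    // offset is set manually below, so the consumer always reads the topic from the beginning;
    // the group id is a placeholder because the client requires one even though no
    // consumer-group subscription is used here.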
    let mut cfg = ClientConfig::new();
    cfg.set("bootstrap.servers", kafka_connection);
    cfg.set("session.timeout.ms", "6000");
    cfg.set("enable.auto.commit", "false");
    cfg.set("group.id", "placeholder");

    let consumer: StreamConsumer = cfg.create().unwrap();
    let mut topics = TopicPartitionList::new();
    topics.add_partition(&db_name, 0);
    topics
        .set_partition_offset(&db_name, 0, Offset::Beginning)
        .unwrap();
    consumer.assign(&topics).unwrap();

    let message = consumer.recv().await.unwrap();
    assert_eq!(message.topic(), db_name);

    let entry = Entry::try_from(message.payload().unwrap().to_vec()).unwrap();
    let partition_writes = entry.partition_writes().unwrap();
    assert_eq!(partition_writes.len(), 2);
}

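/// Produces the given line protocol directly to the given Kafka topic (bypassing IOx),
/// optionally targeting a specific partition.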
async fn produce_to_kafka_directly(
    producer: &FutureProducer,
    lp: &str,
    topic: &str,
    partition: Option<i32>,
) {
    let entry = lp_to_entry(lp);
    let mut record: FutureRecord<'_, String, _> = FutureRecord::to(topic).payload(entry.data());

    if let Some(pid) = partition {
        record = record.partition(pid);
    }

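    // The enqueue call, the awaited delivery future, and the delivery result each yield a
    // `Result`, hence the chain of unwraps below.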
    producer
        .send_result(record)
        .unwrap()
        .await
        .unwrap()
        .unwrap();
}

#[tokio::test]
async fn reads_come_from_kafka() {
    let kafka_connection = maybe_skip_kafka_integration!();

    // set up a database to read from Kafka
    let server = ServerFixture::create_shared().await;
    let db_name = rand_name();
    let write_buffer_connection = WriteBufferConnection::Reading(kafka_connection.to_string());

    // Common Kafka config
    let mut cfg = ClientConfig::new();
    cfg.set("bootstrap.servers", kafka_connection);
    cfg.set("message.timeout.ms", "5000");

    // Create a topic with 2 partitions in Kafka BEFORE creating the DB
    let num_partitions = 2;
    let admin: AdminClient<DefaultClientContext> = cfg.clone().create().unwrap();
    let topic = NewTopic::new(&db_name, num_partitions, TopicReplication::Fixed(1));
    let opts = AdminOptions::default();
    admin.create_topics(&[topic], &opts).await.unwrap();

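    // Now create the database that consumes from this topic via the write buffer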
    DatabaseBuilder::new(db_name.clone())
        .write_buffer(write_buffer_connection)
        .build(server.grpc_channel())
        .await;

    // put some points in Kafka
    let producer: FutureProducer = cfg.create().unwrap();

    // Kafka partitions must be configured based on the primary key because ordering across Kafka
    // partitions is undefined, so the upsert semantics would be undefined. Entries that can
    // potentially be merged must end up in the same Kafka partition. This test follows that
    // constraint, but doesn't actually encode it.

    // Put some data for `upc,region=west` in partition 0
    let lp_lines = [
        "upc,region=west user=23.2 100",
        "upc,region=west user=21.0 150",
    ];
    produce_to_kafka_directly(&producer, &lp_lines.join("\n"), &db_name, Some(0)).await;

    // Put some data for `upc,region=east` in partition 1
    let lp_lines = [
        "upc,region=east user=76.2 300",
        "upc,region=east user=88.7 350",
    ];
    produce_to_kafka_directly(&producer, &lp_lines.join("\n"), &db_name, Some(1)).await;

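    // Poll the query interface until rows from both Kafka partitions have been ingested;
    // the whole check is bounded by the timeout applied below.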
    let check = async {
        let mut interval = tokio::time::interval(std::time::Duration::from_millis(500));

        loop {
            // query for the data
            let query_results = server
                .flight_client()
                .perform_query(&db_name, "select * from upc")
                .await;

            if let Ok(mut results) = query_results {
                let mut batches = Vec::new();
                let mut num_rows = 0;
                while let Some(data) = results.next().await.unwrap() {
                    num_rows += data.num_rows();
                    batches.push(data);
                }

                // Since data is streamed using two partitions, only a subset of the data might
                // be present. If that's the case, ignore that record batch and try again.
                if num_rows < 4 {
                    continue;
                }

                let expected = vec![
                    "+--------+-------------------------------+------+",
                    "| region | time                          | user |",
                    "+--------+-------------------------------+------+",
                    "| east   | 1970-01-01 00:00:00.000000300 | 76.2 |",
                    "| east   | 1970-01-01 00:00:00.000000350 | 88.7 |",
                    "| west   | 1970-01-01 00:00:00.000000100 | 23.2 |",
                    "| west   | 1970-01-01 00:00:00.000000150 | 21   |",
                    "+--------+-------------------------------+------+",
                ];
                assert_batches_sorted_eq!(&expected, &batches);
                break;
            }

            interval.tick().await;
        }
    };
    let check = tokio::time::timeout(std::time::Duration::from_secs(10), check);
    check.await.unwrap();
}

#[tokio::test]
async fn cant_write_to_db_reading_from_kafka() {
    let kafka_connection = maybe_skip_kafka_integration!();

    // set up a database to read from Kafka
    let server = ServerFixture::create_shared().await;
    let db_name = rand_name();
    let write_buffer_connection = WriteBufferConnection::Reading(kafka_connection.to_string());

    DatabaseBuilder::new(db_name.clone())
        .write_buffer(write_buffer_connection)
        .build(server.grpc_channel())
        .await;

    // Writing to this database is an error; all data comes from Kafka
    let mut write_client = server.write_client();
    let err = write_client
        .write(&db_name, "temp,region=south color=1")
        .await
        .expect_err("expected write to fail");

    assert_contains!(
        err.to_string(),
        format!(
            r#"Cannot write to database {}, it's configured to only read from the write buffer"#,
            db_name
        )
    );
    assert!(matches!(dbg!(err), WriteError::ServerError(_)));
}