Merge pull request #2960 from influxdata/crepererum/no_kafka_end2end

test: remove Kafka from end2end tests

commit 23fa85994e
@@ -1635,7 +1635,6 @@ dependencies = [
  "prost",
  "query",
  "rand",
- "rdkafka",
  "read_buffer",
  "reqwest",
  "rustyline",
@@ -182,7 +182,6 @@ flate2 = "1.0"
 hex = "0.4.2"
 predicates = "2.0.3"
 rand = "0.8.3"
-rdkafka = "0.27.0"
 reqwest = "0.11"
 tempfile = "3.1.0"
@@ -21,4 +21,4 @@ ENV TEST_INTEGRATION=1
 ENV KAFKA_CONNECT=kafka:9092
 
 # Run the integration tests that connect to Kafka that will be running in another container
-CMD ["sh", "-c", "./docker/integration_test.sh"]
+CMD ["sh", "-c", "cargo test -p write_buffer kafka -- --nocapture"]
@@ -1,7 +0,0 @@
-#!/bin/bash
-
-set -euxo pipefail
-
-cargo test -p write_buffer kafka -- --nocapture
-cargo test -p influxdb_iox --test end_to_end skip_replay -- --nocapture
-cargo test -p influxdb_iox --test end_to_end write_buffer -- --nocapture
@@ -136,7 +136,7 @@ You can then run the tests with `KAFKA_CONNECT=localhost:9093`. To run just the
 tests, the full command would then be:
 
 ```
-TEST_INTEGRATION=1 KAFKA_CONNECT=localhost:9093 cargo test -p influxdb_iox --test end_to_end write_buffer
+TEST_INTEGRATION=1 KAFKA_CONNECT=localhost:9093 cargo test -p write_buffer kafka -- --nocapture
 ```
 
 ### Running `cargo test` in a Docker container
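As background for these commands: the Kafka integration tests gate themselves on environment variables instead of failing when no broker is reachable. A minimal sketch of that gating pattern, modeled on the `maybe_skip_kafka_integration!` macro in the `write_buffer` crate (the real macro may differ in detail):

```rust
// Sketch of an env-var gate for integration tests; yields the broker
// address, or skips the enclosing test when TEST_INTEGRATION is unset.
macro_rules! maybe_skip_kafka_integration {
    () => {{
        match (
            std::env::var("TEST_INTEGRATION").is_ok(),
            std::env::var("KAFKA_CONNECT").ok(),
        ) {
            // both set: run the test against the given broker
            (true, Some(kafka_connection)) => kafka_connection,
            // gate set but no broker address: fail loudly instead of silently passing
            (true, None) => panic!("TEST_INTEGRATION is set which requires KAFKA_CONNECT"),
            // gate unset: skip the test body entirely
            (false, _) => {
                eprintln!("skipping Kafka integration test - set TEST_INTEGRATION to run");
                return;
            }
        }
    }};
}
```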
@@ -8,8 +8,8 @@ use generated_types::google::longrunning::IoxOperation;
 use generated_types::influxdata::iox::management::v1::{
     operation_metadata::Job, WipePreservedCatalog,
 };
+use tempfile::TempDir;
 use test_helpers::make_temp_file;
-use write_buffer::maybe_skip_kafka_integration;
 
 use crate::{
     common::server_fixture::ServerFixture,
@@ -910,9 +910,9 @@ async fn test_wipe_persisted_catalog_error_db_exists() {
 
 #[tokio::test]
 async fn test_skip_replay() {
-    let kafka_connection = maybe_skip_kafka_integration!();
+    let write_buffer_dir = TempDir::new().unwrap();
     let db_name = rand_name();
-    let server_fixture = fixture_replay_broken(&db_name, &kafka_connection).await;
+    let server_fixture = fixture_replay_broken(&db_name, write_buffer_dir.path()).await;
     let addr = server_fixture.grpc_base();
 
     Command::cargo_bin("influxdb_iox")
@@ -1,6 +1,5 @@
-use std::convert::TryFrom;
-use std::iter::once;
 use std::num::NonZeroU32;
+use std::path::Path;
 use std::time::Duration;
 use std::{convert::TryInto, str, u32};
 use std::{sync::Arc, time::SystemTime};
@@ -31,9 +30,9 @@ use generated_types::{
     ReadSource, TimestampRange,
 };
 use influxdb_iox_client::{connection::Connection, flight::PerformQuery};
+use time::SystemProvider;
 use write_buffer::core::WriteBufferWriting;
-use write_buffer::kafka::test_utils::{kafka_sequencer_options, purge_kafka_topic};
-use write_buffer::kafka::KafkaBufferProducer;
+use write_buffer::file::FileBufferProducer;
 
 use crate::common::server_fixture::{ServerFixture, TestConfig, DEFAULT_SERVER_ID};
 
@@ -655,7 +654,7 @@ pub async fn fixture_broken_catalog(db_name: &str) -> ServerFixture {
 }
 
 /// Creates a database that cannot be replayed
-pub async fn fixture_replay_broken(db_name: &str, kafka_connection: &str) -> ServerFixture {
+pub async fn fixture_replay_broken(db_name: &str, write_buffer_path: &Path) -> ServerFixture {
     let server_id = DEFAULT_SERVER_ID;
 
     let test_config = TestConfig::new().with_env("INFLUXDB_IOX_SKIP_REPLAY", "no");
@@ -680,11 +679,11 @@ pub async fn fixture_replay_broken(db_name: &str, kafka_connection: &str) -> ServerFixture {
             name: db_name.to_string(),
             write_buffer_connection: Some(WriteBufferConnection {
                 direction: write_buffer_connection::Direction::Read.into(),
-                r#type: "kafka".to_string(),
-                connection: kafka_connection.to_string(),
+                r#type: "file".to_string(),
+                connection: write_buffer_path.display().to_string(),
                 creation_config: Some(WriteBufferCreationConfig {
                     n_sequencers: 1,
-                    options: kafka_sequencer_options(),
+                    ..Default::default()
                 }),
                 ..Default::default()
             }),
@@ -708,45 +707,42 @@ pub async fn fixture_replay_broken(db_name: &str, kafka_connection: &str) -> ServerFixture {
     .unwrap();
 
     // ingest data as mixed throughput
-    let creation_config = Some(data_types::database_rules::WriteBufferCreationConfig {
-        n_sequencers: NonZeroU32::try_from(1).unwrap(),
-        options: kafka_sequencer_options(),
-    });
-    let producer = KafkaBufferProducer::new(
-        kafka_connection,
+    let time_provider = Arc::new(SystemProvider::new());
+    let producer = FileBufferProducer::new(
+        write_buffer_path,
         db_name,
-        &Default::default(),
-        creation_config.as_ref(),
-        Arc::new(time::SystemProvider::new()),
+        Default::default(),
+        time_provider,
     )
     .await
     .unwrap();
-    producer
+    let sequencer_id = producer.sequencer_ids().into_iter().next().unwrap();
+    let (sequence_1, _) = producer
         .store_entry(
             &lp_to_entries("table_1,partition_by=a foo=1 10", &partition_template)
                 .pop()
                 .unwrap(),
-            0,
+            sequencer_id,
             None,
         )
         .await
         .unwrap();
-    producer
+    let (sequence_2, _) = producer
         .store_entry(
             &lp_to_entries("table_1,partition_by=b foo=2 20", &partition_template)
                 .pop()
                 .unwrap(),
-            0,
+            sequencer_id,
             None,
         )
         .await
         .unwrap();
-    producer
+    let (sequence_3, _) = producer
         .store_entry(
             &lp_to_entries("table_1,partition_by=b foo=3 30", &partition_template)
                 .pop()
                 .unwrap(),
-            0,
+            sequencer_id,
             None,
         )
         .await
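Condensed, the producer-side pattern this fixture switches to looks as follows. This is an illustrative sketch assuming the `FileBufferProducer` and `store_entry` signatures shown in this diff; the helper name `append_lp` is invented for the example:

```rust
use std::sync::Arc;

use entry::test_helpers::lp_to_entry;
use time::SystemProvider;
use write_buffer::core::WriteBufferWriting;
use write_buffer::file::FileBufferProducer;

// Hypothetical helper mirroring the calls above: open a file-backed producer
// rooted at `path` (passing `Default::default()`, i.e. no creation config, so
// the sequencer layout must already exist) and append one line-protocol
// entry, returning its assigned sequence number.
async fn append_lp(path: &std::path::Path, db_name: &str, lp: &str) -> u64 {
    let producer = FileBufferProducer::new(
        path,
        db_name,
        Default::default(),
        Arc::new(SystemProvider::new()),
    )
    .await
    .unwrap();
    let sequencer_id = producer.sequencer_ids().into_iter().next().unwrap();
    let (sequence, _) = producer
        .store_entry(&lp_to_entry(lp), sequencer_id, None)
        .await
        .unwrap();
    sequence.number
}
```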
@@ -766,8 +762,7 @@ pub async fn fixture_replay_broken(db_name: &str, kafka_connection: &str) -> ServerFixture {
     )
     .await;
 
-    // purge data from Kafka
-    purge_kafka_topic(kafka_connection, db_name).await;
+    // add new entry to the end
     producer
         .store_entry(
             &lp_to_entries("table_1,partition_by=c foo=4 40", &partition_template)
@@ -779,6 +774,29 @@ pub async fn fixture_replay_broken(db_name: &str, kafka_connection: &str) -> ServerFixture {
     .await
     .unwrap();
 
+    // purge data from write buffer
+    write_buffer::file::test_utils::remove_entry(
+        write_buffer_path,
+        db_name,
+        sequencer_id,
+        sequence_1.number,
+    )
+    .await;
+    write_buffer::file::test_utils::remove_entry(
+        write_buffer_path,
+        db_name,
+        sequencer_id,
+        sequence_2.number,
+    )
+    .await;
+    write_buffer::file::test_utils::remove_entry(
+        write_buffer_path,
+        db_name,
+        sequencer_id,
+        sequence_3.number,
+    )
+    .await;
+
     // Try to replay and error
     let fixture = fixture.restart_server().await;
 
@@ -6,7 +6,8 @@ use crate::{
     end_to_end_cases::scenario::{rand_name, DatabaseBuilder},
 };
 use arrow_util::assert_batches_sorted_eq;
-use entry::{test_helpers::lp_to_entry, Entry};
+use entry::test_helpers::lp_to_entry;
+use futures::StreamExt;
 use generated_types::influxdata::iox::management::v1::{
     write_buffer_connection::Direction as WriteBufferDirection, WriteBufferConnection,
 };
@@ -14,36 +15,35 @@ use influxdb_iox_client::{
     management::{generated_types::WriteBufferCreationConfig, CreateDatabaseError},
     write::WriteError,
 };
-use rdkafka::{
-    consumer::{Consumer, StreamConsumer},
-    producer::{FutureProducer, FutureRecord},
-    ClientConfig, Message, Offset, TopicPartitionList,
-};
-use std::convert::TryFrom;
+use std::sync::Arc;
 use tempfile::TempDir;
 use test_helpers::assert_contains;
-use write_buffer::{kafka::test_utils::kafka_sequencer_options, maybe_skip_kafka_integration};
+use time::SystemProvider;
+use write_buffer::{
+    core::{WriteBufferReading, WriteBufferWriting},
+    file::{FileBufferConsumer, FileBufferProducer},
+};
 
 #[tokio::test]
-async fn writes_go_to_kafka() {
-    let kafka_connection = maybe_skip_kafka_integration!();
+async fn writes_go_to_write_buffer() {
+    let write_buffer_dir = TempDir::new().unwrap();
 
-    // set up a database with a write buffer pointing at kafka
+    // set up a database with a write buffer pointing at write buffer
     let server = ServerFixture::create_shared().await;
     let db_name = rand_name();
     let write_buffer_connection = WriteBufferConnection {
         direction: WriteBufferDirection::Write.into(),
-        r#type: "kafka".to_string(),
-        connection: kafka_connection.to_string(),
+        r#type: "file".to_string(),
+        connection: write_buffer_dir.path().display().to_string(),
         creation_config: Some(WriteBufferCreationConfig {
             n_sequencers: 1,
-            options: kafka_sequencer_options(),
+            ..Default::default()
         }),
         ..Default::default()
     };
 
     DatabaseBuilder::new(db_name.clone())
-        .write_buffer(write_buffer_connection)
+        .write_buffer(write_buffer_connection.clone())
         .build(server.grpc_channel())
         .await;
 
@@ -62,43 +62,32 @@ async fn writes_go_to_kafka() {
     .expect("cannot write");
     assert_eq!(num_lines_written, 3);
 
-    // check the data is in kafka
-    let mut cfg = ClientConfig::new();
-    cfg.set("bootstrap.servers", kafka_connection);
-    cfg.set("session.timeout.ms", "6000");
-    cfg.set("enable.auto.commit", "false");
-    cfg.set("group.id", "placeholder");
-
-    let consumer: StreamConsumer = cfg.create().unwrap();
-    let mut topics = TopicPartitionList::new();
-    topics.add_partition(&db_name, 0);
-    topics
-        .set_partition_offset(&db_name, 0, Offset::Beginning)
-        .unwrap();
-    consumer.assign(&topics).unwrap();
-
-    let message = consumer.recv().await.unwrap();
-    assert_eq!(message.topic(), db_name);
-
-    let entry = Entry::try_from(message.payload().unwrap().to_vec()).unwrap();
+    // check the data is in write buffer
+    let mut consumer =
+        FileBufferConsumer::new(write_buffer_dir.path(), &db_name, Default::default(), None)
+            .await
+            .unwrap();
+    let (_, mut stream) = consumer.streams().into_iter().next().unwrap();
+    let sequenced_entry = stream.stream.next().await.unwrap().unwrap();
+    let entry = sequenced_entry.entry();
     let partition_writes = entry.partition_writes().unwrap();
     assert_eq!(partition_writes.len(), 2);
 }
 
 #[tokio::test]
-async fn writes_go_to_kafka_whitelist() {
-    let kafka_connection = maybe_skip_kafka_integration!();
+async fn writes_go_to_write_buffer_whitelist() {
+    let write_buffer_dir = TempDir::new().unwrap();
 
-    // set up a database with a write buffer pointing at kafka
+    // set up a database with a write buffer pointing at write buffer
     let server = ServerFixture::create_shared().await;
     let db_name = rand_name();
     let write_buffer_connection = WriteBufferConnection {
         direction: WriteBufferDirection::Write.into(),
-        r#type: "kafka".to_string(),
-        connection: kafka_connection.to_string(),
+        r#type: "file".to_string(),
+        connection: write_buffer_dir.path().display().to_string(),
         creation_config: Some(WriteBufferCreationConfig {
             n_sequencers: 1,
-            options: kafka_sequencer_options(),
+            ..Default::default()
         }),
         ..Default::default()
     };
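The reader side of the same check, condensed into one illustrative helper; it assumes the `FileBufferConsumer` API exactly as exercised above (the helper name is invented):

```rust
use futures::StreamExt;
use write_buffer::core::WriteBufferReading;
use write_buffer::file::FileBufferConsumer;

// Hypothetical helper mirroring the assertions above: open the file write
// buffer, take the first sequencer's stream and pull one sequenced entry.
async fn first_entry_sequence_number(path: &std::path::Path, db_name: &str) -> u64 {
    let mut consumer = FileBufferConsumer::new(path, db_name, Default::default(), None)
        .await
        .unwrap();
    let (_sequencer_id, mut stream) = consumer.streams().into_iter().next().unwrap();
    let sequenced_entry = stream.stream.next().await.unwrap().unwrap();
    sequenced_entry.sequence().unwrap().number
}
```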
@@ -125,99 +114,74 @@ async fn writes_go_to_kafka_whitelist() {
     .expect("cannot write");
     assert_eq!(num_lines_written, 4);
 
-    // check the data is in kafka
-    let mut cfg = ClientConfig::new();
-    cfg.set("bootstrap.servers", kafka_connection);
-    cfg.set("session.timeout.ms", "6000");
-    cfg.set("enable.auto.commit", "false");
-    cfg.set("group.id", "placeholder");
-
-    let consumer: StreamConsumer = cfg.create().unwrap();
-    let mut topics = TopicPartitionList::new();
-    topics.add_partition(&db_name, 0);
-    topics
-        .set_partition_offset(&db_name, 0, Offset::Beginning)
-        .unwrap();
-    consumer.assign(&topics).unwrap();
-
-    let message = consumer.recv().await.unwrap();
-    assert_eq!(message.topic(), db_name);
-
-    let entry = Entry::try_from(message.payload().unwrap().to_vec()).unwrap();
+    // check the data is in write buffer
+    let mut consumer =
+        FileBufferConsumer::new(write_buffer_dir.path(), &db_name, Default::default(), None)
+            .await
+            .unwrap();
+    let (_, mut stream) = consumer.streams().into_iter().next().unwrap();
+    let sequenced_entry = stream.stream.next().await.unwrap().unwrap();
+    let entry = sequenced_entry.entry();
     let partition_writes = entry.partition_writes().unwrap();
     assert_eq!(partition_writes.len(), 1);
 }
 
-async fn produce_to_kafka_directly(
-    producer: &FutureProducer,
-    lp: &str,
-    topic: &str,
-    partition: Option<i32>,
-) {
-    let entry = lp_to_entry(lp);
-    let mut record: FutureRecord<'_, String, _> = FutureRecord::to(topic).payload(entry.data());
-
-    if let Some(pid) = partition {
-        record = record.partition(pid);
-    }
-
-    producer
-        .send_result(record)
-        .unwrap()
-        .await
-        .unwrap()
-        .unwrap();
-}
-
 #[tokio::test]
-async fn reads_come_from_kafka() {
-    let kafka_connection = maybe_skip_kafka_integration!();
+async fn reads_come_from_write_buffer() {
+    let write_buffer_dir = TempDir::new().unwrap();
 
-    // set up a database to read from Kafka
+    // set up a database to read from write buffer
     let server = ServerFixture::create_shared().await;
     let db_name = rand_name();
     let write_buffer_connection = WriteBufferConnection {
         direction: WriteBufferDirection::Read.into(),
-        r#type: "kafka".to_string(),
-        connection: kafka_connection.to_string(),
+        r#type: "file".to_string(),
+        connection: write_buffer_dir.path().display().to_string(),
         creation_config: Some(WriteBufferCreationConfig {
             n_sequencers: 2,
-            options: kafka_sequencer_options(),
+            ..Default::default()
        }),
         ..Default::default()
     };
 
-    // Common Kafka config
-    let mut cfg = ClientConfig::new();
-    cfg.set("bootstrap.servers", &kafka_connection);
-    cfg.set("message.timeout.ms", "5000");
-
     DatabaseBuilder::new(db_name.clone())
         .write_buffer(write_buffer_connection)
         .build(server.grpc_channel())
         .await;
 
-    // put some points in Kafka
-    let producer: FutureProducer = cfg.create().unwrap();
+    // put some points in write buffer
+    let time_provider = Arc::new(SystemProvider::new());
+    let producer = FileBufferProducer::new(
+        write_buffer_dir.path(),
+        &db_name,
+        Default::default(),
+        time_provider,
+    )
+    .await
+    .unwrap();
+    let mut sequencer_ids = producer.sequencer_ids().into_iter();
+    let sequencer_id_1 = sequencer_ids.next().unwrap();
+    let sequencer_id_2 = sequencer_ids.next().unwrap();
 
-    // Kafka partitions must be configured based on the primary key because ordering across Kafka
-    // partitions is undefined, so the upsert semantics would be undefined. Entries that can
-    // potentially be merged must end up in the same Kafka partition. This test follows that
-    // constraint, but doesn't actually encode it.
-
-    // Put some data for `upc,region=west` in partition 0
+    // Put some data for `upc,region=west` in sequencer 1
     let lp_lines = [
         "upc,region=west user=23.2 100",
         "upc,region=west user=21.0 150",
     ];
-    produce_to_kafka_directly(&producer, &lp_lines.join("\n"), &db_name, Some(0)).await;
+    producer
+        .store_entry(&lp_to_entry(&lp_lines.join("\n")), sequencer_id_1, None)
+        .await
+        .unwrap();
 
-    // Put some data for `upc,region=east` in partition 1
+    // Put some data for `upc,region=east` in sequencer 2
     let lp_lines = [
         "upc,region=east user=76.2 300",
         "upc,region=east user=88.7 350",
     ];
-    produce_to_kafka_directly(&producer, &lp_lines.join("\n"), &db_name, Some(1)).await;
+    producer
+        .store_entry(&lp_to_entry(&lp_lines.join("\n")), sequencer_id_2, None)
+        .await
+        .unwrap();
 
     let check = async {
         let mut interval = tokio::time::interval(std::time::Duration::from_millis(500));
@@ -265,19 +229,19 @@ async fn reads_come_from_kafka() {
 }
 
 #[tokio::test]
-async fn cant_write_to_db_reading_from_kafka() {
-    let kafka_connection = maybe_skip_kafka_integration!();
+async fn cant_write_to_db_reading_from_write_buffer() {
+    let write_buffer_dir = TempDir::new().unwrap();
 
-    // set up a database to read from Kafka
+    // set up a database to read from write buffer
     let server = ServerFixture::create_shared().await;
     let db_name = rand_name();
     let write_buffer_connection = WriteBufferConnection {
         direction: WriteBufferDirection::Read.into(),
-        r#type: "kafka".to_string(),
-        connection: kafka_connection.to_string(),
+        r#type: "file".to_string(),
+        connection: write_buffer_dir.path().display().to_string(),
         creation_config: Some(WriteBufferCreationConfig {
             n_sequencers: 1,
-            options: kafka_sequencer_options(),
+            ..Default::default()
         }),
         ..Default::default()
     };
@@ -287,7 +251,7 @@ async fn cant_write_to_db_reading_from_kafka() {
         .build(server.grpc_channel())
         .await;
 
-    // Writing to this database is an error; all data comes from Kafka
+    // Writing to this database is an error; all data comes from write buffer
     let mut write_client = server.write_client();
     let err = write_client
         .write(&db_name, "temp,region=south color=1")
@@ -306,15 +270,15 @@ async fn cant_write_to_db_reading_from_kafka() {
 
 #[tokio::test]
 async fn test_create_database_missing_write_buffer_sequencers() {
-    let kafka_connection = maybe_skip_kafka_integration!();
+    let write_buffer_dir = TempDir::new().unwrap();
 
-    // set up a database to read from Kafka
+    // set up a database to read from write buffer
     let server = ServerFixture::create_shared().await;
     let db_name = rand_name();
     let write_buffer_connection = WriteBufferConnection {
         direction: WriteBufferDirection::Read.into(),
-        r#type: "kafka".to_string(),
-        connection: kafka_connection.to_string(),
+        r#type: "file".to_string(),
+        connection: write_buffer_dir.path().display().to_string(),
         ..Default::default()
     };
 
@@ -342,7 +306,7 @@ pub async fn test_cross_write_buffer_tracing() {
         .with_env("TRACES_EXPORTER_JAEGER_AGENT_PORT", udp_capture.port())
         .with_client_header("jaeger-debug-id", "some-debug-id");
 
-    // we need to use two servers but the same DB name here because the Kafka topic is named after the DB name
+    // we need to use two servers but the same DB name here because the write buffer topic is named after the DB name
     let db_name = rand_name();
 
     // create producer server
@@ -359,7 +323,7 @@ pub async fn test_cross_write_buffer_tracing() {
         connection: write_buffer_dir.path().display().to_string(),
         creation_config: Some(WriteBufferCreationConfig {
             n_sequencers: 1,
-            options: kafka_sequencer_options(),
+            ..Default::default()
         }),
         ..Default::default()
     };
@@ -318,12 +318,7 @@ impl WriteBufferReading for FileBufferConsumer {
         let fetch_high_watermark = move || {
             let committed = committed.clone();
 
-            let fut = async move {
-                let files = scan_dir::<u64>(&committed, FileType::File).await?;
-                let watermark = files.keys().max().map(|n| n + 1).unwrap_or(0);
-
-                Ok(watermark)
-            };
+            let fut = async move { watermark(&committed).await };
             fut.boxed() as FetchHighWatermarkFut<'_>
         };
         let fetch_high_watermark = Box::new(fetch_high_watermark) as FetchHighWatermark<'_>;
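The closure being simplified here follows a boxed-closure-returning-boxed-future pattern. A self-contained sketch under assumed type aliases (`WatermarkFut`/`WatermarkFn` stand in for the crate's `FetchHighWatermarkFut`/`FetchHighWatermark`, whose exact definitions are not shown in this diff):

```rust
use futures::future::{BoxFuture, FutureExt};

// Assumed stand-ins for the crate's fetch-high-watermark types: a boxed
// closure that, on every call, returns a boxed future yielding the watermark.
type WatermarkFut<'a> = BoxFuture<'a, Result<u64, String>>;
type WatermarkFn<'a> = Box<dyn Fn() -> WatermarkFut<'a> + Send + Sync>;

// Build a callback that owns its directory path; each invocation clones the
// path into a fresh future, just like the `committed.clone()` above.
fn make_fetch_high_watermark(committed: std::path::PathBuf) -> WatermarkFn<'static> {
    Box::new(move || {
        let committed = committed.clone();
        async move {
            // stand-in for `watermark(&committed).await`
            let _ = committed;
            Ok(0)
        }
        .boxed()
    })
}
```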
@@ -385,40 +380,67 @@ impl ConsumerStream {
 
             // read file
             let file_path = path.join(sequence_number.to_string());
-            let data = match tokio::fs::read(&file_path).await {
-                Ok(data) => data,
-                Err(_) => {
-                    // just wait a bit
-                    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
-                    continue;
-                }
-            };
-
-            // decode file
-            let sequence = Sequence {
-                id: sequencer_id,
-                number: sequence_number,
-            };
-            let msg = match Self::decode_file(data, sequence, trace_collector.clone()) {
-                Ok(sequence) => {
-                    match next_sequence_number.compare_exchange(
-                        sequence_number,
-                        sequence_number + 1,
-                        Ordering::SeqCst,
-                        Ordering::SeqCst,
-                    ) {
-                        Ok(_) => {
-                            // can send to output
-                            Ok(sequence)
-                        }
-                        Err(_) => {
-                            // interleaving change, retry
-                            continue;
-                        }
-                    }
-                }
-                e => e,
-            };
+            let msg = match tokio::fs::read(&file_path).await {
+                Ok(data) => {
+                    // decode file
+                    let sequence = Sequence {
+                        id: sequencer_id,
+                        number: sequence_number,
+                    };
+                    match Self::decode_file(data, sequence, trace_collector.clone()) {
+                        Ok(sequence) => {
+                            match next_sequence_number.compare_exchange(
+                                sequence_number,
+                                sequence_number + 1,
+                                Ordering::SeqCst,
+                                Ordering::SeqCst,
+                            ) {
+                                Ok(_) => {
+                                    // can send to output
+                                    Ok(sequence)
+                                }
+                                Err(_) => {
+                                    // interleaving change, retry
+                                    continue;
+                                }
+                            }
+                        }
+                        e => e,
+                    }
+                }
+                Err(error) => {
+                    match error.kind() {
+                        std::io::ErrorKind::NotFound => {
+                            // figure out watermark and see if there's a gap in the stream
+                            if let Ok(watermark) = watermark(&path).await {
+                                // watermark is "last sequence number + 1", so subtract 1 before comparing
+                                if watermark.saturating_sub(1) > sequence_number {
+                                    // update position
+                                    // failures are OK here since we'll re-read this value next round
+                                    next_sequence_number
+                                        .compare_exchange(
+                                            sequence_number,
+                                            sequence_number + 1,
+                                            Ordering::SeqCst,
+                                            Ordering::SeqCst,
+                                        )
+                                        .ok();
+                                    continue;
+                                }
+                            };
+
+                            // no gap detected, just wait a bit for new data
+                            tokio::time::sleep(std::time::Duration::from_secs(1)).await;
+                            continue;
+                        }
+                        _ => {
+                            // cannot read file => communicate to user
+                            Err(Box::new(error) as WriteBufferError)
+                        }
+                    }
+                }
+            };
 
             if tx.send(msg).await.is_err() {
                 // Receiver is gone
                 return;
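The `compare_exchange` calls in both branches implement an optimistic, lock-free cursor bump: advance the shared "next sequence number" only if no concurrent task moved it first, otherwise retry with a fresh read. A minimal runnable sketch of just that rule:

```rust
use std::sync::atomic::{AtomicU64, Ordering};

// Advance the shared cursor from `observed` to `observed + 1` only if it
// still holds `observed`; returns false when another task won the race.
fn try_advance(next_sequence_number: &AtomicU64, observed: u64) -> bool {
    next_sequence_number
        .compare_exchange(observed, observed + 1, Ordering::SeqCst, Ordering::SeqCst)
        .is_ok()
}

fn main() {
    let next = AtomicU64::new(2);
    assert!(try_advance(&next, 2)); // cursor moves 2 -> 3
    assert!(!try_advance(&next, 2)); // stale observation: caller re-reads and retries
}
```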
@@ -627,15 +649,46 @@ where
     Ok(results)
 }
 
+async fn watermark(path: &Path) -> Result<u64, WriteBufferError> {
+    let files = scan_dir::<u64>(path, FileType::File).await?;
+    let watermark = files.keys().max().map(|n| n + 1).unwrap_or(0);
+    Ok(watermark)
+}
+
+pub mod test_utils {
+    use std::path::Path;
+
+    /// Remove specific entry from write buffer.
+    pub async fn remove_entry(
+        write_buffer_path: &Path,
+        database_name: &str,
+        sequencer_id: u32,
+        sequence_number: u64,
+    ) {
+        tokio::fs::remove_file(
+            write_buffer_path
+                .join(database_name)
+                .join("active")
+                .join(sequencer_id.to_string())
+                .join("committed")
+                .join(sequence_number.to_string()),
+        )
+        .await
+        .unwrap()
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use std::num::NonZeroU32;
 
     use entry::test_helpers::lp_to_entry;
     use tempfile::TempDir;
     use trace::RingBufferTraceCollector;
 
     use crate::core::test_utils::{perform_generic_tests, TestAdapter, TestContext};
 
+    use super::test_utils::remove_entry;
     use super::*;
 
     struct FileTestAdapter {
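The new `watermark` function and `test_utils::remove_entry` together define the gap semantics the consumer relies on: sequence numbers are file names under `<root>/<db>/active/<sequencer>/committed/`, and the watermark is "highest committed sequence number + 1" (0 when empty). A minimal runnable sketch of that rule and the resulting gap check:

```rust
// Watermark over a set of committed sequence numbers, as in `watermark` above.
fn watermark_of(committed_sequence_numbers: &[u64]) -> u64 {
    committed_sequence_numbers
        .iter()
        .max()
        .map(|n| n + 1)
        .unwrap_or(0)
}

fn main() {
    // files 0, 1 and 3 exist; 2 was deleted via `remove_entry`
    let committed = [0, 1, 3];
    let watermark = watermark_of(&committed);
    assert_eq!(watermark, 4);

    // the consumer's gap check: stuck at missing sequence 2, it sees
    // watermark - 1 = 3 > 2, infers a gap, and skips ahead instead of waiting
    let sequence_number = 2;
    assert!(watermark.saturating_sub(1) > sequence_number);
}
```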
@@ -717,4 +770,99 @@ mod tests {
     async fn test_generic() {
         perform_generic_tests(FileTestAdapter::new()).await;
     }
+
+    #[tokio::test]
+    async fn test_ignores_missing_files_multi() {
+        let adapter = FileTestAdapter::new();
+        let ctx = adapter.new_context(NonZeroU32::new(1).unwrap()).await;
+
+        let writer = ctx.writing(true).await.unwrap();
+        let sequencer_id = writer.sequencer_ids().into_iter().next().unwrap();
+        let entry_1 = lp_to_entry("upc,region=east user=1 100");
+        let entry_2 = lp_to_entry("upc,region=east user=2 200");
+        let entry_3 = lp_to_entry("upc,region=east user=3 300");
+        let entry_4 = lp_to_entry("upc,region=east user=4 400");
+        let (sequence_1, _) = writer
+            .store_entry(&entry_1, sequencer_id, None)
+            .await
+            .unwrap();
+        let (sequence_2, _) = writer
+            .store_entry(&entry_2, sequencer_id, None)
+            .await
+            .unwrap();
+        let (sequence_3, _) = writer
+            .store_entry(&entry_3, sequencer_id, None)
+            .await
+            .unwrap();
+        let (sequence_4, _) = writer
+            .store_entry(&entry_4, sequencer_id, None)
+            .await
+            .unwrap();
+
+        remove_entry(
+            &ctx.path,
+            &ctx.database_name,
+            sequencer_id,
+            sequence_2.number,
+        )
+        .await;
+        remove_entry(
+            &ctx.path,
+            &ctx.database_name,
+            sequencer_id,
+            sequence_3.number,
+        )
+        .await;
+
+        let mut reader = ctx.reading(true).await.unwrap();
+        let mut stream = reader.streams().remove(&sequencer_id).unwrap();
+        let sequenced_entry_1 = stream.stream.next().await.unwrap().unwrap();
+        let sequenced_entry_4 = stream.stream.next().await.unwrap().unwrap();
+        assert_eq!(
+            sequence_1.number,
+            sequenced_entry_1.sequence().unwrap().number
+        );
+        assert_eq!(
+            sequence_4.number,
+            sequenced_entry_4.sequence().unwrap().number
+        );
+        assert_eq!(&entry_1, sequenced_entry_1.entry());
+        assert_eq!(&entry_4, sequenced_entry_4.entry());
+    }
+
+    #[tokio::test]
+    async fn test_ignores_missing_files_single() {
+        let adapter = FileTestAdapter::new();
+        let ctx = adapter.new_context(NonZeroU32::new(1).unwrap()).await;
+
+        let writer = ctx.writing(true).await.unwrap();
+        let sequencer_id = writer.sequencer_ids().into_iter().next().unwrap();
+        let entry_1 = lp_to_entry("upc,region=east user=1 100");
+        let entry_2 = lp_to_entry("upc,region=east user=2 200");
+        let (sequence_1, _) = writer
+            .store_entry(&entry_1, sequencer_id, None)
+            .await
+            .unwrap();
+        let (sequence_2, _) = writer
+            .store_entry(&entry_2, sequencer_id, None)
+            .await
+            .unwrap();
+
+        remove_entry(
+            &ctx.path,
+            &ctx.database_name,
+            sequencer_id,
+            sequence_1.number,
+        )
+        .await;
+
+        let mut reader = ctx.reading(true).await.unwrap();
+        let mut stream = reader.streams().remove(&sequencer_id).unwrap();
+        let sequenced_entry_2 = stream.stream.next().await.unwrap().unwrap();
+        assert_eq!(
+            sequence_2.number,
+            sequenced_entry_2.sequence().unwrap().number
+        );
+        assert_eq!(&entry_2, sequenced_entry_2.entry());
+    }
 }