From d527775aecbd99489f68bd86de54864bde6deede Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Mon, 25 Oct 2021 13:28:42 +0200 Subject: [PATCH 1/4] feat: allow gaps in file-based WB + improve error handling --- write_buffer/src/file.rs | 178 +++++++++++++++++++++++++++++++-------- 1 file changed, 144 insertions(+), 34 deletions(-) diff --git a/write_buffer/src/file.rs b/write_buffer/src/file.rs index 630e2749e8..7588d36fc8 100644 --- a/write_buffer/src/file.rs +++ b/write_buffer/src/file.rs @@ -318,12 +318,7 @@ impl WriteBufferReading for FileBufferConsumer { let fetch_high_watermark = move || { let committed = committed.clone(); - let fut = async move { - let files = scan_dir::(&committed, FileType::File).await?; - let watermark = files.keys().max().map(|n| n + 1).unwrap_or(0); - - Ok(watermark) - }; + let fut = async move { watermark(&committed).await }; fut.boxed() as FetchHighWatermarkFut<'_> }; let fetch_high_watermark = Box::new(fetch_high_watermark) as FetchHighWatermark<'_>; @@ -385,40 +380,66 @@ impl ConsumerStream { // read file let file_path = path.join(sequence_number.to_string()); - let data = match tokio::fs::read(&file_path).await { - Ok(data) => data, - Err(_) => { - // just wait a bit - tokio::time::sleep(std::time::Duration::from_secs(1)).await; - continue; - } - }; - - // decode file - let sequence = Sequence { - id: sequencer_id, - number: sequence_number, - }; - let msg = match Self::decode_file(data, sequence, trace_collector.clone()) { - Ok(sequence) => { - match next_sequence_number.compare_exchange( - sequence_number, - sequence_number + 1, - Ordering::SeqCst, - Ordering::SeqCst, - ) { - Ok(_) => { - // can send to output - Ok(sequence) + let msg = match tokio::fs::read(&file_path).await { + Ok(data) => { + // decode file + let sequence = Sequence { + id: sequencer_id, + number: sequence_number, + }; + match Self::decode_file(data, sequence, trace_collector.clone()) { + Ok(sequence) => { + match next_sequence_number.compare_exchange( + sequence_number, + sequence_number + 1, + Ordering::SeqCst, + Ordering::SeqCst, + ) { + Ok(_) => { + // can send to output + Ok(sequence) + } + Err(_) => { + // interleaving change, retry + continue; + } + } } - Err(_) => { - // interleaving change, retry + e => e, + } + } + Err(error) => { + match error.kind() { + std::io::ErrorKind::NotFound => { + // figure out watermark and see if there's a gap in the stream + if let Ok(watermark) = watermark(&path).await { + if watermark > sequence_number { + // update position + // failures are OK here since we'll re-read this value next round + next_sequence_number + .compare_exchange( + sequence_number, + sequence_number + 1, + Ordering::SeqCst, + Ordering::SeqCst, + ) + .ok(); + continue; + } + }; + + // no gap detected, just wait a bit for new data + tokio::time::sleep(std::time::Duration::from_secs(1)).await; continue; } + _ => { + // cannot read file => communicate to user + Err(Box::new(error) as WriteBufferError) + } } } - e => e, }; + if tx.send(msg).await.is_err() { // Receiver is gone return; @@ -627,15 +648,45 @@ where Ok(results) } +async fn watermark(path: &Path) -> Result { + let files = scan_dir::(path, FileType::File).await?; + let watermark = files.keys().max().map(|n| n + 1).unwrap_or(0); + Ok(watermark) +} + +pub mod test_utils { + use std::path::Path; + + /// Remove specific entry from write buffer. 
+ pub async fn remove_entry( + root: &Path, + database_name: &str, + sequencer_id: u32, + sequence_number: u64, + ) { + tokio::fs::remove_file( + root.join(database_name) + .join("active") + .join(sequencer_id.to_string()) + .join("committed") + .join(sequence_number.to_string()), + ) + .await + .unwrap() + } +} + #[cfg(test)] mod tests { use std::num::NonZeroU32; + use entry::test_helpers::lp_to_entry; use tempfile::TempDir; use trace::RingBufferTraceCollector; use crate::core::test_utils::{perform_generic_tests, TestAdapter, TestContext}; + use super::test_utils::remove_entry; use super::*; struct FileTestAdapter { @@ -717,4 +768,63 @@ mod tests { async fn test_generic() { perform_generic_tests(FileTestAdapter::new()).await; } + + #[tokio::test] + async fn test_ignores_missing_files() { + let adapter = FileTestAdapter::new(); + let ctx = adapter.new_context(NonZeroU32::new(1).unwrap()).await; + + let writer = ctx.writing(true).await.unwrap(); + let sequencer_id = writer.sequencer_ids().into_iter().next().unwrap(); + let entry_1 = lp_to_entry("upc,region=east user=1 100"); + let entry_2 = lp_to_entry("upc,region=east user=2 200"); + let entry_3 = lp_to_entry("upc,region=east user=3 300"); + let entry_4 = lp_to_entry("upc,region=east user=4 400"); + let (sequence_1, _) = writer + .store_entry(&entry_1, sequencer_id, None) + .await + .unwrap(); + let (sequence_2, _) = writer + .store_entry(&entry_2, sequencer_id, None) + .await + .unwrap(); + let (sequence_3, _) = writer + .store_entry(&entry_3, sequencer_id, None) + .await + .unwrap(); + let (sequence_4, _) = writer + .store_entry(&entry_4, sequencer_id, None) + .await + .unwrap(); + + remove_entry( + &ctx.path, + &ctx.database_name, + sequencer_id, + sequence_2.number, + ) + .await; + remove_entry( + &ctx.path, + &ctx.database_name, + sequencer_id, + sequence_3.number, + ) + .await; + + let mut reader = ctx.reading(true).await.unwrap(); + let mut stream = reader.streams().remove(&sequencer_id).unwrap(); + let sequenced_entry_1 = stream.stream.next().await.unwrap().unwrap(); + let sequenced_entry_4 = stream.stream.next().await.unwrap().unwrap(); + assert_eq!( + sequence_1.number, + sequenced_entry_1.sequence().unwrap().number + ); + assert_eq!( + sequence_4.number, + sequenced_entry_4.sequence().unwrap().number + ); + assert_eq!(&entry_1, sequenced_entry_1.entry()); + assert_eq!(&entry_4, sequenced_entry_4.entry()); + } } From 2833cefc12d807356d0f00fb2ff2a43046af1002 Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Mon, 25 Oct 2021 13:32:16 +0200 Subject: [PATCH 2/4] test: remove Kafka from end2end tests Kafka is now sufficiently tested via the `write_buffer` crate. The end2end tests can now use the in-memory mock implementation or -- if servers can only be controlled via CLI -- the file-based implementation. 
--- Cargo.lock | 1 - Cargo.toml | 1 - docker/Dockerfile.ci.integration | 2 +- docker/integration_test.sh | 7 - tests/end_to_end_cases/management_cli.rs | 6 +- tests/end_to_end_cases/scenario.rs | 56 ++++--- tests/end_to_end_cases/write_buffer.rs | 186 +++++++++-------------- 7 files changed, 106 insertions(+), 153 deletions(-) delete mode 100755 docker/integration_test.sh diff --git a/Cargo.lock b/Cargo.lock index 407e91a546..7b02ddd8ce 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1635,7 +1635,6 @@ dependencies = [ "prost", "query", "rand", - "rdkafka", "read_buffer", "reqwest", "rustyline", diff --git a/Cargo.toml b/Cargo.toml index 05033a0972..b4a09bac4a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -182,7 +182,6 @@ flate2 = "1.0" hex = "0.4.2" predicates = "2.0.3" rand = "0.8.3" -rdkafka = "0.27.0" reqwest = "0.11" tempfile = "3.1.0" diff --git a/docker/Dockerfile.ci.integration b/docker/Dockerfile.ci.integration index 53cef783c8..9294cded81 100644 --- a/docker/Dockerfile.ci.integration +++ b/docker/Dockerfile.ci.integration @@ -21,4 +21,4 @@ ENV TEST_INTEGRATION=1 ENV KAFKA_CONNECT=kafka:9092 # Run the integration tests that connect to Kafka that will be running in another container -CMD ["sh", "-c", "./docker/integration_test.sh"] +CMD ["sh", "-c", "cargo test -p write_buffer kafka -- --nocapture"] diff --git a/docker/integration_test.sh b/docker/integration_test.sh deleted file mode 100755 index 70ad18cc2e..0000000000 --- a/docker/integration_test.sh +++ /dev/null @@ -1,7 +0,0 @@ -#!/bin/bash - -set -euxo pipefail - -cargo test -p write_buffer kafka -- --nocapture -cargo test -p influxdb_iox --test end_to_end skip_replay -- --nocapture -cargo test -p influxdb_iox --test end_to_end write_buffer -- --nocapture diff --git a/tests/end_to_end_cases/management_cli.rs b/tests/end_to_end_cases/management_cli.rs index b318a58fa9..baeb39ac43 100644 --- a/tests/end_to_end_cases/management_cli.rs +++ b/tests/end_to_end_cases/management_cli.rs @@ -8,8 +8,8 @@ use generated_types::google::longrunning::IoxOperation; use generated_types::influxdata::iox::management::v1::{ operation_metadata::Job, WipePreservedCatalog, }; +use tempfile::TempDir; use test_helpers::make_temp_file; -use write_buffer::maybe_skip_kafka_integration; use crate::{ common::server_fixture::ServerFixture, @@ -910,9 +910,9 @@ async fn test_wipe_persisted_catalog_error_db_exists() { #[tokio::test] async fn test_skip_replay() { - let kafka_connection = maybe_skip_kafka_integration!(); + let write_buffer_dir = TempDir::new().unwrap(); let db_name = rand_name(); - let server_fixture = fixture_replay_broken(&db_name, &kafka_connection).await; + let server_fixture = fixture_replay_broken(&db_name, write_buffer_dir.path()).await; let addr = server_fixture.grpc_base(); Command::cargo_bin("influxdb_iox") diff --git a/tests/end_to_end_cases/scenario.rs b/tests/end_to_end_cases/scenario.rs index 2b1f3cb9c5..c965762d49 100644 --- a/tests/end_to_end_cases/scenario.rs +++ b/tests/end_to_end_cases/scenario.rs @@ -1,6 +1,5 @@ -use std::convert::TryFrom; use std::iter::once; -use std::num::NonZeroU32; +use std::path::Path; use std::time::Duration; use std::{convert::TryInto, str, u32}; use std::{sync::Arc, time::SystemTime}; @@ -31,9 +30,9 @@ use generated_types::{ ReadSource, TimestampRange, }; use influxdb_iox_client::{connection::Connection, flight::PerformQuery}; +use time::SystemProvider; use write_buffer::core::WriteBufferWriting; -use write_buffer::kafka::test_utils::{kafka_sequencer_options, purge_kafka_topic}; -use 
write_buffer::kafka::KafkaBufferProducer; +use write_buffer::file::FileBufferProducer; use crate::common::server_fixture::{ServerFixture, TestConfig, DEFAULT_SERVER_ID}; @@ -655,7 +654,7 @@ pub async fn fixture_broken_catalog(db_name: &str) -> ServerFixture { } /// Creates a database that cannot be replayed -pub async fn fixture_replay_broken(db_name: &str, kafka_connection: &str) -> ServerFixture { +pub async fn fixture_replay_broken(db_name: &str, path: &Path) -> ServerFixture { let server_id = DEFAULT_SERVER_ID; let test_config = TestConfig::new().with_env("INFLUXDB_IOX_SKIP_REPLAY", "no"); @@ -680,11 +679,11 @@ pub async fn fixture_replay_broken(db_name: &str, kafka_connection: &str) -> Ser name: db_name.to_string(), write_buffer_connection: Some(WriteBufferConnection { direction: write_buffer_connection::Direction::Read.into(), - r#type: "kafka".to_string(), - connection: kafka_connection.to_string(), + r#type: "file".to_string(), + connection: path.display().to_string(), creation_config: Some(WriteBufferCreationConfig { n_sequencers: 1, - options: kafka_sequencer_options(), + ..Default::default() }), ..Default::default() }), @@ -708,45 +707,37 @@ pub async fn fixture_replay_broken(db_name: &str, kafka_connection: &str) -> Ser .unwrap(); // ingest data as mixed throughput - let creation_config = Some(data_types::database_rules::WriteBufferCreationConfig { - n_sequencers: NonZeroU32::try_from(1).unwrap(), - options: kafka_sequencer_options(), - }); - let producer = KafkaBufferProducer::new( - kafka_connection, - db_name, - &Default::default(), - creation_config.as_ref(), - Arc::new(time::SystemProvider::new()), - ) - .await - .unwrap(); - producer + let time_provider = Arc::new(SystemProvider::new()); + let producer = FileBufferProducer::new(path, db_name, Default::default(), time_provider) + .await + .unwrap(); + let sequencer_id = producer.sequencer_ids().into_iter().next().unwrap(); + let (sequence_1, _) = producer .store_entry( &lp_to_entries("table_1,partition_by=a foo=1 10", &partition_template) .pop() .unwrap(), - 0, + sequencer_id, None, ) .await .unwrap(); - producer + let (sequence_2, _) = producer .store_entry( &lp_to_entries("table_1,partition_by=b foo=2 20", &partition_template) .pop() .unwrap(), - 0, + sequencer_id, None, ) .await .unwrap(); - producer + let (sequence_3, _) = producer .store_entry( &lp_to_entries("table_1,partition_by=b foo=3 30", &partition_template) .pop() .unwrap(), - 0, + sequencer_id, None, ) .await @@ -766,8 +757,7 @@ pub async fn fixture_replay_broken(db_name: &str, kafka_connection: &str) -> Ser ) .await; - // purge data from Kafka - purge_kafka_topic(kafka_connection, db_name).await; + // add new entry to the end producer .store_entry( &lp_to_entries("table_1,partition_by=c foo=4 40", &partition_template) @@ -779,6 +769,14 @@ pub async fn fixture_replay_broken(db_name: &str, kafka_connection: &str) -> Ser .await .unwrap(); + // purge data from write buffer + write_buffer::file::test_utils::remove_entry(path, db_name, sequencer_id, sequence_1.number) + .await; + write_buffer::file::test_utils::remove_entry(path, db_name, sequencer_id, sequence_2.number) + .await; + write_buffer::file::test_utils::remove_entry(path, db_name, sequencer_id, sequence_3.number) + .await; + // Try to replay and error let fixture = fixture.restart_server().await; diff --git a/tests/end_to_end_cases/write_buffer.rs b/tests/end_to_end_cases/write_buffer.rs index 17e2bbd631..3ca812824d 100644 --- a/tests/end_to_end_cases/write_buffer.rs +++ 
b/tests/end_to_end_cases/write_buffer.rs @@ -6,7 +6,8 @@ use crate::{ end_to_end_cases::scenario::{rand_name, DatabaseBuilder}, }; use arrow_util::assert_batches_sorted_eq; -use entry::{test_helpers::lp_to_entry, Entry}; +use entry::test_helpers::lp_to_entry; +use futures::StreamExt; use generated_types::influxdata::iox::management::v1::{ write_buffer_connection::Direction as WriteBufferDirection, WriteBufferConnection, }; @@ -14,36 +15,35 @@ use influxdb_iox_client::{ management::{generated_types::WriteBufferCreationConfig, CreateDatabaseError}, write::WriteError, }; -use rdkafka::{ - consumer::{Consumer, StreamConsumer}, - producer::{FutureProducer, FutureRecord}, - ClientConfig, Message, Offset, TopicPartitionList, -}; -use std::convert::TryFrom; +use std::sync::Arc; use tempfile::TempDir; use test_helpers::assert_contains; -use write_buffer::{kafka::test_utils::kafka_sequencer_options, maybe_skip_kafka_integration}; +use time::SystemProvider; +use write_buffer::{ + core::{WriteBufferReading, WriteBufferWriting}, + file::{FileBufferConsumer, FileBufferProducer}, +}; #[tokio::test] -async fn writes_go_to_kafka() { - let kafka_connection = maybe_skip_kafka_integration!(); +async fn writes_go_to_write_buffer() { + let write_buffer_dir = TempDir::new().unwrap(); - // set up a database with a write buffer pointing at kafka + // set up a database with a write buffer pointing at write buffer let server = ServerFixture::create_shared().await; let db_name = rand_name(); let write_buffer_connection = WriteBufferConnection { direction: WriteBufferDirection::Write.into(), - r#type: "kafka".to_string(), - connection: kafka_connection.to_string(), + r#type: "file".to_string(), + connection: write_buffer_dir.path().display().to_string(), creation_config: Some(WriteBufferCreationConfig { n_sequencers: 1, - options: kafka_sequencer_options(), + ..Default::default() }), ..Default::default() }; DatabaseBuilder::new(db_name.clone()) - .write_buffer(write_buffer_connection) + .write_buffer(write_buffer_connection.clone()) .build(server.grpc_channel()) .await; @@ -62,43 +62,32 @@ async fn writes_go_to_kafka() { .expect("cannot write"); assert_eq!(num_lines_written, 3); - // check the data is in kafka - let mut cfg = ClientConfig::new(); - cfg.set("bootstrap.servers", kafka_connection); - cfg.set("session.timeout.ms", "6000"); - cfg.set("enable.auto.commit", "false"); - cfg.set("group.id", "placeholder"); - - let consumer: StreamConsumer = cfg.create().unwrap(); - let mut topics = TopicPartitionList::new(); - topics.add_partition(&db_name, 0); - topics - .set_partition_offset(&db_name, 0, Offset::Beginning) - .unwrap(); - consumer.assign(&topics).unwrap(); - - let message = consumer.recv().await.unwrap(); - assert_eq!(message.topic(), db_name); - - let entry = Entry::try_from(message.payload().unwrap().to_vec()).unwrap(); + // check the data is in write buffer + let mut consumer = + FileBufferConsumer::new(write_buffer_dir.path(), &db_name, Default::default(), None) + .await + .unwrap(); + let (_, mut stream) = consumer.streams().into_iter().next().unwrap(); + let sequenced_entry = stream.stream.next().await.unwrap().unwrap(); + let entry = sequenced_entry.entry(); let partition_writes = entry.partition_writes().unwrap(); assert_eq!(partition_writes.len(), 2); } #[tokio::test] -async fn writes_go_to_kafka_whitelist() { - let kafka_connection = maybe_skip_kafka_integration!(); +async fn writes_go_to_write_buffer_whitelist() { + let write_buffer_dir = TempDir::new().unwrap(); - // set up a database with a write 
buffer pointing at kafka + // set up a database with a write buffer pointing at write buffer let server = ServerFixture::create_shared().await; let db_name = rand_name(); let write_buffer_connection = WriteBufferConnection { direction: WriteBufferDirection::Write.into(), - r#type: "kafka".to_string(), - connection: kafka_connection.to_string(), + r#type: "file".to_string(), + connection: write_buffer_dir.path().display().to_string(), creation_config: Some(WriteBufferCreationConfig { n_sequencers: 1, - options: kafka_sequencer_options(), + ..Default::default() }), ..Default::default() }; @@ -125,99 +114,74 @@ async fn writes_go_to_kafka_whitelist() { .expect("cannot write"); assert_eq!(num_lines_written, 4); - // check the data is in kafka - let mut cfg = ClientConfig::new(); - cfg.set("bootstrap.servers", kafka_connection); - cfg.set("session.timeout.ms", "6000"); - cfg.set("enable.auto.commit", "false"); - cfg.set("group.id", "placeholder"); - - let consumer: StreamConsumer = cfg.create().unwrap(); - let mut topics = TopicPartitionList::new(); - topics.add_partition(&db_name, 0); - topics - .set_partition_offset(&db_name, 0, Offset::Beginning) - .unwrap(); - consumer.assign(&topics).unwrap(); - - let message = consumer.recv().await.unwrap(); - assert_eq!(message.topic(), db_name); - - let entry = Entry::try_from(message.payload().unwrap().to_vec()).unwrap(); + // check the data is in write buffer + let mut consumer = + FileBufferConsumer::new(write_buffer_dir.path(), &db_name, Default::default(), None) + .await + .unwrap(); + let (_, mut stream) = consumer.streams().into_iter().next().unwrap(); + let sequenced_entry = stream.stream.next().await.unwrap().unwrap(); + let entry = sequenced_entry.entry(); let partition_writes = entry.partition_writes().unwrap(); assert_eq!(partition_writes.len(), 1); } -async fn produce_to_kafka_directly( - producer: &FutureProducer, - lp: &str, - topic: &str, - partition: Option, -) { - let entry = lp_to_entry(lp); - let mut record: FutureRecord<'_, String, _> = FutureRecord::to(topic).payload(entry.data()); - - if let Some(pid) = partition { - record = record.partition(pid); - } - - producer - .send_result(record) - .unwrap() - .await - .unwrap() - .unwrap(); -} - #[tokio::test] -async fn reads_come_from_kafka() { - let kafka_connection = maybe_skip_kafka_integration!(); +async fn reads_come_from_write_buffer() { + let write_buffer_dir = TempDir::new().unwrap(); - // set up a database to read from Kafka + // set up a database to read from write buffer let server = ServerFixture::create_shared().await; let db_name = rand_name(); let write_buffer_connection = WriteBufferConnection { direction: WriteBufferDirection::Read.into(), - r#type: "kafka".to_string(), - connection: kafka_connection.to_string(), + r#type: "file".to_string(), + connection: write_buffer_dir.path().display().to_string(), creation_config: Some(WriteBufferCreationConfig { n_sequencers: 2, - options: kafka_sequencer_options(), + ..Default::default() }), ..Default::default() }; - // Common Kafka config - let mut cfg = ClientConfig::new(); - cfg.set("bootstrap.servers", &kafka_connection); - cfg.set("message.timeout.ms", "5000"); - DatabaseBuilder::new(db_name.clone()) .write_buffer(write_buffer_connection) .build(server.grpc_channel()) .await; - // put some points in Kafka - let producer: FutureProducer = cfg.create().unwrap(); + // put some points in write buffer + let time_provider = Arc::new(SystemProvider::new()); + let producer = FileBufferProducer::new( + write_buffer_dir.path(), + 
&db_name, + Default::default(), + time_provider, + ) + .await + .unwrap(); + let mut sequencer_ids = producer.sequencer_ids().into_iter(); + let sequencer_id_1 = sequencer_ids.next().unwrap(); + let sequencer_id_2 = sequencer_ids.next().unwrap(); - // Kafka partitions must be configured based on the primary key because ordering across Kafka - // partitions is undefined, so the upsert semantics would be undefined. Entries that can - // potentially be merged must end up in the same Kafka partition. This test follows that - // constraint, but doesn't actually encode it. - - // Put some data for `upc,region=west` in partition 0 + // Put some data for `upc,region=west` in sequencer 1 let lp_lines = [ "upc,region=west user=23.2 100", "upc,region=west user=21.0 150", ]; - produce_to_kafka_directly(&producer, &lp_lines.join("\n"), &db_name, Some(0)).await; + producer + .store_entry(&lp_to_entry(&lp_lines.join("\n")), sequencer_id_1, None) + .await + .unwrap(); - // Put some data for `upc,region=east` in partition 1 + // Put some data for `upc,region=east` in sequencer 2 let lp_lines = [ "upc,region=east user=76.2 300", "upc,region=east user=88.7 350", ]; - produce_to_kafka_directly(&producer, &lp_lines.join("\n"), &db_name, Some(1)).await; + producer + .store_entry(&lp_to_entry(&lp_lines.join("\n")), sequencer_id_2, None) + .await + .unwrap(); let check = async { let mut interval = tokio::time::interval(std::time::Duration::from_millis(500)); @@ -265,19 +229,19 @@ async fn reads_come_from_kafka() { } #[tokio::test] -async fn cant_write_to_db_reading_from_kafka() { - let kafka_connection = maybe_skip_kafka_integration!(); +async fn cant_write_to_db_reading_from_write_buffer() { + let write_buffer_dir = TempDir::new().unwrap(); // set up a database to read from Kafka let server = ServerFixture::create_shared().await; let db_name = rand_name(); let write_buffer_connection = WriteBufferConnection { direction: WriteBufferDirection::Read.into(), - r#type: "kafka".to_string(), - connection: kafka_connection.to_string(), + r#type: "file".to_string(), + connection: write_buffer_dir.path().display().to_string(), creation_config: Some(WriteBufferCreationConfig { n_sequencers: 1, - options: kafka_sequencer_options(), + ..Default::default() }), ..Default::default() }; @@ -306,15 +270,15 @@ async fn cant_write_to_db_reading_from_kafka() { #[tokio::test] async fn test_create_database_missing_write_buffer_sequencers() { - let kafka_connection = maybe_skip_kafka_integration!(); + let write_buffer_dir = TempDir::new().unwrap(); // set up a database to read from Kafka let server = ServerFixture::create_shared().await; let db_name = rand_name(); let write_buffer_connection = WriteBufferConnection { direction: WriteBufferDirection::Read.into(), - r#type: "kafka".to_string(), - connection: kafka_connection.to_string(), + r#type: "file".to_string(), + connection: write_buffer_dir.path().display().to_string(), ..Default::default() }; @@ -359,7 +323,7 @@ pub async fn test_cross_write_buffer_tracing() { connection: write_buffer_dir.path().display().to_string(), creation_config: Some(WriteBufferCreationConfig { n_sequencers: 1, - options: kafka_sequencer_options(), + ..Default::default() }), ..Default::default() }; From 93f6519c341109ce0274e884338e8e70224826b1 Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Tue, 26 Oct 2021 09:42:23 +0200 Subject: [PATCH 3/4] fix: address review comments --- docs/testing.md | 2 +- tests/end_to_end_cases/scenario.rs | 42 +++++++++++++++++++------- tests/end_to_end_cases/write_buffer.rs | 
8 ++---
 write_buffer/src/file.rs | 5 +--
 4 files changed, 39 insertions(+), 18 deletions(-)

diff --git a/docs/testing.md b/docs/testing.md
index c3b43647ac..fdd68c2adb 100644
--- a/docs/testing.md
+++ b/docs/testing.md
@@ -136,7 +136,7 @@ You can then run the tests with `KAFKA_CONNECT=localhost:9093`. To run just the
 tests, the full command would then be:
 
 ```
-TEST_INTEGRATION=1 KAFKA_CONNECT=localhost:9093 cargo test -p influxdb_iox --test end_to_end write_buffer
+TEST_INTEGRATION=1 KAFKA_CONNECT=localhost:9093 cargo test -p write_buffer kafka -- --nocapture
 ```
 
 ### Running `cargo test` in a Docker container

diff --git a/tests/end_to_end_cases/scenario.rs b/tests/end_to_end_cases/scenario.rs
index c965762d49..f9d2048bad 100644
--- a/tests/end_to_end_cases/scenario.rs
+++ b/tests/end_to_end_cases/scenario.rs
@@ -654,7 +654,7 @@ pub async fn fixture_broken_catalog(db_name: &str) -> ServerFixture {
 }
 
 /// Creates a database that cannot be replayed
-pub async fn fixture_replay_broken(db_name: &str, path: &Path) -> ServerFixture {
+pub async fn fixture_replay_broken(db_name: &str, write_buffer_path: &Path) -> ServerFixture {
     let server_id = DEFAULT_SERVER_ID;
 
     let test_config = TestConfig::new().with_env("INFLUXDB_IOX_SKIP_REPLAY", "no");
@@ -680,7 +680,7 @@ pub async fn fixture_replay_broken(db_name: &str, path: &Path) -> ServerFixture
         write_buffer_connection: Some(WriteBufferConnection {
             direction: write_buffer_connection::Direction::Read.into(),
             r#type: "file".to_string(),
-            connection: path.display().to_string(),
+            connection: write_buffer_path.display().to_string(),
             creation_config: Some(WriteBufferCreationConfig {
                 n_sequencers: 1,
                 ..Default::default()
@@ -708,9 +708,14 @@ pub async fn fixture_replay_broken(db_name: &str, path: &Path) -> ServerFixture
 
     // ingest data as mixed throughput
     let time_provider = Arc::new(SystemProvider::new());
-    let producer = FileBufferProducer::new(path, db_name, Default::default(), time_provider)
-        .await
-        .unwrap();
+    let producer = FileBufferProducer::new(
+        write_buffer_path,
+        db_name,
+        Default::default(),
+        time_provider,
+    )
+    .await
+    .unwrap();
     let sequencer_id = producer.sequencer_ids().into_iter().next().unwrap();
     let (sequence_1, _) = producer
         .store_entry(
@@ -770,12 +775,27 @@ pub async fn fixture_replay_broken(db_name: &str, path: &Path) -> ServerFixture
         .unwrap();
 
     // purge data from write buffer
-    write_buffer::file::test_utils::remove_entry(path, db_name, sequencer_id, sequence_1.number)
-        .await;
-    write_buffer::file::test_utils::remove_entry(path, db_name, sequencer_id, sequence_2.number)
-        .await;
-    write_buffer::file::test_utils::remove_entry(path, db_name, sequencer_id, sequence_3.number)
-        .await;
+    write_buffer::file::test_utils::remove_entry(
+        write_buffer_path,
+        db_name,
+        sequencer_id,
+        sequence_1.number,
+    )
+    .await;
+    write_buffer::file::test_utils::remove_entry(
+        write_buffer_path,
+        db_name,
+        sequencer_id,
+        sequence_2.number,
+    )
+    .await;
+    write_buffer::file::test_utils::remove_entry(
+        write_buffer_path,
+        db_name,
+        sequencer_id,
+        sequence_3.number,
+    )
+    .await;
 
     // Try to replay and error
     let fixture = fixture.restart_server().await;

diff --git a/tests/end_to_end_cases/write_buffer.rs b/tests/end_to_end_cases/write_buffer.rs
index 3ca812824d..db698f0e90 100644
--- a/tests/end_to_end_cases/write_buffer.rs
+++ b/tests/end_to_end_cases/write_buffer.rs
@@ -232,7 +232,7 @@ async fn cant_write_to_db_reading_from_write_buffer() {
     let write_buffer_dir = TempDir::new().unwrap();
 
-    // set up a database to read from Kafka
+    // set up a database to read from write buffer
     let server = ServerFixture::create_shared().await;
     let db_name = rand_name();
     let write_buffer_connection = WriteBufferConnection {
@@ -251,7 +251,7 @@ async fn cant_write_to_db_reading_from_write_buffer() {
         .build(server.grpc_channel())
         .await;
 
-    // Writing to this database is an error; all data comes from Kafka
+    // Writing to this database is an error; all data comes from write buffer
     let mut write_client = server.write_client();
     let err = write_client
         .write(&db_name, "temp,region=south color=1")
@@ -272,7 +272,7 @@ async fn cant_write_to_db_reading_from_write_buffer() {
 async fn test_create_database_missing_write_buffer_sequencers() {
     let write_buffer_dir = TempDir::new().unwrap();
 
-    // set up a database to read from Kafka
+    // set up a database to read from write buffer
     let server = ServerFixture::create_shared().await;
     let db_name = rand_name();
     let write_buffer_connection = WriteBufferConnection {
@@ -306,7 +306,7 @@ pub async fn test_cross_write_buffer_tracing() {
         .with_env("TRACES_EXPORTER_JAEGER_AGENT_PORT", udp_capture.port())
        .with_client_header("jaeger-debug-id", "some-debug-id");
 
-    // we need to use two servers but the same DB name here because the Kafka topic is named after the DB name
+    // we need to use two servers but the same DB name here because the write buffer topic is named after the DB name
     let db_name = rand_name();
 
     // create producer server
diff --git a/write_buffer/src/file.rs b/write_buffer/src/file.rs
index 7588d36fc8..a7e3c7a690 100644
--- a/write_buffer/src/file.rs
+++ b/write_buffer/src/file.rs
@@ -659,13 +659,14 @@ pub mod test_utils {
 
     /// Remove specific entry from write buffer.
     pub async fn remove_entry(
-        root: &Path,
+        write_buffer_path: &Path,
         database_name: &str,
         sequencer_id: u32,
         sequence_number: u64,
     ) {
         tokio::fs::remove_file(
-            root.join(database_name)
+            write_buffer_path
+                .join(database_name)
                 .join("active")
                 .join(sequencer_id.to_string())
                 .join("committed")

From 3af1504ed263c3c67c2e73569980ce1be25ae9db Mon Sep 17 00:00:00 2001
From: Marco Neumann
Date: Tue, 26 Oct 2021 10:09:30 +0200
Subject: [PATCH 4/4] fix: race condition in file-based write buffer

---
 write_buffer/src/file.rs | 41 ++++++++++++++++++++++++++++++++++++++--
 1 file changed, 39 insertions(+), 2 deletions(-)

diff --git a/write_buffer/src/file.rs b/write_buffer/src/file.rs
index a7e3c7a690..ac99e3dc1b 100644
--- a/write_buffer/src/file.rs
+++ b/write_buffer/src/file.rs
@@ -413,7 +413,8 @@ impl ConsumerStream {
                     std::io::ErrorKind::NotFound => {
                         // figure out watermark and see if there's a gap in the stream
                         if let Ok(watermark) = watermark(&path).await {
-                            if watermark > sequence_number {
+                            // watermark is "last sequence number + 1", so subtract 1 before comparing
+                            if watermark.saturating_sub(1) > sequence_number {
                                 // update position
                                 // failures are OK here since we'll re-read this value next round
                                 next_sequence_number
@@ -771,7 +772,7 @@ mod tests {
     }
 
     #[tokio::test]
-    async fn test_ignores_missing_files() {
+    async fn test_ignores_missing_files_multi() {
         let adapter = FileTestAdapter::new();
         let ctx = adapter.new_context(NonZeroU32::new(1).unwrap()).await;
 
@@ -828,4 +829,40 @@ mod tests {
         assert_eq!(&entry_1, sequenced_entry_1.entry());
         assert_eq!(&entry_4, sequenced_entry_4.entry());
     }
+
+    #[tokio::test]
+    async fn test_ignores_missing_files_single() {
+        let adapter = FileTestAdapter::new();
+        let ctx = adapter.new_context(NonZeroU32::new(1).unwrap()).await;
+
+        let writer = 
ctx.writing(true).await.unwrap(); + let sequencer_id = writer.sequencer_ids().into_iter().next().unwrap(); + let entry_1 = lp_to_entry("upc,region=east user=1 100"); + let entry_2 = lp_to_entry("upc,region=east user=2 200"); + let (sequence_1, _) = writer + .store_entry(&entry_1, sequencer_id, None) + .await + .unwrap(); + let (sequence_2, _) = writer + .store_entry(&entry_2, sequencer_id, None) + .await + .unwrap(); + + remove_entry( + &ctx.path, + &ctx.database_name, + sequencer_id, + sequence_1.number, + ) + .await; + + let mut reader = ctx.reading(true).await.unwrap(); + let mut stream = reader.streams().remove(&sequencer_id).unwrap(); + let sequenced_entry_2 = stream.stream.next().await.unwrap().unwrap(); + assert_eq!( + sequence_2.number, + sequenced_entry_2.sequence().unwrap().number + ); + assert_eq!(&entry_2, sequenced_entry_2.entry()); + } }
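
Taken together, patches 1 and 4 reduce the consumer's handling of a missing sequence-number file to a single comparison against the directory watermark. The following is a hedged, self-contained sketch of that decision; the `Action` enum and `on_missing_file` function are illustrative names, not part of the crate's API -- only the `saturating_sub` comparison mirrors the committed code.

/// What the consumer does when the file for `sequence_number` is absent.
#[derive(Debug, PartialEq)]
enum Action {
    /// A strictly newer committed file exists, so this number is a real gap
    /// (deleted or never written) and can be skipped.
    SkipGap,
    /// No newer file yet; the entry may still be appearing, so poll again.
    WaitForData,
}

fn on_missing_file(watermark: u64, sequence_number: u64) -> Action {
    // The watermark is "last sequence number + 1", so subtract 1 before
    // comparing (patch 4). Comparing the raw watermark could skip an entry
    // whose file is still being moved into place -- the race this series closes.
    if watermark.saturating_sub(1) > sequence_number {
        Action::SkipGap
    } else {
        Action::WaitForData
    }
}

fn main() {
    // Files 0 and 3 exist, 1 and 2 were removed: watermark = 4, so 1 is a gap.
    assert_eq!(on_missing_file(4, 1), Action::SkipGap);
    // Only file 0 exists: watermark = 1, so sequence 1 just is not written yet.
    assert_eq!(on_missing_file(1, 1), Action::WaitForData);
}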