test: end2end for skip replay

pull/24376/head
Marco Neumann 2021-08-13 10:46:05 +02:00
parent c959be2319
commit 53d325e8fc
3 changed files with 161 additions and 1 deletions

View File

@ -3,4 +3,5 @@
set -euxo pipefail
cargo test -p write_buffer kafka -- --nocapture
cargo test -p influxdb_iox --test end_to_end skip_replay -- --nocapture
cargo test -p influxdb_iox --test end_to_end write_buffer -- --nocapture

View File

@ -9,6 +9,7 @@ use data_types::{
job::{Job, Operation},
};
use test_helpers::make_temp_file;
use write_buffer::maybe_skip_kafka_integration;
use crate::{
common::server_fixture::ServerFixture,
@ -16,7 +17,7 @@ use crate::{
};
use super::scenario::{create_readable_database, rand_name};
use crate::end_to_end_cases::scenario::fixture_broken_catalog;
use crate::end_to_end_cases::scenario::{fixture_broken_catalog, fixture_replay_broken};
#[tokio::test]
async fn test_server_id() {
@ -692,6 +693,26 @@ async fn test_wipe_persisted_catalog_error_db_exists() {
.stderr(predicate::str::contains(&expected_err));
}
#[tokio::test]
async fn test_skip_replay() {
let kafka_connection = maybe_skip_kafka_integration!();
let db_name = rand_name();
let server_fixture = fixture_replay_broken(&db_name, &kafka_connection).await;
let addr = server_fixture.grpc_base();
Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("database")
.arg("recover")
.arg("skip-replay")
.arg(&db_name)
.arg("--host")
.arg(addr)
.assert()
.success()
.stdout(predicate::str::contains("Ok"));
}
#[tokio::test]
async fn test_skip_replay_error_db_exists() {
let server_fixture = ServerFixture::create_shared().await;

View File

@ -1,4 +1,5 @@
use std::iter::once;
use std::time::{Duration, Instant};
use std::{convert::TryInto, str, u32};
use std::{sync::Arc, time::SystemTime};
@ -7,7 +8,10 @@ use arrow::{
record_batch::RecordBatch,
};
use data_types::chunk_metadata::{ChunkStorage, ChunkSummary};
use entry::test_helpers::lp_to_entries;
use futures::prelude::*;
use influxdb_iox_client::management::generated_types::database_rules::WriteBufferConnection;
use influxdb_iox_client::management::generated_types::partition_template;
use prost::Message;
use rand::{
distributions::{Alphanumeric, Standard},
@ -23,6 +27,9 @@ use generated_types::{
ReadSource, TimestampRange,
};
use influxdb_iox_client::{connection::Connection, flight::PerformQuery};
use write_buffer::core::WriteBufferWriting;
use write_buffer::kafka::test_utils::{create_kafka_topic, purge_kafka_topic};
use write_buffer::kafka::KafkaBufferProducer;
use crate::common::server_fixture::{ServerFixture, DEFAULT_SERVER_ID};
@ -618,3 +625,134 @@ pub async fn fixture_broken_catalog(db_name: &str) -> ServerFixture {
fixture
}
/// Creates a database that cannot be replayed
pub async fn fixture_replay_broken(db_name: &str, kafka_connection: &str) -> ServerFixture {
let server_id = DEFAULT_SERVER_ID;
let env = vec![("INFLUXDB_IOX_SKIP_REPLAY".to_string(), "no".to_string())];
let fixture = ServerFixture::create_single_use_with_env(env).await;
fixture
.management_client()
.update_server_id(server_id)
.await
.unwrap();
fixture.wait_server_initialized().await;
// setup Kafka
create_kafka_topic(kafka_connection, db_name, 1).await;
// Create database
let partition_template = data_types::database_rules::PartitionTemplate {
parts: vec![data_types::database_rules::TemplatePart::Column(
"partition_by".to_string(),
)],
};
fixture
.management_client()
.create_database(DatabaseRules {
name: db_name.to_string(),
write_buffer_connection: Some(WriteBufferConnection::Reading(
kafka_connection.to_string(),
)),
partition_template: Some(PartitionTemplate {
parts: vec![partition_template::Part {
part: Some(partition_template::part::Part::Column(
"partition_by".to_string(),
)),
}],
}),
lifecycle_rules: Some(LifecycleRules {
persist: true,
late_arrive_window_seconds: 1,
persist_age_threshold_seconds: 3600,
persist_row_threshold: 2,
..Default::default()
}),
..Default::default()
})
.await
.unwrap();
// ingest data as mixed throughput
let producer = KafkaBufferProducer::new(kafka_connection, db_name).unwrap();
producer
.store_entry(
&lp_to_entries("table_1,partition_by=a foo=1 10", &partition_template)
.pop()
.unwrap(),
0,
)
.await
.unwrap();
producer
.store_entry(
&lp_to_entries("table_1,partition_by=b foo=2 20", &partition_template)
.pop()
.unwrap(),
0,
)
.await
.unwrap();
producer
.store_entry(
&lp_to_entries("table_1,partition_by=b foo=3 30", &partition_template)
.pop()
.unwrap(),
0,
)
.await
.unwrap();
// wait for ingest
let t_0 = Instant::now();
loop {
// use later partition here so that we can implicitely wait for both entries
if fixture
.management_client()
.get_partition(db_name, "partition_by_b")
.await
.is_ok()
{
break;
}
assert!(t_0.elapsed() < Duration::from_secs(10));
tokio::time::sleep(Duration::from_millis(100)).await;
}
wait_for_exact_chunk_states(
&fixture,
db_name,
vec![
ChunkStorage::ReadBuffer,
ChunkStorage::ReadBufferAndObjectStore,
],
Duration::from_secs(10),
)
.await;
// purge data from Kafka
purge_kafka_topic(kafka_connection, db_name).await;
producer
.store_entry(
&lp_to_entries("table_1,partition_by=c foo=4 40", &partition_template)
.pop()
.unwrap(),
0,
)
.await
.unwrap();
// Try to replay and error
let fixture = fixture.restart_server().await;
let status = fixture.wait_server_initialized().await;
assert_eq!(status.database_statuses.len(), 1);
let load_error = &status.database_statuses[0].error.as_ref().unwrap().message;
assert_contains!(load_error, "error during replay: Cannot replay");
fixture
}