refactor: rework write buffer compaction as integration test (#2830)

* refactor: rework write buffer compaction as integration test

* chore: fix lint

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Raphael Taylor-Davies 2021-10-14 12:04:44 +01:00 committed by GitHub
parent 074ae40382
commit 4087d094b1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 157 additions and 122 deletions

View File

@ -1225,9 +1225,48 @@ where
} }
} }
pub mod test_utils {
use super::*;
use crate::connection::test_helpers::TestConnectionManager;
use object_store::ObjectStore;
/// Create a new [`ApplicationState`] with an in-memory object store
pub fn make_application() -> Arc<ApplicationState> {
Arc::new(ApplicationState::new(
Arc::new(ObjectStore::new_in_memory()),
None,
))
}
/// Creates a new server with the provided [`ApplicationState`]
pub fn make_server(application: Arc<ApplicationState>) -> Arc<Server<TestConnectionManager>> {
Arc::new(Server::new(
TestConnectionManager::new(),
application,
Default::default(),
))
}
/// Creates a new server with the provided [`ApplicationState`]
///
/// Sets the `server_id` provided and waits for it to initialize
pub async fn make_initialized_server(
server_id: ServerId,
application: Arc<ApplicationState>,
) -> Arc<Server<TestConnectionManager>> {
let server = make_server(application);
server.set_id(server_id).unwrap();
server.wait_for_init().await.unwrap();
server
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::{
test_utils::{make_application, make_server},
*,
};
use arrow::record_batch::RecordBatch; use arrow::record_batch::RecordBatch;
use arrow_util::assert_batches_eq; use arrow_util::assert_batches_eq;
use bytes::Bytes; use bytes::Bytes;
@ -1266,21 +1305,6 @@ mod tests {
const ARBITRARY_DEFAULT_TIME: i64 = 456; const ARBITRARY_DEFAULT_TIME: i64 = 456;
fn make_application() -> Arc<ApplicationState> {
Arc::new(ApplicationState::new(
Arc::new(ObjectStore::new_in_memory()),
None,
))
}
fn make_server(application: Arc<ApplicationState>) -> Arc<Server<TestConnectionManager>> {
Arc::new(Server::new(
TestConnectionManager::new(),
application,
Default::default(),
))
}
#[tokio::test] #[tokio::test]
async fn server_api_calls_return_error_with_no_id_set() { async fn server_api_calls_return_error_with_no_id_set() {
let server = make_server(make_application()); let server = make_server(make_application());

View File

@ -192,18 +192,14 @@ async fn stream_in_sequenced_entries<'a>(
mod tests { mod tests {
use std::collections::BTreeMap; use std::collections::BTreeMap;
use std::convert::TryFrom; use std::convert::TryFrom;
use std::num::{NonZeroU32, NonZeroUsize}; use std::num::NonZeroU32;
use ::test_helpers::assert_contains;
use arrow_util::assert_batches_eq; use arrow_util::assert_batches_eq;
use data_types::database_rules::{PartitionTemplate, TemplatePart};
use data_types::sequence::Sequence; use data_types::sequence::Sequence;
use entry::test_helpers::lp_to_entry; use entry::test_helpers::lp_to_entry;
use persistence_windows::min_max_sequence::MinMaxSequence; use persistence_windows::min_max_sequence::MinMaxSequence;
use query::exec::ExecutionContextProvider; use query::exec::ExecutionContextProvider;
use query::frontend::sql::SqlQueryPlanner; use query::frontend::sql::SqlQueryPlanner;
use query::QueryDatabase;
use test_helpers::tracing::TracingCapture;
use write_buffer::mock::{MockBufferForReading, MockBufferSharedState}; use write_buffer::mock::{MockBufferForReading, MockBufferSharedState};
use crate::db::test_helpers::run_query; use crate::db::test_helpers::run_query;
@ -435,87 +431,6 @@ mod tests {
assert_batches_eq!(expected, &batches); assert_batches_eq!(expected, &batches);
} }
#[tokio::test]
async fn write_buffer_reads_wait_for_compaction() {
let tracing_capture = TracingCapture::new();
// setup write buffer
// these numbers are handtuned to trigger hard buffer limits w/o making the test too big
let n_entries = 50u64;
let write_buffer_state =
MockBufferSharedState::empty_with_n_sequencers(NonZeroU32::try_from(1).unwrap());
for sequence_number in 0..n_entries {
let lp = format!(
"table_1,tag_partition_by=a foo=\"hello\",bar=1 {}",
sequence_number / 2
);
write_buffer_state.push_entry(SequencedEntry::new_from_sequence(
Sequence::new(0, sequence_number),
Time::from_timestamp_nanos(0),
lp_to_entry(&lp),
));
}
write_buffer_state.push_entry(SequencedEntry::new_from_sequence(
Sequence::new(0, n_entries),
Time::from_timestamp_nanos(0),
lp_to_entry("table_2,partition_by=a foo=1 0"),
));
// create DB
let partition_template = PartitionTemplate {
parts: vec![TemplatePart::Column("tag_partition_by".to_string())],
};
let test_db = TestDb::builder()
.lifecycle_rules(data_types::database_rules::LifecycleRules {
buffer_size_hard: Some(NonZeroUsize::new(10_000).unwrap()),
mub_row_threshold: NonZeroUsize::new(10).unwrap(),
..Default::default()
})
.partition_template(partition_template)
.build()
.await;
let db = test_db.db;
// start background task loop
let shutdown: CancellationToken = Default::default();
let shutdown_captured = shutdown.clone();
let db_captured = Arc::clone(&db);
let join_handle =
tokio::spawn(async move { db_captured.background_worker(shutdown_captured).await });
let consumer = WriteBufferConsumer::new(
Box::new(MockBufferForReading::new(write_buffer_state, None).unwrap()),
Arc::clone(&db),
test_db.metric_registry.as_ref(),
);
// after a while the table should exist
let t_0 = Instant::now();
loop {
if db.table_schema("table_2").is_some() {
break;
}
assert!(t_0.elapsed() < Duration::from_secs(10));
tokio::time::sleep(Duration::from_millis(100)).await;
}
// do: stop background task loop
shutdown.cancel();
join_handle.await.unwrap();
consumer.shutdown();
consumer.join().await.unwrap();
// no rows should be dropped
let batches = run_query(db, "select sum(bar) as n from table_1").await;
let expected = vec!["+----+", "| n |", "+----+", "| 25 |", "+----+"];
assert_batches_eq!(expected, &batches);
// check that hard buffer limit was actually hit (otherwise this test is pointless/outdated)
assert_contains!(tracing_capture.to_string(), "Hard limit reached while reading from write buffer, waiting for compaction to catch up");
}
#[tokio::test] #[tokio::test]
async fn error_converting_data_from_write_buffer_to_sequenced_entry_is_reported() { async fn error_converting_data_from_write_buffer_to_sequenced_entry_is_reported() {
let write_buffer_state = let write_buffer_state =

View File

@ -14,42 +14,26 @@ use data_types::{
DatabaseName, DatabaseName,
}; };
use iox_object_store::IoxObjectStore; use iox_object_store::IoxObjectStore;
use object_store::ObjectStore;
use predicate::{delete_expr::DeleteExpr, delete_predicate::DeletePredicate}; use predicate::{delete_expr::DeleteExpr, delete_predicate::DeletePredicate};
use query::{QueryChunk, QueryChunkMeta, QueryDatabase}; use query::{QueryChunk, QueryChunkMeta, QueryDatabase};
use server::{ use server::{
connection::test_helpers::TestConnectionManager,
db::test_helpers::{run_query, write_lp}, db::test_helpers::{run_query, write_lp},
rules::ProvidedDatabaseRules, rules::ProvidedDatabaseRules,
ApplicationState, Db, Server, test_utils::{make_application, make_initialized_server},
Db,
}; };
use test_helpers::maybe_start_logging; use test_helpers::maybe_start_logging;
async fn start_server(
server_id: ServerId,
application: Arc<ApplicationState>,
) -> Arc<Server<TestConnectionManager>> {
let server = Arc::new(Server::new(
TestConnectionManager::new(),
application,
Default::default(),
));
server.set_id(server_id).unwrap();
server.wait_for_init().await.unwrap();
server
}
#[tokio::test] #[tokio::test]
async fn delete_predicate_preservation() { async fn delete_predicate_preservation() {
maybe_start_logging(); maybe_start_logging();
// ==================== setup ==================== // ==================== setup ====================
let object_store = Arc::new(ObjectStore::new_in_memory());
let server_id = ServerId::new(NonZeroU32::new(1).unwrap()); let server_id = ServerId::new(NonZeroU32::new(1).unwrap());
let db_name = DatabaseName::new("delete_predicate_preservation_test").unwrap(); let db_name = DatabaseName::new("delete_predicate_preservation_test").unwrap();
let application = Arc::new(ApplicationState::new(Arc::clone(&object_store), None)); let application = make_application();
let server = start_server(server_id, Arc::clone(&application)).await; let server = make_initialized_server(server_id, Arc::clone(&application)).await;
// Test that delete predicates are stored within the preserved catalog // Test that delete predicates are stored within the preserved catalog

View File

@ -0,0 +1,112 @@
use std::num::{NonZeroU32, NonZeroUsize};
use std::time::{Duration, Instant};
use arrow_util::assert_batches_eq;
use data_types::database_rules::{
DatabaseRules, LifecycleRules, PartitionTemplate, TemplatePart, WriteBufferConnection,
WriteBufferDirection,
};
use data_types::{sequence::Sequence, server_id::ServerId, DatabaseName};
use entry::{test_helpers::lp_to_entry, SequencedEntry};
use query::QueryDatabase;
use server::{
db::test_helpers::run_query,
rules::ProvidedDatabaseRules,
test_utils::{make_application, make_initialized_server},
};
use test_helpers::{assert_contains, tracing::TracingCapture};
use time::Time;
use write_buffer::mock::MockBufferSharedState;
#[tokio::test]
async fn write_buffer_reads_wait_for_compaction() {
let tracing_capture = TracingCapture::new();
// ==================== setup ====================
let server_id = ServerId::new(NonZeroU32::new(1).unwrap());
let db_name = DatabaseName::new("delete_predicate_preservation_test").unwrap();
// Test that delete predicates are stored within the preserved catalog
// setup write buffer
// these numbers are handtuned to trigger hard buffer limits w/o making the test too big
let n_entries = 50u64;
let write_buffer_state =
MockBufferSharedState::empty_with_n_sequencers(NonZeroU32::new(1).unwrap());
for sequence_number in 0..n_entries {
let lp = format!(
"table_1,tag_partition_by=a foo=\"hello\",bar=1 {}",
sequence_number / 2
);
write_buffer_state.push_entry(SequencedEntry::new_from_sequence(
Sequence::new(0, sequence_number),
Time::from_timestamp_nanos(0),
lp_to_entry(&lp),
));
}
write_buffer_state.push_entry(SequencedEntry::new_from_sequence(
Sequence::new(0, n_entries),
Time::from_timestamp_nanos(0),
lp_to_entry("table_2,partition_by=a foo=1 0"),
));
let application = make_application();
application
.write_buffer_factory()
.register_mock("my_mock".to_string(), write_buffer_state);
let server = make_initialized_server(server_id, application).await;
// create DB
let rules = DatabaseRules {
partition_template: PartitionTemplate {
parts: vec![TemplatePart::Column("tag_partition_by".to_string())],
},
lifecycle_rules: LifecycleRules {
buffer_size_hard: Some(NonZeroUsize::new(10_000).unwrap()),
mub_row_threshold: NonZeroUsize::new(10).unwrap(),
..Default::default()
},
write_buffer_connection: Some(WriteBufferConnection {
direction: WriteBufferDirection::Read,
type_: "mock".to_string(),
connection: "my_mock".to_string(),
..Default::default()
}),
..DatabaseRules::new(db_name.clone())
};
let database = server
.create_database(ProvidedDatabaseRules::new_rules(rules.clone().into()).unwrap())
.await
.unwrap();
let db = database.initialized_db().unwrap();
// after a while the table should exist
let t_0 = Instant::now();
loop {
if db.table_schema("table_2").is_some() {
break;
}
assert!(t_0.elapsed() < Duration::from_secs(10));
tokio::time::sleep(Duration::from_millis(100)).await;
}
// no rows should be dropped
let batches = run_query(db, "select sum(bar) as n from table_1").await;
let expected = vec!["+----+", "| n |", "+----+", "| 25 |", "+----+"];
assert_batches_eq!(expected, &batches);
// check that hard buffer limit was actually hit (otherwise this test is pointless/outdated)
assert_contains!(
tracing_capture.to_string(),
"Hard limit reached while reading from write buffer, waiting for compaction to catch up"
);
server.shutdown();
server.join().await.unwrap();
}