test: restore `write_buffer_errors_propagate`

This was removed in #2203 due to insufficient mocking capabilities.
pull/24376/head
Marco Neumann 2021-08-12 13:37:27 +02:00
parent 1a7293015b
commit 114a9004b3
1 changed files with 42 additions and 1 deletions

View File

@ -1279,9 +1279,11 @@ mod tests {
use data_types::{
chunk_metadata::ChunkAddr,
database_rules::{
HashRing, LifecycleRules, PartitionTemplate, ShardConfig, TemplatePart, NO_SHARD_CONFIG,
HashRing, LifecycleRules, PartitionTemplate, ShardConfig, TemplatePart,
WriteBufferConnection, NO_SHARD_CONFIG,
},
};
use entry::test_helpers::lp_to_entry;
use futures::TryStreamExt;
use generated_types::database_rules::decode_database_rules;
use influxdb_line_protocol::parse_lines;
@ -1299,6 +1301,7 @@ mod tests {
time::{Duration, Instant},
};
use test_helpers::assert_contains;
use write_buffer::config::WriteBufferConfigFactory;
const ARBITRARY_DEFAULT_TIME: i64 = 456;
@ -2229,6 +2232,44 @@ mod tests {
assert!(matches!(err, Error::CannotCreateDatabase { .. }));
}
#[tokio::test]
async fn write_buffer_errors_propagate() {
let mut factory = WriteBufferConfigFactory::new();
factory.register_alway_fail_mock("my_mock".to_string());
let application = Arc::new(ApplicationState::with_write_buffer_factory(
Arc::new(ObjectStore::new_in_memory()),
Arc::new(factory),
None,
));
let server = make_server(application);
server.set_id(ServerId::try_from(1).unwrap()).unwrap();
server.wait_for_init().await.unwrap();
let db_name = DatabaseName::new("my_db").unwrap();
let rules = DatabaseRules {
name: db_name.clone(),
partition_template: PartitionTemplate {
parts: vec![TemplatePart::TimeFormat("YYYY-MM".to_string())],
},
lifecycle_rules: Default::default(),
routing_rules: None,
worker_cleanup_avg_sleep: Duration::from_secs(2),
write_buffer_connection: Some(WriteBufferConnection::Writing(
"mock://my_mock".to_string(),
)),
};
server.create_database(rules.clone()).await.unwrap();
let entry = lp_to_entry("cpu bar=1 10");
let res = server.write_entry_local(&db_name, entry).await;
assert!(
matches!(res, Err(Error::WriteBuffer { .. })),
"Expected Err(Error::WriteBuffer {{ .. }}), got: {:?}",
res
);
}
// run a sql query against the database, returning the results as record batches
async fn run_query(db: Arc<Db>, query: &str) -> Vec<RecordBatch> {
let planner = SqlQueryPlanner::default();