Merge pull request #3379 from influxdata/crepererum/issue3375

fix: treat early server worker exit as proper error
pull/24376/head
kodiakhq[bot] 2021-12-16 08:39:52 +00:00 committed by GitHub
commit 32ddd08445
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 280 additions and 12 deletions

View File

@ -5,14 +5,19 @@ use crate::structopt_blocks::run_config::RunConfig;
pub mod database;
pub mod router;
pub mod test;
/// Errors produced while running one of the server subcommands.
///
/// Each variant wraps the error type exported by the subcommand module it is
/// named after and prefixes its message with the subcommand name.
#[derive(Debug, Snafu)]
// Every variant name ends in `Error`, which is exactly what this lint flags.
#[allow(clippy::enum_variant_names)]
pub enum Error {
/// The `database` subcommand failed.
#[snafu(display("Error in database subcommand: {}", source))]
DatabaseError { source: database::Error },
/// The `router` subcommand failed.
#[snafu(display("Error in router subcommand: {}", source))]
RouterError { source: router::Error },
/// The `test` subcommand failed.
#[snafu(display("Error in test subcommand: {}", source))]
TestError { source: test::Error },
}
/// Result type whose error parameter defaults to this module's [`Error`].
pub type Result<T, E = Error> = std::result::Result<T, E>;
@ -34,6 +39,7 @@ impl Config {
None => &self.database_config.run_config,
Some(Command::Database(config)) => &config.run_config,
Some(Command::Router(config)) => &config.run_config,
Some(Command::Test(config)) => &config.run_config,
}
}
}
@ -42,6 +48,7 @@ impl Config {
// Server mode selected on the command line. Each variant carries the
// configuration type of the module that implements that mode.
// NOTE(review): plain `//` comments are used on purpose; doc comments on a
// structopt type would presumably end up in the generated help text.
enum Command {
// Handled by `database::command`.
Database(database::Config),
// Handled by `router::command`.
Router(router::Config),
// Handled by `test::command`.
Test(test::Config),
}
pub async fn command(config: Config) -> Result<()> {
@ -56,5 +63,6 @@ pub async fn command(config: Config) -> Result<()> {
}
Some(Command::Database(config)) => database::command(config).await.context(DatabaseError),
Some(Command::Router(config)) => router::command(config).await.context(RouterError),
Some(Command::Test(config)) => test::command(config).await.context(TestError),
}
}

View File

@ -0,0 +1,69 @@
//! Implementation of the command line option for running the test server
use std::sync::Arc;
use crate::{
influxdb_ioxd::{
self,
server_type::{
common_state::{CommonServerState, CommonServerStateError},
test::{TestAction, TestServerType},
},
},
structopt_blocks::run_config::RunConfig,
};
use metric::Registry;
use structopt::StructOpt;
use thiserror::Error;
/// Errors that can occur while running the test server command.
///
/// Both variants carry `#[from]`, so the wrapped error types convert into
/// this enum automatically when propagated with `?`.
#[derive(Debug, Error)]
pub enum Error {
/// The server run itself reported an error.
#[error("Run: {0}")]
Run(#[from] influxdb_ioxd::Error),
/// An error of the common server state, reported as an invalid configuration.
#[error("Invalid config: {0}")]
InvalidConfig(#[from] CommonServerStateError),
}
/// Result type whose error parameter defaults to this module's [`Error`].
pub type Result<T, E = Error> = std::result::Result<T, E>;
// Command line configuration of the test server.
// NOTE(review): plain `//` comments are used for the additions here because
// doc comments on a structopt type feed into the generated help output.
#[derive(Debug, StructOpt)]
#[structopt(
name = "run",
about = "Runs in test mode",
long_about = "Run the IOx test server.\n\nThe configuration options below can be \
set either with the command line flags or with the specified environment \
variable. If there is a file named '.env' in the current working directory, \
it is sourced before loading the configuration.
Configuration is loaded from the following sources (highest precedence first):
- command line arguments
- user set environment variables
- .env file contents
- pre-configured default values"
)]
pub struct Config {
// Run options shared with the other server modes; flattened so its flags
// appear directly on this command.
#[structopt(flatten)]
pub(crate) run_config: RunConfig,
/// Test action
// Settable via `--test-action` or `IOX_TEST_ACTION`; defaults to `None`,
// is matched case-insensitively and is limited to the `TestAction` variants.
#[structopt(
long = "--test-action",
env = "IOX_TEST_ACTION",
default_value = "None",
possible_values = &TestAction::variants(),
case_insensitive = true,
)]
test_action: TestAction,
}
pub async fn command(config: Config) -> Result<()> {
let common_state = CommonServerState::from_config(config.run_config.clone())?;
let server_type = Arc::new(TestServerType::new(
Arc::new(Registry::new()),
common_state.trace_collector(),
config.test_action,
));
Ok(influxdb_ioxd::main(common_state, server_type).await?)
}

View File

@ -33,6 +33,15 @@ pub enum Error {
#[snafu(display("Error serving RPC: {}", source))]
ServingRpc { source: server_type::RpcError },
#[snafu(display("Early Http shutdown"))]
LostHttp,
#[snafu(display("Early RPC shutdown"))]
LostRpc,
#[snafu(display("Early server shutdown"))]
LostServer,
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@ -276,19 +285,28 @@ where
_ = signal => info!("Shutdown requested"),
_ = server_handle => {
error!("server worker shutdown prematurely");
res = res.and(Err(Error::LostServer));
},
result = grpc_server => match result {
Ok(_) => info!("gRPC server shutdown"),
Ok(_) if frontend_shutdown.is_cancelled() => info!("gRPC server shutdown"),
Ok(_) => {
error!("Early gRPC server exit");
res = res.and(Err(Error::LostRpc));
}
Err(error) => {
error!(%error, "gRPC server error");
res = res.and(Err(Error::ServingRpc{source: error}))
res = res.and(Err(Error::ServingRpc{source: error}));
}
},
result = http_server => match result {
Ok(_) => info!("HTTP server shutdown"),
Ok(_) if frontend_shutdown.is_cancelled() => info!("HTTP server shutdown"),
Ok(_) => {
error!("Early HTTP server exit");
res = res.and(Err(Error::LostHttp));
}
Err(error) => {
error!(%error, "HTTP server error");
res = res.and(Err(Error::ServingHttp{source: error}))
res = res.and(Err(Error::ServingHttp{source: error}));
}
},
}

View File

@ -102,6 +102,7 @@ pub(crate) use add_gated_service;
/// be used w/ [`serve_builder`].
macro_rules! setup_builder {
($input:ident, $server_type:ident) => {{
#[allow(unused_imports)]
use $crate::influxdb_ioxd::{
rpc::{add_service, testing, RpcBuilder},
server_type::ServerType,

View File

@ -126,6 +126,7 @@ mod tests {
use server::rules::ProvidedDatabaseRules;
use std::{convert::TryInto, net::SocketAddr, num::NonZeroU64};
use structopt::StructOpt;
use test_helpers::assert_error;
use tokio::task::JoinHandle;
use trace::{
span::{Span, SpanStatus},
@ -149,7 +150,7 @@ mod tests {
config: RunConfig,
application: Arc<ApplicationState>,
server: Arc<Server>,
) {
) -> Result<(), crate::influxdb_ioxd::Error> {
let grpc_listener = grpc_listener(config.grpc_bind_address.into())
.await
.unwrap();
@ -160,9 +161,7 @@ mod tests {
let common_state = CommonServerState::from_config(config).unwrap();
let server_type = Arc::new(DatabaseServerType::new(application, server, &common_state));
serve(common_state, grpc_listener, http_listener, server_type)
.await
.unwrap()
serve(common_state, grpc_listener, http_listener, server_type).await
}
#[tokio::test]
@ -438,8 +437,11 @@ mod tests {
assert!(spans[2].end.is_some());
assert_ne!(spans[0].ctx.span_id, spans[1].ctx.span_id);
// shutdown server early
server.shutdown();
join.await.unwrap().unwrap();
let res = join.await.unwrap();
assert_error!(res, crate::influxdb_ioxd::Error::LostServer);
}
/// Ensure that query is fully executed.
@ -507,8 +509,10 @@ mod tests {
.await
.unwrap();
// early shutdown
server.shutdown();
join.await.unwrap().unwrap();
let res = join.await.unwrap();
assert_error!(res, crate::influxdb_ioxd::Error::LostServer);
// Check generated traces
@ -577,8 +581,10 @@ mod tests {
collector.drain().await.unwrap();
// early shutdown
server.shutdown();
join.await.unwrap().unwrap();
let res = join.await.unwrap();
assert_error!(res, crate::influxdb_ioxd::Error::LostServer);
let span = receiver.recv().await.unwrap();
assert_eq!(span.ctx.trace_id.get(), 0x34f8495);

View File

@ -11,6 +11,7 @@ use crate::influxdb_ioxd::{http::error::HttpApiErrorSource, rpc::RpcBuilderInput
pub mod common_state;
pub mod database;
pub mod router;
pub mod test;
#[derive(Debug, Snafu)]
pub enum RpcError {

View File

@ -0,0 +1,114 @@
use std::sync::Arc;
use crate::influxdb_ioxd::{
http::error::{HttpApiError, HttpApiErrorExt, HttpApiErrorSource},
rpc::{serve_builder, setup_builder, RpcBuilderInput},
};
use async_trait::async_trait;
use clap::arg_enum;
use hyper::{Body, Method, Request, Response};
use metric::Registry;
use snafu::Snafu;
use tokio_util::sync::CancellationToken;
use trace::TraceCollector;
use super::{RpcError, ServerType};
/// Errors the test server can return from its HTTP routing.
#[derive(Debug, Snafu)]
pub enum ApplicationError {
/// No handler exists for the requested method and path.
#[snafu(display("No handler for {:?} {}", method, path))]
RouteNotFound { method: Method, path: String },
}
impl HttpApiErrorSource for ApplicationError {
    /// Translates an application error into its HTTP API error form.
    fn to_http_api_error(&self) -> HttpApiError {
        // Kept as an exhaustive match so that a newly added variant has to be
        // given an explicit mapping here.
        match *self {
            Self::RouteNotFound { .. } => self.not_found(),
        }
    }
}
// Failure mode the test server should simulate.
//
// Declared through `arg_enum!`; the command line configuration uses the
// resulting `TestAction::variants()` as the list of accepted values.
// Plain `//` comments are used inside the macro invocation because they are
// discarded before macro expansion.
arg_enum! {
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TestAction {
// No failure: the gRPC worker serves and `join` waits for shutdown.
None,
// `server_grpc` returns `Ok(())` immediately instead of serving.
EarlyReturnFromGrpcWorker,
// `join` returns immediately instead of waiting for shutdown.
EarlyReturnFromServerWorker,
// `server_grpc` panics.
PanicInGrpcWorker,
// `join` panics.
PanicInServerWorker,
}
}
/// Server type used for testing; it can be told to misbehave via a
/// [`TestAction`].
#[derive(Debug)]
pub struct TestServerType {
/// Registry handed out by `metric_registry()`.
metric_registry: Arc<Registry>,
/// Optional collector handed out by `trace_collector()`.
trace_collector: Option<Arc<dyn TraceCollector>>,
/// Cancelled by `shutdown()`; `join()` waits on it in the non-failing case.
shutdown: CancellationToken,
/// Failure mode consulted by `server_grpc()` and `join()`.
test_action: TestAction,
}
impl TestServerType {
    /// Creates a test server type with a fresh, not yet cancelled shutdown
    /// token.
    ///
    /// `test_action` selects which failure, if any, the workers simulate.
    pub fn new(
        metric_registry: Arc<Registry>,
        trace_collector: Option<Arc<dyn TraceCollector>>,
        test_action: TestAction,
    ) -> Self {
        let shutdown = CancellationToken::new();

        Self {
            test_action,
            shutdown,
            trace_collector,
            metric_registry,
        }
    }
}
#[async_trait]
impl ServerType for TestServerType {
    type RouteError = ApplicationError;

    /// Returns another handle to the metric registry.
    fn metric_registry(&self) -> Arc<Registry> {
        Arc::clone(&self.metric_registry)
    }

    /// Returns another handle to the trace collector, if one is configured.
    fn trace_collector(&self) -> Option<Arc<dyn TraceCollector>> {
        self.trace_collector.as_ref().map(Arc::clone)
    }

    /// Rejects every request: the test server has no HTTP routes of its own.
    async fn route_http_request(
        &self,
        req: Request<Body>,
    ) -> Result<Response<Body>, Self::RouteError> {
        let method = req.method().clone();
        let path = req.uri().path().to_string();

        Err(ApplicationError::RouteNotFound { method, path })
    }

    /// Runs the gRPC worker, or simulates the configured gRPC failure.
    async fn server_grpc(self: Arc<Self>, builder_input: RpcBuilderInput) -> Result<(), RpcError> {
        match self.test_action {
            TestAction::EarlyReturnFromGrpcWorker => Ok(()),
            TestAction::PanicInGrpcWorker => panic!("Test panic in gRPC worker"),
            // Every action that does not target the gRPC worker serves normally.
            TestAction::None
            | TestAction::EarlyReturnFromServerWorker
            | TestAction::PanicInServerWorker => {
                let builder = setup_builder!(builder_input, self);
                serve_builder!(builder);
                Ok(())
            }
        }
    }

    /// Waits until shutdown is requested, or simulates the configured server
    /// worker failure.
    async fn join(self: Arc<Self>) {
        match self.test_action {
            TestAction::PanicInServerWorker => panic!("Test panic in server worker"),
            TestAction::EarlyReturnFromServerWorker => {}
            // Every action that does not target the server worker waits for
            // the shutdown token.
            TestAction::None
            | TestAction::EarlyReturnFromGrpcWorker
            | TestAction::PanicInGrpcWorker => self.shutdown.cancelled().await,
        }
    }

    /// Requests shutdown by cancelling the token `join` waits on.
    fn shutdown(&self) {
        self.shutdown.cancel();
    }
}

View File

@ -1,6 +1,6 @@
use std::{
io::Read,
process::Command,
process::{Command, Stdio},
time::{Duration, Instant},
};
@ -93,3 +93,54 @@ fn test_deprecated_cli_without_server_type() {
process.kill().unwrap();
process.wait().unwrap();
}
/// The binary must exit unsuccessfully when started with the
/// `EarlyReturnFromGrpcWorker` test action.
#[test]
fn test_early_return_from_grpc_worker() {
assert_early_exit_return_code("EarlyReturnFromGrpcWorker");
}
/// The binary must exit unsuccessfully when started with the
/// `EarlyReturnFromServerWorker` test action.
#[test]
fn test_early_return_from_server_worker() {
assert_early_exit_return_code("EarlyReturnFromServerWorker");
}
/// The binary must exit unsuccessfully when started with the
/// `PanicInGrpcWorker` test action.
#[test]
fn test_panic_in_grpc_worker() {
assert_early_exit_return_code("PanicInGrpcWorker");
}
/// The binary must exit unsuccessfully when started with the
/// `PanicInServerWorker` test action.
#[test]
fn test_panic_in_server_worker() {
assert_early_exit_return_code("PanicInServerWorker");
}
/// Starts `influxdb_iox run test --test-action <test_action>` and asserts that
/// the process exits on its own with a non-success status.
///
/// The child is bound to freshly chosen HTTP and gRPC addresses and its
/// stdout/stderr are discarded. Its exit status is polled every 10ms.
///
/// # Panics
///
/// Panics if the process cannot be spawned or polled, if it exits
/// successfully, or if it is still running after 10 seconds (in which case it
/// is killed and reaped first).
fn assert_early_exit_return_code(test_action: &str) {
    let addrs = BindAddresses::default();
    let mut process = Command::cargo_bin("influxdb_iox")
        .unwrap()
        .arg("run")
        .arg("test")
        .arg("--test-action")
        .arg(test_action)
        .env("INFLUXDB_IOX_BIND_ADDR", addrs.http_bind_addr())
        .env("INFLUXDB_IOX_GRPC_BIND_ADDR", addrs.grpc_bind_addr())
        .stdout(Stdio::null())
        .stderr(Stdio::null())
        .spawn()
        .unwrap();

    let t_start = Instant::now();
    loop {
        // Look at the exit status before the deadline so that a process which
        // exits right at the deadline is not misreported as hung.
        if let Some(status) = process.try_wait().unwrap() {
            assert!(
                !status.success(),
                "server process exited successfully for test action {}",
                test_action,
            );
            break;
        }

        if t_start.elapsed() > Duration::from_secs(10) {
            process.kill().unwrap();
            // Reap the killed child so it does not linger as a zombie.
            process.wait().unwrap();
            panic!("Server process did not exit!");
        }

        std::thread::sleep(Duration::from_millis(10));
    }
}