From 14ba02ec870e8db03e0e19189aa24a8f1352c1c5 Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Thu, 10 Jun 2021 17:52:27 +0200 Subject: [PATCH] feat: expose server and DB init errors over gRPC Closes #1624. --- src/influxdb_ioxd/rpc/management.rs | 16 +++-- tests/end_to_end_cases/management_api.rs | 77 +++++++++++++++++++++++- 2 files changed, 84 insertions(+), 9 deletions(-) diff --git a/src/influxdb_ioxd/rpc/management.rs b/src/influxdb_ioxd/rpc/management.rs index a5963e4ac3..3a648e872a 100644 --- a/src/influxdb_ioxd/rpc/management.rs +++ b/src/influxdb_ioxd/rpc/management.rs @@ -6,7 +6,7 @@ use data_types::{database_rules::DatabaseRules, server_id::ServerId, DatabaseNam use generated_types::google::{ AlreadyExists, FieldViolation, FieldViolationExt, FromFieldOpt, InternalError, NotFound, }; -use generated_types::influxdata::iox::management::v1::*; +use generated_types::influxdata::iox::management::v1::{Error as ProtobufError, *}; use observability_deps::tracing::info; use query::{Database, DatabaseStore}; use server::{ConnectionManager, Error, Server}; @@ -376,16 +376,18 @@ where &self, _request: Request, ) -> Result, Status> { - // TODO: wire up errors (https://github.com/influxdata/influxdb_iox/issues/1624) let initialized = self.server.initialized(); let database_statuses: Vec<_> = if initialized { self.server .db_names_sorted() .into_iter() - .map(|db_name| DatabaseStatus { - db_name, - error: None, + .map(|db_name| { + let error = self.server.error_database(&db_name).map(|e| ProtobufError { + message: e.to_string(), + }); + + DatabaseStatus { db_name, error } }) .collect() } else { @@ -395,7 +397,9 @@ where Ok(Response::new(GetServerStatusResponse { server_status: Some(ServerStatus { initialized, - error: None, + error: self.server.error_generic().map(|e| ProtobufError { + message: e.to_string(), + }), database_statuses, }), })) diff --git a/tests/end_to_end_cases/management_api.rs b/tests/end_to_end_cases/management_api.rs index f8689757f1..0f881d9447 100644 --- a/tests/end_to_end_cases/management_api.rs +++ b/tests/end_to_end_cases/management_api.rs @@ -1,5 +1,6 @@ -use std::collections::HashSet; +use std::{collections::HashSet, process::Command}; +use assert_cmd::prelude::CommandCargoExt; use generated_types::{ google::protobuf::{Duration, Empty}, influxdata::iox::management::v1::{database_rules::RoutingRules, *}, @@ -11,7 +12,7 @@ use test_helpers::assert_contains; use super::scenario::{ create_readable_database, create_two_partition_database, create_unreadable_database, rand_name, }; -use crate::common::server_fixture::ServerFixture; +use crate::common::server_fixture::{grpc_channel, wait_for_grpc, BindAddresses, ServerFixture}; use std::time::Instant; use tonic::Code; @@ -790,7 +791,7 @@ fn normalize_chunks(chunks: Vec) -> Vec { } #[tokio::test] -async fn test_get_server_status() { +async fn test_get_server_status_ok() { let server_fixture = ServerFixture::create_single_use().await; let mut client = server_fixture.management_client(); @@ -835,3 +836,73 @@ async fn test_get_server_status() { let names_expected: HashSet<_> = [db_name1, db_name2].iter().cloned().collect(); assert_eq!(names_actual, names_expected); } + +#[tokio::test] +async fn test_get_server_status_global_error() { + let addrs = BindAddresses::default(); + let mut process = Command::cargo_bin("influxdb_iox") + .unwrap() + .arg("run") + .env("INFLUXDB_IOX_OBJECT_STORE", "s3") + .env("AWS_ACCESS_KEY_ID", "foo") + .env("AWS_SECRET_ACCESS_KEY", "bar") + .env("INFLUXDB_IOX_BUCKET", "bucket") + .env("INFLUXDB_IOX_BIND_ADDR", addrs.http_bind_addr()) + .env("INFLUXDB_IOX_GRPC_BIND_ADDR", addrs.grpc_bind_addr()) + .spawn() + .unwrap(); + + let wait = wait_for_grpc(&addrs); + let wait = tokio::time::timeout(std::time::Duration::from_secs(3), wait); + wait.await.unwrap(); + + let channel = grpc_channel(&addrs).await.unwrap(); + let mut client = influxdb_iox_client::management::Client::new(channel); + client.update_server_id(42).await.expect("set ID failed"); + + let check = async { + let mut interval = tokio::time::interval(std::time::Duration::from_millis(500)); + + loop { + let status = client.get_server_status().await.unwrap(); + if let Some(err) = status.error { + assert!(dbg!(err.message).starts_with("store error:")); + return; + } + + interval.tick().await; + } + }; + let check = tokio::time::timeout(std::time::Duration::from_secs(10), check); + check.await.unwrap(); + + process.kill().unwrap(); +} + +#[tokio::test] +async fn test_get_server_status_db_error() { + let server_fixture = ServerFixture::create_single_use().await; + let mut client = server_fixture.management_client(); + + // create malformed DB config + let mut path = server_fixture.dir().to_path_buf(); + path.push("42"); + path.push("my_db"); + std::fs::create_dir_all(path.clone()).unwrap(); + path.push("rules.pb"); + std::fs::write(path, "foo").unwrap(); + + // initialize + client.update_server_id(42).await.expect("set ID failed"); + server_fixture.wait_server_initialized().await; + + // check for errors + let status = client.get_server_status().await.unwrap(); + assert!(status.initialized); + assert_eq!(status.error, None); + assert_eq!(status.database_statuses.len(), 1); + let db_status = &status.database_statuses[0]; + assert_eq!(db_status.db_name, "my_db"); + assert!(dbg!(&db_status.error.as_ref().unwrap().message) + .starts_with("error deserializing database rules from protobuf:")); +}