parent
a449d5ef74
commit
14ba02ec87
|
@ -6,7 +6,7 @@ use data_types::{database_rules::DatabaseRules, server_id::ServerId, DatabaseNam
|
|||
use generated_types::google::{
|
||||
AlreadyExists, FieldViolation, FieldViolationExt, FromFieldOpt, InternalError, NotFound,
|
||||
};
|
||||
use generated_types::influxdata::iox::management::v1::*;
|
||||
use generated_types::influxdata::iox::management::v1::{Error as ProtobufError, *};
|
||||
use observability_deps::tracing::info;
|
||||
use query::{Database, DatabaseStore};
|
||||
use server::{ConnectionManager, Error, Server};
|
||||
|
@ -376,16 +376,18 @@ where
|
|||
&self,
|
||||
_request: Request<GetServerStatusRequest>,
|
||||
) -> Result<Response<GetServerStatusResponse>, Status> {
|
||||
// TODO: wire up errors (https://github.com/influxdata/influxdb_iox/issues/1624)
|
||||
let initialized = self.server.initialized();
|
||||
|
||||
let database_statuses: Vec<_> = if initialized {
|
||||
self.server
|
||||
.db_names_sorted()
|
||||
.into_iter()
|
||||
.map(|db_name| DatabaseStatus {
|
||||
db_name,
|
||||
error: None,
|
||||
.map(|db_name| {
|
||||
let error = self.server.error_database(&db_name).map(|e| ProtobufError {
|
||||
message: e.to_string(),
|
||||
});
|
||||
|
||||
DatabaseStatus { db_name, error }
|
||||
})
|
||||
.collect()
|
||||
} else {
|
||||
|
@ -395,7 +397,9 @@ where
|
|||
Ok(Response::new(GetServerStatusResponse {
|
||||
server_status: Some(ServerStatus {
|
||||
initialized,
|
||||
error: None,
|
||||
error: self.server.error_generic().map(|e| ProtobufError {
|
||||
message: e.to_string(),
|
||||
}),
|
||||
database_statuses,
|
||||
}),
|
||||
}))
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
use std::collections::HashSet;
|
||||
use std::{collections::HashSet, process::Command};
|
||||
|
||||
use assert_cmd::prelude::CommandCargoExt;
|
||||
use generated_types::{
|
||||
google::protobuf::{Duration, Empty},
|
||||
influxdata::iox::management::v1::{database_rules::RoutingRules, *},
|
||||
|
@ -11,7 +12,7 @@ use test_helpers::assert_contains;
|
|||
use super::scenario::{
|
||||
create_readable_database, create_two_partition_database, create_unreadable_database, rand_name,
|
||||
};
|
||||
use crate::common::server_fixture::ServerFixture;
|
||||
use crate::common::server_fixture::{grpc_channel, wait_for_grpc, BindAddresses, ServerFixture};
|
||||
use std::time::Instant;
|
||||
use tonic::Code;
|
||||
|
||||
|
@ -790,7 +791,7 @@ fn normalize_chunks(chunks: Vec<Chunk>) -> Vec<Chunk> {
|
|||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_get_server_status() {
|
||||
async fn test_get_server_status_ok() {
|
||||
let server_fixture = ServerFixture::create_single_use().await;
|
||||
let mut client = server_fixture.management_client();
|
||||
|
||||
|
@ -835,3 +836,73 @@ async fn test_get_server_status() {
|
|||
let names_expected: HashSet<_> = [db_name1, db_name2].iter().cloned().collect();
|
||||
assert_eq!(names_actual, names_expected);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_get_server_status_global_error() {
|
||||
let addrs = BindAddresses::default();
|
||||
let mut process = Command::cargo_bin("influxdb_iox")
|
||||
.unwrap()
|
||||
.arg("run")
|
||||
.env("INFLUXDB_IOX_OBJECT_STORE", "s3")
|
||||
.env("AWS_ACCESS_KEY_ID", "foo")
|
||||
.env("AWS_SECRET_ACCESS_KEY", "bar")
|
||||
.env("INFLUXDB_IOX_BUCKET", "bucket")
|
||||
.env("INFLUXDB_IOX_BIND_ADDR", addrs.http_bind_addr())
|
||||
.env("INFLUXDB_IOX_GRPC_BIND_ADDR", addrs.grpc_bind_addr())
|
||||
.spawn()
|
||||
.unwrap();
|
||||
|
||||
let wait = wait_for_grpc(&addrs);
|
||||
let wait = tokio::time::timeout(std::time::Duration::from_secs(3), wait);
|
||||
wait.await.unwrap();
|
||||
|
||||
let channel = grpc_channel(&addrs).await.unwrap();
|
||||
let mut client = influxdb_iox_client::management::Client::new(channel);
|
||||
client.update_server_id(42).await.expect("set ID failed");
|
||||
|
||||
let check = async {
|
||||
let mut interval = tokio::time::interval(std::time::Duration::from_millis(500));
|
||||
|
||||
loop {
|
||||
let status = client.get_server_status().await.unwrap();
|
||||
if let Some(err) = status.error {
|
||||
assert!(dbg!(err.message).starts_with("store error:"));
|
||||
return;
|
||||
}
|
||||
|
||||
interval.tick().await;
|
||||
}
|
||||
};
|
||||
let check = tokio::time::timeout(std::time::Duration::from_secs(10), check);
|
||||
check.await.unwrap();
|
||||
|
||||
process.kill().unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_get_server_status_db_error() {
|
||||
let server_fixture = ServerFixture::create_single_use().await;
|
||||
let mut client = server_fixture.management_client();
|
||||
|
||||
// create malformed DB config
|
||||
let mut path = server_fixture.dir().to_path_buf();
|
||||
path.push("42");
|
||||
path.push("my_db");
|
||||
std::fs::create_dir_all(path.clone()).unwrap();
|
||||
path.push("rules.pb");
|
||||
std::fs::write(path, "foo").unwrap();
|
||||
|
||||
// initialize
|
||||
client.update_server_id(42).await.expect("set ID failed");
|
||||
server_fixture.wait_server_initialized().await;
|
||||
|
||||
// check for errors
|
||||
let status = client.get_server_status().await.unwrap();
|
||||
assert!(status.initialized);
|
||||
assert_eq!(status.error, None);
|
||||
assert_eq!(status.database_statuses.len(), 1);
|
||||
let db_status = &status.database_statuses[0];
|
||||
assert_eq!(db_status.db_name, "my_db");
|
||||
assert!(dbg!(&db_status.error.as_ref().unwrap().message)
|
||||
.starts_with("error deserializing database rules from protobuf:"));
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue