diff --git a/Cargo.lock b/Cargo.lock index 6345cf0326..f1c94743d9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1579,6 +1579,7 @@ dependencies = [ "tokio", "tokio-stream", "tonic", + "tonic-health", "tracing", "tracing-futures", "tracing-opentelemetry", @@ -1593,6 +1594,7 @@ dependencies = [ "arrow_deps", "data_types", "futures-util", + "generated_types", "rand 0.8.3", "reqwest", "serde", @@ -3873,6 +3875,21 @@ dependencies = [ "syn", ] +[[package]] +name = "tonic-health" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a93d6649c8f5436d65337af08887a516183a096d785ef1fc3acf69ed60dbec6b" +dependencies = [ + "async-stream", + "bytes", + "prost", + "tokio", + "tokio-stream", + "tonic", + "tonic-build", +] + [[package]] name = "tower" version = "0.4.5" diff --git a/Cargo.toml b/Cargo.toml index 804db184a7..5a050db810 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -81,6 +81,7 @@ structopt = "0.3.21" tokio = { version = "1.0", features = ["macros", "rt-multi-thread", "parking_lot"] } tokio-stream = { version = "0.1.2", features = ["net"] } tonic = "0.4.0" +tonic-health = "0.3.0" tracing = { version = "0.1", features = ["release_max_level_debug"] } tracing-futures = "0.2.4" tracing-opentelemetry = "0.11.0" diff --git a/README.md b/README.md index b14a5afabf..f4736ece48 100644 --- a/README.md +++ b/README.md @@ -196,6 +196,22 @@ all data in the `company` organization's `sensors` bucket for the `processes` me curl -v -G -d 'org=company' -d 'bucket=sensors' --data-urlencode 'sql_query=select * from processes' "http://127.0.0.1:8080/api/v2/read" ``` +### Health Checks + +The HTTP API exposes a health check endpoint at `/health`: + +```shell +$ curl http://127.0.0.1:8080/health +OK +``` + +The gRPC API implements the [gRPC Health Checking Protocol](https://github.com/grpc/grpc/blob/master/doc/health-checking.md). 
This can be tested with [grpc-health-probe](https://github.com/grpc-ecosystem/grpc-health-probe): + +```shell +$ grpc_health_probe -addr 127.0.0.1:8082 -service influxdata.platform.storage.Storage +status: SERVING +``` + ## Contributing We welcome community contributions from anyone! diff --git a/buf.yaml b/buf.yaml index 030255f9de..fabedb23ea 100644 --- a/buf.yaml +++ b/buf.yaml @@ -5,6 +5,7 @@ build: excludes: - generated_types/protos/com - generated_types/protos/influxdata/platform + - generated_types/protos/grpc lint: use: diff --git a/generated_types/build.rs b/generated_types/build.rs index b632f70123..57261e1558 100644 --- a/generated_types/build.rs +++ b/generated_types/build.rs @@ -28,6 +28,7 @@ fn generate_grpc_types(root: &Path) -> Result<()> { let storage_path = root.join("influxdata/platform/storage"); let idpe_path = root.join("com/github/influxdata/idpe/storage/read"); let management_path = root.join("influxdata/iox/management/v1"); + let grpc_path = root.join("grpc/health/v1"); let proto_files = vec![ storage_path.join("test.proto"), @@ -39,6 +40,7 @@ fn generate_grpc_types(root: &Path) -> Result<()> { management_path.join("base_types.proto"), management_path.join("database_rules.proto"), management_path.join("service.proto"), + grpc_path.join("service.proto"), ]; // Tell cargo to recompile if any of these proto files are changed diff --git a/generated_types/protos/grpc/health/v1/service.proto b/generated_types/protos/grpc/health/v1/service.proto new file mode 100644 index 0000000000..7be24c77c8 --- /dev/null +++ b/generated_types/protos/grpc/health/v1/service.proto @@ -0,0 +1,23 @@ +syntax = "proto3"; + +package grpc.health.v1; + +message HealthCheckRequest { + string service = 1; +} + +message HealthCheckResponse { + enum ServingStatus { + UNKNOWN = 0; + SERVING = 1; + NOT_SERVING = 2; + SERVICE_UNKNOWN = 3; // Used only by the Watch method. 
+ } + ServingStatus status = 1; +} + +service Health { + rpc Check(HealthCheckRequest) returns (HealthCheckResponse); + + rpc Watch(HealthCheckRequest) returns (stream HealthCheckResponse); +} diff --git a/generated_types/src/lib.rs b/generated_types/src/lib.rs index 487164cad1..0cb21817ce 100644 --- a/generated_types/src/lib.rs +++ b/generated_types/src/lib.rs @@ -52,12 +52,28 @@ mod pb { } } } + + // Needed because of https://github.com/hyperium/tonic/issues/471 + pub mod grpc { + pub mod health { + pub mod v1 { + include!(concat!(env!("OUT_DIR"), "/grpc.health.v1.rs")); + } + } + } } include!(concat!(env!("OUT_DIR"), "/wal_generated.rs")); +/// gRPC Storage Service +pub const STORAGE_SERVICE: &str = "influxdata.platform.storage.Storage"; +/// gRPC Testing Service +pub const IOX_TESTING_SERVICE: &str = "influxdata.platform.storage.IOxTesting"; +/// gRPC Arrow Flight Service +pub const ARROW_SERVICE: &str = "arrow.flight.protocol.FlightService"; + pub use pb::com::github::influxdata::idpe::storage::read::*; pub use pb::influxdata::platform::storage::*; pub use google_types as google; -pub use pb::influxdata; +pub use pb::{grpc, influxdata}; diff --git a/influxdb_iox_client/Cargo.toml b/influxdb_iox_client/Cargo.toml index ff4eaa082b..b854697287 100644 --- a/influxdb_iox_client/Cargo.toml +++ b/influxdb_iox_client/Cargo.toml @@ -5,12 +5,13 @@ authors = ["Dom Dwyer "] edition = "2018" [features] -flight = ["arrow_deps", "serde/derive", "tonic", "serde_json", "futures-util"] +flight = ["arrow_deps", "serde/derive", "serde_json", "futures-util"] [dependencies] # Workspace dependencies, in alphabetical order arrow_deps = { path = "../arrow_deps", optional = true } data_types = { path = "../data_types" } +generated_types = { path = "../generated_types" } # Crates.io dependencies, in alphabetical order futures-util = { version = "0.3.1", optional = true } @@ -19,7 +20,7 @@ serde = "1.0.118" serde_json = { version = "1.0.44", optional = true } thiserror = "1.0.23" tokio = 
{ version = "1.0", features = ["macros"] } -tonic = { version = "0.4.0", optional = true } +tonic = { version = "0.4.0" } [dev-dependencies] # In alphabetical order rand = "0.8.1" diff --git a/influxdb_iox_client/src/client.rs b/influxdb_iox_client/src/client.rs index bb4523481e..d04daad147 100644 --- a/influxdb_iox_client/src/client.rs +++ b/influxdb_iox_client/src/client.rs @@ -9,6 +9,9 @@ use data_types::{http::ListDatabasesResponse, DatabaseName}; #[cfg(feature = "flight")] mod flight; +/// Client for the gRPC health checking API +pub mod health; + // can't combine these into one statement that uses `{}` because of this bug in // the `unreachable_pub` lint: https://github.com/rust-lang/rust/issues/64762 #[cfg(feature = "flight")] diff --git a/influxdb_iox_client/src/client/health.rs b/influxdb_iox_client/src/client/health.rs new file mode 100644 index 0000000000..a376c018e4 --- /dev/null +++ b/influxdb_iox_client/src/client/health.rs @@ -0,0 +1,70 @@ +use generated_types::grpc::health::v1::*; +use thiserror::Error; + +/// Error type for the health check client +#[derive(Debug, Error)] +pub enum Error { + /// Service is not serving + #[error("Service is not serving")] + NotServing, + + /// Service returned an unexpected variant for the status enumeration + #[error("Received invalid response: {}", .0)] + InvalidResponse(i32), + + /// Error connecting to the server + #[error("Connection error: {}", .0)] + ConnectionError(#[from] tonic::transport::Error), + + /// Client received an unexpected error from the server + #[error("Unexpected server error: {}: {}", .0.code(), .0.message())] + UnexpectedError(#[from] tonic::Status), +} + +/// Result type for the health check client +pub type Result = std::result::Result; + +/// A client for the gRPC health checking API +/// +/// Allows checking the status of a given service +#[derive(Debug)] +pub struct Client { + inner: health_client::HealthClient, +} + +impl Client { + /// Create a new client with the provided endpoint + 
pub async fn connect(dst: D) -> Result + where + D: std::convert::TryInto, + D::Error: Into, + { + Ok(Self { + inner: health_client::HealthClient::connect(dst).await?, + }) + } + + /// Returns `Ok()` if the corresponding service is serving + pub async fn check(&mut self, service: impl Into) -> Result<()> { + use health_check_response::ServingStatus; + + let status = self + .inner + .check(HealthCheckRequest { + service: service.into(), + }) + .await? + .into_inner(); + + match status.status() { + ServingStatus::Serving => Ok(()), + ServingStatus::NotServing => Err(Error::NotServing), + _ => Err(Error::InvalidResponse(status.status)), + } + } + + /// Returns `Ok()` if the storage service is serving + pub async fn check_storage(&mut self) -> Result<()> { + self.check(generated_types::STORAGE_SERVICE).await + } +} diff --git a/src/influxdb_ioxd/http.rs b/src/influxdb_ioxd/http.rs index 07adc1b96b..8402fc2b1b 100644 --- a/src/influxdb_ioxd/http.rs +++ b/src/influxdb_ioxd/http.rs @@ -259,6 +259,7 @@ where })) // this endpoint is for API backward compatibility with InfluxDB 2.x .post("/api/v2/write", write::) .get("/ping", ping) + .get("/health", health) .get("/api/v2/read", read::) .get("/iox/api/v1/databases", list_databases::) .put("/iox/api/v1/databases/:name", create_database::) @@ -686,11 +687,17 @@ async fn get_writer( // Route to test that the server is alive #[tracing::instrument(level = "debug")] -async fn ping(req: Request) -> Result, ApplicationError> { +async fn ping(_: Request) -> Result, ApplicationError> { let response_body = "PONG"; Ok(Response::new(Body::from(response_body.to_string()))) } +#[tracing::instrument(level = "debug")] +async fn health(_: Request) -> Result, ApplicationError> { + let response_body = "OK"; + Ok(Response::new(Body::from(response_body.to_string()))) +} + #[derive(Deserialize, Debug)] /// Arguments in the query string of the request to /partitions struct DatabaseInfo { @@ -832,6 +839,22 @@ mod tests { Ok(()) } + #[tokio::test] + 
async fn test_health() -> Result<()> { + let test_storage = Arc::new(AppServer::new( + ConnectionManagerImpl {}, + Arc::new(ObjectStore::new_in_memory(InMemory::new())), + )); + let server_url = test_server(Arc::clone(&test_storage)); + + let client = Client::new(); + let response = client.get(&format!("{}/health", server_url)).send().await; + + // Print the response so if the test fails, we have a log of what went wrong + check_response("health", response, StatusCode::OK, "OK").await; + Ok(()) + } + #[tokio::test] async fn test_write() -> Result<()> { let test_storage = Arc::new(AppServer::new( diff --git a/src/influxdb_ioxd/rpc.rs b/src/influxdb_ioxd/rpc.rs index d67b0a0382..3f85a2a0e4 100644 --- a/src/influxdb_ioxd/rpc.rs +++ b/src/influxdb_ioxd/rpc.rs @@ -30,7 +30,22 @@ where { let stream = TcpListenerStream::new(socket); + let (mut health_reporter, health_service) = tonic_health::server::health_reporter(); + + let services = [ + generated_types::STORAGE_SERVICE, + generated_types::IOX_TESTING_SERVICE, + generated_types::ARROW_SERVICE, + ]; + + for service in &services { + health_reporter + .set_service_status(service, tonic_health::ServingStatus::Serving) + .await; + } + tonic::transport::Server::builder() + .add_service(health_service) .add_service(testing::make_server()) .add_service(storage::make_server(Arc::clone(&server))) .add_service(flight::make_server(server)) diff --git a/tests/end-to-end.rs b/tests/end-to-end.rs index e6530709b6..76f45430eb 100644 --- a/tests/end-to-end.rs +++ b/tests/end-to-end.rs @@ -377,6 +377,27 @@ impl TestServer { // different ports but both need to be up for the test to run let try_grpc_connect = async { let mut interval = tokio::time::interval(Duration::from_millis(500)); + + loop { + match influxdb_iox_client::health::Client::connect(GRPC_URL_BASE).await { + Ok(mut client) => { + println!("Successfully connected to server"); + + match client.check_storage().await { + Ok(_) => { + println!("Storage service is running"); + 
break; + } + Err(e) => println!("Error checking storage service status: {}", e), + } + } + Err(e) => { + println!("Waiting for gRPC API to be up: {}", e); + } + } + interval.tick().await; + } + loop { match StorageClient::connect(GRPC_URL_BASE).await { Ok(storage_client) => { @@ -387,7 +408,7 @@ impl TestServer { return; } Err(e) => { - println!("Waiting for gRPC server to be up: {}", e); + println!("Failed to create storage client: {}", e) } } interval.tick().await; @@ -396,7 +417,7 @@ impl TestServer { let try_http_connect = async { let client = reqwest::Client::new(); - let url = format!("{}/ping", HTTP_BASE); + let url = format!("{}/health", HTTP_BASE); let mut interval = tokio::time::interval(Duration::from_millis(500)); loop { match client.get(&url).send().await {