Merge branch 'main' into alamb/update_deps

pull/24376/head
kodiakhq[bot] 2021-02-25 13:24:32 +00:00 committed by GitHub
commit 76921bdef9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 215 additions and 6 deletions

17
Cargo.lock generated
View File

@ -1590,6 +1590,7 @@ dependencies = [
"tokio", "tokio",
"tokio-stream", "tokio-stream",
"tonic", "tonic",
"tonic-health",
"tracing", "tracing",
"tracing-futures", "tracing-futures",
"tracing-opentelemetry", "tracing-opentelemetry",
@ -1604,6 +1605,7 @@ dependencies = [
"arrow_deps", "arrow_deps",
"data_types", "data_types",
"futures-util", "futures-util",
"generated_types",
"rand 0.8.3", "rand 0.8.3",
"reqwest", "reqwest",
"serde", "serde",
@ -3907,6 +3909,21 @@ dependencies = [
"syn", "syn",
] ]
[[package]]
name = "tonic-health"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a93d6649c8f5436d65337af08887a516183a096d785ef1fc3acf69ed60dbec6b"
dependencies = [
"async-stream",
"bytes",
"prost",
"tokio",
"tokio-stream",
"tonic",
"tonic-build",
]
[[package]] [[package]]
name = "tower" name = "tower"
version = "0.4.5" version = "0.4.5"

View File

@ -81,6 +81,7 @@ structopt = "0.3.21"
tokio = { version = "1.0", features = ["macros", "rt-multi-thread", "parking_lot"] } tokio = { version = "1.0", features = ["macros", "rt-multi-thread", "parking_lot"] }
tokio-stream = { version = "0.1.2", features = ["net"] } tokio-stream = { version = "0.1.2", features = ["net"] }
tonic = "0.4.0" tonic = "0.4.0"
tonic-health = "0.3.0"
tracing = { version = "0.1", features = ["release_max_level_debug"] } tracing = { version = "0.1", features = ["release_max_level_debug"] }
tracing-futures = "0.2.4" tracing-futures = "0.2.4"
tracing-opentelemetry = "0.11.0" tracing-opentelemetry = "0.11.0"

View File

@ -196,6 +196,22 @@ all data in the `company` organization's `sensors` bucket for the `processes` me
curl -v -G -d 'org=company' -d 'bucket=sensors' --data-urlencode 'sql_query=select * from processes' "http://127.0.0.1:8080/api/v2/read" curl -v -G -d 'org=company' -d 'bucket=sensors' --data-urlencode 'sql_query=select * from processes' "http://127.0.0.1:8080/api/v2/read"
``` ```
### Health Checks
The HTTP API exposes a healthcheck endpoint at `/health`
```shell
$ curl http://127.0.0.1:8080/health
OK
```
The gRPC API implements the [gRPC Health Checking Protocol](https://github.com/grpc/grpc/blob/master/doc/health-checking.md). This can be tested with [grpc-health-probe](https://github.com/grpc-ecosystem/grpc-health-probe)
```shell
$ grpc_health_probe -addr 127.0.0.1:8082 -service influxdata.platform.storage.Storage
status: SERVING
```
## Contributing ## Contributing
We welcome community contributions from anyone! We welcome community contributions from anyone!

View File

@ -5,6 +5,7 @@ build:
excludes: excludes:
- generated_types/protos/com - generated_types/protos/com
- generated_types/protos/influxdata/platform - generated_types/protos/influxdata/platform
- generated_types/protos/grpc
lint: lint:
use: use:

View File

@ -28,6 +28,7 @@ fn generate_grpc_types(root: &Path) -> Result<()> {
let storage_path = root.join("influxdata/platform/storage"); let storage_path = root.join("influxdata/platform/storage");
let idpe_path = root.join("com/github/influxdata/idpe/storage/read"); let idpe_path = root.join("com/github/influxdata/idpe/storage/read");
let management_path = root.join("influxdata/iox/management/v1"); let management_path = root.join("influxdata/iox/management/v1");
let grpc_path = root.join("grpc/health/v1");
let proto_files = vec![ let proto_files = vec![
storage_path.join("test.proto"), storage_path.join("test.proto"),
@ -39,6 +40,7 @@ fn generate_grpc_types(root: &Path) -> Result<()> {
management_path.join("base_types.proto"), management_path.join("base_types.proto"),
management_path.join("database_rules.proto"), management_path.join("database_rules.proto"),
management_path.join("service.proto"), management_path.join("service.proto"),
grpc_path.join("service.proto"),
]; ];
// Tell cargo to recompile if any of these proto files are changed // Tell cargo to recompile if any of these proto files are changed

View File

@ -0,0 +1,23 @@
syntax = "proto3";
package grpc.health.v1;
message HealthCheckRequest {
string service = 1;
}
message HealthCheckResponse {
enum ServingStatus {
UNKNOWN = 0;
SERVING = 1;
NOT_SERVING = 2;
SERVICE_UNKNOWN = 3; // Used only by the Watch method.
}
ServingStatus status = 1;
}
service Health {
rpc Check(HealthCheckRequest) returns (HealthCheckResponse);
rpc Watch(HealthCheckRequest) returns (stream HealthCheckResponse);
}

View File

@ -52,12 +52,28 @@ mod pb {
} }
} }
} }
// Needed because of https://github.com/hyperium/tonic/issues/471
pub mod grpc {
pub mod health {
pub mod v1 {
include!(concat!(env!("OUT_DIR"), "/grpc.health.v1.rs"));
}
}
}
} }
include!(concat!(env!("OUT_DIR"), "/wal_generated.rs")); include!(concat!(env!("OUT_DIR"), "/wal_generated.rs"));
/// gRPC Storage Service
pub const STORAGE_SERVICE: &str = "influxdata.platform.storage.Storage";
/// gRPC Testing Service
pub const IOX_TESTING_SERVICE: &str = "influxdata.platform.storage.IOxTesting";
/// gRPC Arrow Flight Service
pub const ARROW_SERVICE: &str = "arrow.flight.protocol.FlightService";
pub use pb::com::github::influxdata::idpe::storage::read::*; pub use pb::com::github::influxdata::idpe::storage::read::*;
pub use pb::influxdata::platform::storage::*; pub use pb::influxdata::platform::storage::*;
pub use google_types as google; pub use google_types as google;
pub use pb::influxdata; pub use pb::{grpc, influxdata};

View File

@ -5,12 +5,13 @@ authors = ["Dom Dwyer <dom@itsallbroken.com>"]
edition = "2018" edition = "2018"
[features] [features]
flight = ["arrow_deps", "serde/derive", "tonic", "serde_json", "futures-util"] flight = ["arrow_deps", "serde/derive", "serde_json", "futures-util"]
[dependencies] [dependencies]
# Workspace dependencies, in alphabetical order # Workspace dependencies, in alphabetical order
arrow_deps = { path = "../arrow_deps", optional = true } arrow_deps = { path = "../arrow_deps", optional = true }
data_types = { path = "../data_types" } data_types = { path = "../data_types" }
generated_types = { path = "../generated_types" }
# Crates.io dependencies, in alphabetical order # Crates.io dependencies, in alphabetical order
futures-util = { version = "0.3.1", optional = true } futures-util = { version = "0.3.1", optional = true }
@ -19,7 +20,7 @@ serde = "1.0.118"
serde_json = { version = "1.0.44", optional = true } serde_json = { version = "1.0.44", optional = true }
thiserror = "1.0.23" thiserror = "1.0.23"
tokio = { version = "1.0", features = ["macros"] } tokio = { version = "1.0", features = ["macros"] }
tonic = { version = "0.4.0", optional = true } tonic = { version = "0.4.0" }
[dev-dependencies] # In alphabetical order [dev-dependencies] # In alphabetical order
rand = "0.8.1" rand = "0.8.1"

View File

@ -9,6 +9,9 @@ use data_types::{http::ListDatabasesResponse, DatabaseName};
#[cfg(feature = "flight")] #[cfg(feature = "flight")]
mod flight; mod flight;
/// Client for the gRPC health checking API
pub mod health;
// can't combine these into one statement that uses `{}` because of this bug in // can't combine these into one statement that uses `{}` because of this bug in
// the `unreachable_pub` lint: https://github.com/rust-lang/rust/issues/64762 // the `unreachable_pub` lint: https://github.com/rust-lang/rust/issues/64762
#[cfg(feature = "flight")] #[cfg(feature = "flight")]

View File

@ -0,0 +1,70 @@
use generated_types::grpc::health::v1::*;
use thiserror::Error;
/// Error type for the health check client
#[derive(Debug, Error)]
pub enum Error {
/// Service is not serving
#[error("Service is not serving")]
NotServing,
/// Service returned an unexpected variant for the status enumeration
#[error("Received invalid response: {}", .0)]
InvalidResponse(i32),
/// Error connecting to the server
#[error("Connection error: {}", .0)]
ConnectionError(#[from] tonic::transport::Error),
/// Client received an unexpected error from the server
#[error("Unexpected server error: {}: {}", .0.code(), .0.message())]
UnexpectedError(#[from] tonic::Status),
}
/// Result type for the health check client
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// A client for the gRPC health checking API
///
/// Allows checking the status of a given service
#[derive(Debug)]
pub struct Client {
inner: health_client::HealthClient<tonic::transport::Channel>,
}
impl Client {
/// Create a new client with the provided endpoint
pub async fn connect<D>(dst: D) -> Result<Self>
where
D: std::convert::TryInto<tonic::transport::Endpoint>,
D::Error: Into<tonic::codegen::StdError>,
{
Ok(Self {
inner: health_client::HealthClient::connect(dst).await?,
})
}
/// Returns `Ok()` if the corresponding service is serving
pub async fn check(&mut self, service: impl Into<String>) -> Result<()> {
use health_check_response::ServingStatus;
let status = self
.inner
.check(HealthCheckRequest {
service: service.into(),
})
.await?
.into_inner();
match status.status() {
ServingStatus::Serving => Ok(()),
ServingStatus::NotServing => Err(Error::NotServing),
_ => Err(Error::InvalidResponse(status.status)),
}
}
/// Returns `Ok()` if the storage service is serving
pub async fn check_storage(&mut self) -> Result<()> {
self.check(generated_types::STORAGE_SERVICE).await
}
}

View File

@ -259,6 +259,7 @@ where
})) // this endpoint is for API backward compatibility with InfluxDB 2.x })) // this endpoint is for API backward compatibility with InfluxDB 2.x
.post("/api/v2/write", write::<M>) .post("/api/v2/write", write::<M>)
.get("/ping", ping) .get("/ping", ping)
.get("/health", health)
.get("/api/v2/read", read::<M>) .get("/api/v2/read", read::<M>)
.get("/iox/api/v1/databases", list_databases::<M>) .get("/iox/api/v1/databases", list_databases::<M>)
.put("/iox/api/v1/databases/:name", create_database::<M>) .put("/iox/api/v1/databases/:name", create_database::<M>)
@ -686,11 +687,17 @@ async fn get_writer<M: ConnectionManager + Send + Sync + Debug + 'static>(
// Route to test that the server is alive // Route to test that the server is alive
#[tracing::instrument(level = "debug")] #[tracing::instrument(level = "debug")]
async fn ping(req: Request<Body>) -> Result<Response<Body>, ApplicationError> { async fn ping(_: Request<Body>) -> Result<Response<Body>, ApplicationError> {
let response_body = "PONG"; let response_body = "PONG";
Ok(Response::new(Body::from(response_body.to_string()))) Ok(Response::new(Body::from(response_body.to_string())))
} }
#[tracing::instrument(level = "debug")]
async fn health(_: Request<Body>) -> Result<Response<Body>, ApplicationError> {
let response_body = "OK";
Ok(Response::new(Body::from(response_body.to_string())))
}
#[derive(Deserialize, Debug)] #[derive(Deserialize, Debug)]
/// Arguments in the query string of the request to /partitions /// Arguments in the query string of the request to /partitions
struct DatabaseInfo { struct DatabaseInfo {
@ -832,6 +839,22 @@ mod tests {
Ok(()) Ok(())
} }
#[tokio::test]
async fn test_health() -> Result<()> {
let test_storage = Arc::new(AppServer::new(
ConnectionManagerImpl {},
Arc::new(ObjectStore::new_in_memory(InMemory::new())),
));
let server_url = test_server(Arc::clone(&test_storage));
let client = Client::new();
let response = client.get(&format!("{}/health", server_url)).send().await;
// Print the response so if the test fails, we have a log of what went wrong
check_response("health", response, StatusCode::OK, "OK").await;
Ok(())
}
#[tokio::test] #[tokio::test]
async fn test_write() -> Result<()> { async fn test_write() -> Result<()> {
let test_storage = Arc::new(AppServer::new( let test_storage = Arc::new(AppServer::new(

View File

@ -30,7 +30,22 @@ where
{ {
let stream = TcpListenerStream::new(socket); let stream = TcpListenerStream::new(socket);
let (mut health_reporter, health_service) = tonic_health::server::health_reporter();
let services = [
generated_types::STORAGE_SERVICE,
generated_types::IOX_TESTING_SERVICE,
generated_types::ARROW_SERVICE,
];
for service in &services {
health_reporter
.set_service_status(service, tonic_health::ServingStatus::Serving)
.await;
}
tonic::transport::Server::builder() tonic::transport::Server::builder()
.add_service(health_service)
.add_service(testing::make_server()) .add_service(testing::make_server())
.add_service(storage::make_server(Arc::clone(&server))) .add_service(storage::make_server(Arc::clone(&server)))
.add_service(flight::make_server(server)) .add_service(flight::make_server(server))

View File

@ -377,6 +377,27 @@ impl TestServer {
// different ports but both need to be up for the test to run // different ports but both need to be up for the test to run
let try_grpc_connect = async { let try_grpc_connect = async {
let mut interval = tokio::time::interval(Duration::from_millis(500)); let mut interval = tokio::time::interval(Duration::from_millis(500));
loop {
match influxdb_iox_client::health::Client::connect(GRPC_URL_BASE).await {
Ok(mut client) => {
println!("Successfully connected to server");
match client.check_storage().await {
Ok(_) => {
println!("Storage service is running");
break;
}
Err(e) => println!("Error checking storage service status: {}", e),
}
}
Err(e) => {
println!("Waiting for gRPC API to be up: {}", e);
}
}
interval.tick().await;
}
loop { loop {
match StorageClient::connect(GRPC_URL_BASE).await { match StorageClient::connect(GRPC_URL_BASE).await {
Ok(storage_client) => { Ok(storage_client) => {
@ -387,7 +408,7 @@ impl TestServer {
return; return;
} }
Err(e) => { Err(e) => {
println!("Waiting for gRPC server to be up: {}", e); println!("Failed to create storage client: {}", e)
} }
} }
interval.tick().await; interval.tick().await;
@ -396,7 +417,7 @@ impl TestServer {
let try_http_connect = async { let try_http_connect = async {
let client = reqwest::Client::new(); let client = reqwest::Client::new();
let url = format!("{}/ping", HTTP_BASE); let url = format!("{}/health", HTTP_BASE);
let mut interval = tokio::time::interval(Duration::from_millis(500)); let mut interval = tokio::time::interval(Duration::from_millis(500));
loop { loop {
match client.get(&url).send().await { match client.get(&url).send().await {