feat: Enable gRPC reflection

pull/24376/head
Marko Mikulicic 2021-05-05 21:52:55 +02:00
parent c3c80f53ce
commit 0d6d94dc00
No known key found for this signature in database
GPG Key ID: D02A41F91A687DB3
6 changed files with 50 additions and 3 deletions

16
Cargo.lock generated
View File

@ -1520,6 +1520,7 @@ dependencies = [
"tokio-util", "tokio-util",
"tonic", "tonic",
"tonic-health", "tonic-health",
"tonic-reflection",
"tracker", "tracker",
"write_buffer", "write_buffer",
] ]
@ -4038,6 +4039,21 @@ dependencies = [
"tonic-build", "tonic-build",
] ]
[[package]]
name = "tonic-reflection"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "56498d20188550337ea6c48ecf59d11646382dbfcd61051fb98c1093cfae3c21"
dependencies = [
"bytes",
"prost",
"prost-types",
"tokio",
"tokio-stream",
"tonic",
"tonic-build",
]
[[package]] [[package]]
name = "tower" name = "tower"
version = "0.4.7" version = "0.4.7"

View File

@ -97,6 +97,7 @@ tokio-stream = { version = "0.1.2", features = ["net"] }
tokio-util = { version = "0.6.3" } tokio-util = { version = "0.6.3" }
tonic = "0.4.0" tonic = "0.4.0"
tonic-health = "0.3.0" tonic-health = "0.3.0"
tonic-reflection = "0.1.0"
[dev-dependencies] [dev-dependencies]
# Workspace dependencies, in alphabetical order # Workspace dependencies, in alphabetical order

View File

@ -1,5 +1,6 @@
//! Compiles Protocol Buffers into native Rust types. //! Compiles Protocol Buffers into native Rust types.
use std::env;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
type Error = Box<dyn std::error::Error>; type Error = Box<dyn std::error::Error>;
@ -69,7 +70,11 @@ fn generate_grpc_types(root: &Path) -> Result<()> {
) )
.extern_path(".google.protobuf", "::google_types::protobuf"); .extern_path(".google.protobuf", "::google_types::protobuf");
tonic_build::configure().compile_with_config(config, &proto_files, &[root.into()])?; let descriptor_path = PathBuf::from(env::var("OUT_DIR").unwrap()).join("proto_descriptor.bin");
tonic_build::configure()
.file_descriptor_set_path(&descriptor_path)
.format(true)
.compile_with_config(config, &proto_files, &[root.into()])?;
Ok(()) Ok(())
} }

View File

@ -102,6 +102,11 @@ pub fn protobuf_type_url(protobuf_type: &str) -> String {
format!("{}/{}", ANY_TYPE_PREFIX, protobuf_type) format!("{}/{}", ANY_TYPE_PREFIX, protobuf_type)
} }
/// Protobuf file descriptor containing all generated types.
/// Useful in gRPC reflection.
pub const FILE_DESCRIPTOR_SET: &'static [u8] =
tonic::include_file_descriptor_set!("proto_descriptor");
/// Compares the protobuf type URL found within a google.protobuf.Any /// Compares the protobuf type URL found within a google.protobuf.Any
/// message to an expected Protobuf package and message name /// message to an expected Protobuf package and message name
/// ///

View File

@ -50,7 +50,7 @@ pub enum Error {
ServingHttp { source: hyper::Error }, ServingHttp { source: hyper::Error },
#[snafu(display("Error serving RPC: {}", source))] #[snafu(display("Error serving RPC: {}", source))]
ServingRpc { source: tonic::transport::Error }, ServingRpc { source: rpc::Error },
#[snafu(display( #[snafu(display(
"Specified {} for the object store, required configuration missing for {}", "Specified {} for the object store, required configuration missing for {}",

View File

@ -5,6 +5,7 @@ use tokio::net::TcpListener;
use tokio_stream::wrappers::TcpListenerStream; use tokio_stream::wrappers::TcpListenerStream;
use server::{ConnectionManager, Server}; use server::{ConnectionManager, Server};
use snafu::{ResultExt, Snafu};
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
pub mod error; pub mod error;
@ -15,6 +16,19 @@ mod storage;
mod testing; mod testing;
mod write; mod write;
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("gRPC transport error: {}", source))]
TransportError { source: tonic::transport::Error },
#[snafu(display("gRPC reflection error: {}", source))]
ReflectionError {
source: tonic_reflection::server::Error,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Instantiate a server listening on the specified address /// Instantiate a server listening on the specified address
/// implementing the IOx, Storage, and Flight gRPC interfaces, the /// implementing the IOx, Storage, and Flight gRPC interfaces, the
/// underlying hyper server instance. Resolves when the server has /// underlying hyper server instance. Resolves when the server has
@ -23,13 +37,17 @@ pub async fn serve<M>(
socket: TcpListener, socket: TcpListener,
server: Arc<Server<M>>, server: Arc<Server<M>>,
shutdown: CancellationToken, shutdown: CancellationToken,
) -> Result<(), tonic::transport::Error> ) -> Result<()>
where where
M: ConnectionManager + Send + Sync + Debug + 'static, M: ConnectionManager + Send + Sync + Debug + 'static,
{ {
let stream = TcpListenerStream::new(socket); let stream = TcpListenerStream::new(socket);
let (mut health_reporter, health_service) = tonic_health::server::health_reporter(); let (mut health_reporter, health_service) = tonic_health::server::health_reporter();
let reflection_service = tonic_reflection::server::Builder::configure()
.register_encoded_file_descriptor_set(generated_types::FILE_DESCRIPTOR_SET)
.build()
.context(ReflectionError)?;
let services = [ let services = [
generated_types::STORAGE_SERVICE, generated_types::STORAGE_SERVICE,
@ -45,6 +63,7 @@ where
tonic::transport::Server::builder() tonic::transport::Server::builder()
.add_service(health_service) .add_service(health_service)
.add_service(reflection_service)
.add_service(testing::make_server()) .add_service(testing::make_server())
.add_service(storage::make_server( .add_service(storage::make_server(
Arc::clone(&server), Arc::clone(&server),
@ -56,4 +75,5 @@ where
.add_service(operations::make_server(server)) .add_service(operations::make_server(server))
.serve_with_incoming_shutdown(stream, shutdown.cancelled()) .serve_with_incoming_shutdown(stream, shutdown.cancelled())
.await .await
.context(TransportError)
} }