refactor: Move gRPC make_server up a module

pull/24376/head
Carol (Nichols || Goulding) 2021-02-01 11:43:00 -05:00
parent 8975b1cbf5
commit 9ff2ee99d8
3 changed files with 55 additions and 40 deletions

View File

@ -66,7 +66,7 @@ pub enum Error {
ServingHttp { source: hyper::Error },
#[snafu(display("Error serving RPC: {}", source))]
ServingRPC { source: self::rpc::service::Error },
ServingRPC { source: self::rpc::Error },
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@ -134,7 +134,7 @@ pub async fn main(logging_level: LoggingLevel, config: Option<Config>) -> Result
.await
.context(StartListeningGrpc { grpc_bind_addr })?;
let grpc_server = self::rpc::service::make_server(socket, app_server.clone());
let grpc_server = self::rpc::make_server(socket, app_server.clone());
info!(bind_address=?grpc_bind_addr, "gRPC server listening");

View File

@ -13,3 +13,44 @@ pub mod expr;
pub mod id;
pub mod input;
pub mod service;
use arrow_deps::arrow_flight::flight_service_server::FlightServiceServer;
use data_types::error::ErrorLogger;
use generated_types::{i_ox_testing_server::IOxTestingServer, storage_server::StorageServer};
use query::DatabaseStore;
use snafu::{ResultExt, Snafu};
use std::sync::Arc;
use tokio::net::TcpListener;
use tokio_stream::wrappers::TcpListenerStream;
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("gRPC server error: {}", source))]
ServerError { source: tonic::transport::Error },
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Instantiate a server listening on the specified address
/// implementing the IOx, Storage, and Flight gRPC interfaces, the
/// underlying hyper server instance. Resolves when the server has
/// shutdown.
pub async fn make_server<T>(socket: TcpListener, storage: Arc<T>) -> Result<()>
where
T: DatabaseStore + 'static,
{
let stream = TcpListenerStream::new(socket);
tonic::transport::Server::builder()
.add_service(IOxTestingServer::new(service::GrpcService::new(
storage.clone(),
)))
.add_service(StorageServer::new(service::GrpcService::new(
storage.clone(),
)))
.add_service(FlightServiceServer::new(service::GrpcService::new(storage)))
.serve_with_incoming(stream)
.await
.context(ServerError {})
.log_if_error("Running Tonic Server")
}

View File

@ -13,10 +13,9 @@ use super::{
use arrow_deps::{
arrow,
arrow_flight::{
self,
flight_service_server::{FlightService, FlightServiceServer},
Action, ActionType, Criteria, Empty, FlightData, FlightDescriptor, FlightInfo,
HandshakeRequest, HandshakeResponse, PutResult, SchemaResult, Ticket,
self, flight_service_server::FlightService, Action, ActionType, Criteria, Empty,
FlightData, FlightDescriptor, FlightInfo, HandshakeRequest, HandshakeResponse, PutResult,
SchemaResult, Ticket,
},
datafusion::physical_plan::collect,
};
@ -27,13 +26,12 @@ use data_types::{
};
use futures::Stream;
use generated_types::{
i_ox_testing_server::{IOxTesting, IOxTestingServer},
storage_server::{Storage, StorageServer},
CapabilitiesResponse, Capability, Int64ValuesResponse, MeasurementFieldsRequest,
MeasurementFieldsResponse, MeasurementNamesRequest, MeasurementTagKeysRequest,
MeasurementTagValuesRequest, Predicate, ReadFilterRequest, ReadGroupRequest, ReadResponse,
ReadSeriesCardinalityRequest, ReadWindowAggregateRequest, StringValuesResponse, TagKeysRequest,
TagValuesRequest, TestErrorRequest, TestErrorResponse, TimestampRange,
i_ox_testing_server::IOxTesting, storage_server::Storage, CapabilitiesResponse, Capability,
Int64ValuesResponse, MeasurementFieldsRequest, MeasurementFieldsResponse,
MeasurementNamesRequest, MeasurementTagKeysRequest, MeasurementTagValuesRequest, Predicate,
ReadFilterRequest, ReadGroupRequest, ReadResponse, ReadSeriesCardinalityRequest,
ReadWindowAggregateRequest, StringValuesResponse, TagKeysRequest, TagValuesRequest,
TestErrorRequest, TestErrorResponse, TimestampRange,
};
use query::{
exec::fieldlist::FieldList,
@ -46,16 +44,13 @@ use query::{
use serde::Deserialize;
use snafu::{OptionExt, ResultExt, Snafu};
use std::{collections::HashMap, pin::Pin, sync::Arc};
use tokio::{net::TcpListener, sync::mpsc};
use tokio_stream::wrappers::{ReceiverStream, TcpListenerStream};
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tonic::{Request, Response, Status, Streaming};
use tracing::{error, info, warn};
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("gRPC server error: {}", source))]
ServerError { source: tonic::transport::Error },
#[snafu(display("Database not found: {}", db_name))]
DatabaseNotFound { db_name: String },
@ -215,7 +210,6 @@ impl Error {
/// status
fn to_status(&self) -> tonic::Status {
match &self {
Self::ServerError { .. } => Status::internal(self.to_string()),
Self::DatabaseNotFound { .. } => Status::not_found(self.to_string()),
Self::ListingTables { .. } => Status::internal(self.to_string()),
Self::ListingColumns { .. } => {
@ -1316,26 +1310,6 @@ where
Ok(field_list)
}
/// Instantiate a server listening on the specified address
/// implementing the IOx and Storage gRPC interfaces, the
/// underlying hyper server instance. Resolves when the server has
/// shutdown.
pub async fn make_server<T>(socket: TcpListener, storage: Arc<T>) -> Result<()>
where
T: DatabaseStore + 'static,
{
let stream = TcpListenerStream::new(socket);
tonic::transport::Server::builder()
.add_service(IOxTestingServer::new(GrpcService::new(storage.clone())))
.add_service(StorageServer::new(GrpcService::new(storage.clone())))
.add_service(FlightServiceServer::new(GrpcService::new(storage)))
.serve_with_incoming(stream)
.await
.context(ServerError {})
.log_if_error("Running Tonic Server")
}
#[cfg(test)]
mod tests {
use super::super::id::ID;
@ -2789,7 +2763,7 @@ mod tests {
println!("Starting InfluxDB IOx rpc test server on {:?}", bind_addr);
let server = make_server(socket, test_storage.clone());
let server = super::super::make_server(socket, test_storage.clone());
tokio::task::spawn(server);
let iox_client = connect_to_server::<IOxTestingClient>(bind_addr)