feat: Log what kind of server in startup/shutdown logs

pull/24376/head
Carol (Nichols || Goulding) 2022-04-27 12:01:52 -04:00
parent 84683056db
commit df1afa3481
No known key found for this signature in database
GPG Key ID: E907EE5A736F87D4
6 changed files with 46 additions and 21 deletions

View File

@ -132,16 +132,20 @@ pub async fn main(
server_type,
} = service;
info!(?grpc_bind_address, "Binding gRPC services");
info!(?grpc_bind_address, ?server_type, "Binding gRPC services");
let grpc_listener = grpc_listener(grpc_bind_address.into()).await?;
let http_listener = match http_bind_address {
Some(http_bind_address) => {
info!(?http_bind_address, "Completed bind of gRPC, binding http");
info!(
?http_bind_address,
?server_type,
"Completed bind of gRPC, binding http"
);
Some(http_listener(http_bind_address.into()).await?)
}
None => {
info!("No http server specified");
info!(?server_type, "No http server specified");
None
}
};
@ -151,13 +155,14 @@ pub async fn main(
frontend_shutdown,
grpc_listener,
http_listener,
server_type,
Arc::clone(&server_type),
)
.await;
info!(
?grpc_bind_address,
?http_bind_address,
?server_type,
"done serving, draining futures"
);
if let Some(trace_exporter) = trace_exporter {

View File

@ -119,12 +119,13 @@ pub async fn serve(
frontend_shutdown.clone(),
)
.fuse();
info!("gRPC server listening");
info!(?server_type, "gRPC server listening");
let captured_server_type = Arc::clone(&server_type);
let captured_shutdown = frontend_shutdown.clone();
let http_server = async move {
if let Some(http_listener) = http_listener {
info!(server_type=?captured_server_type, "HTTP server listening");
http::serve(
http_listener,
captured_server_type,
@ -139,10 +140,9 @@ pub async fn serve(
Ok(())
}
.fuse();
info!("HTTP server listening");
// Purposefully use log not tokio-tracing to ensure correctly hooked up
log::info!("InfluxDB IOx server ready");
log::info!("InfluxDB IOx {:?} server ready", server_type);
// Get IOx background worker join handle
let server_handle = Arc::clone(&server_type).join().fuse();
@ -192,30 +192,30 @@ pub async fn serve(
// registry, don't exit before HTTP and gRPC requests dependent on them
while !grpc_server.is_terminated() && !http_server.is_terminated() {
futures::select! {
_ = signal => info!("Shutdown requested"),
_ = signal => info!(?server_type, "Shutdown requested"),
_ = server_handle => {
error!("server worker shutdown prematurely");
error!(?server_type, "server worker shutdown prematurely");
res = res.and(Err(Error::LostServer));
},
result = grpc_server => match result {
Ok(_) if frontend_shutdown.is_cancelled() => info!("gRPC server shutdown"),
Ok(_) if frontend_shutdown.is_cancelled() => info!(?server_type, "gRPC server shutdown"),
Ok(_) => {
error!("Early gRPC server exit");
error!(?server_type, "Early gRPC server exit");
res = res.and(Err(Error::LostRpc));
}
Err(error) => {
error!(%error, "gRPC server error");
error!(%error, ?server_type, "gRPC server error");
res = res.and(Err(Error::ServingRpc{source: error}));
}
},
result = http_server => match result {
Ok(_) if frontend_shutdown.is_cancelled() => info!("HTTP server shutdown"),
Ok(_) if frontend_shutdown.is_cancelled() => info!(?server_type, "HTTP server shutdown"),
Ok(_) => {
error!("Early HTTP server exit");
error!(?server_type, "Early HTTP server exit");
res = res.and(Err(Error::LostHttp));
}
Err(error) => {
error!(%error, "HTTP server error");
error!(%error, ?server_type, "HTTP server error");
res = res.and(Err(Error::ServingHttp{source: error}));
}
},
@ -223,13 +223,13 @@ pub async fn serve(
frontend_shutdown.cancel()
}
info!("frontend shutdown completed");
info!(?server_type, "frontend shutdown completed");
server_type.shutdown();
if !server_handle.is_terminated() {
server_handle.await;
}
info!("backend shutdown completed");
info!(?server_type, "backend shutdown completed");
res
}

View File

@ -42,12 +42,17 @@ pub enum Error {
pub type Result<T, E = Error> = std::result::Result<T, E>;
#[derive(Debug)]
pub struct CompactorServerType<C: CompactorHandler> {
server: CompactorServer<C>,
trace_collector: Option<Arc<dyn TraceCollector>>,
}
impl<C: CompactorHandler> std::fmt::Debug for CompactorServerType<C> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Compactor")
}
}
impl<C: CompactorHandler> CompactorServerType<C> {
pub fn new(server: CompactorServer<C>, common_state: &CommonServerState) -> Self {
Self {

View File

@ -51,12 +51,17 @@ pub enum Error {
pub type Result<T, E = Error> = std::result::Result<T, E>;
#[derive(Debug)]
pub struct IngesterServerType<I: IngestHandler> {
server: IngesterServer<I>,
trace_collector: Option<Arc<dyn TraceCollector>>,
}
impl<I: IngestHandler> std::fmt::Debug for IngesterServerType<I> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Ingester")
}
}
impl<I: IngestHandler> IngesterServerType<I> {
pub fn new(server: IngesterServer<I>, common_state: &CommonServerState) -> Self {
Self {

View File

@ -27,13 +27,18 @@ use ioxd_common::{
mod rpc;
#[derive(Debug)]
pub struct QuerierServerType<C: QuerierHandler> {
database: Arc<QuerierDatabase>,
server: QuerierServer<C>,
trace_collector: Option<Arc<dyn TraceCollector>>,
}
impl<C: QuerierHandler> std::fmt::Debug for QuerierServerType<C> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Querier")
}
}
impl<C: QuerierHandler> QuerierServerType<C> {
pub fn new(
server: QuerierServer<C>,

View File

@ -53,7 +53,6 @@ pub enum Error {
pub type Result<T, E = Error> = std::result::Result<T, E>;
#[derive(Debug)]
pub struct RouterServerType<D> {
server: RouterServer<D>,
shutdown: CancellationToken,
@ -70,6 +69,12 @@ impl<D> RouterServerType<D> {
}
}
impl<D> std::fmt::Debug for RouterServerType<D> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Router2")
}
}
#[async_trait]
impl<D> ServerType for RouterServerType<D>
where