fix: include git sha (again) in release build (#4193)
* fix: error if git-sha can not be found * refactor: move main to influxdb_iox * fix: fmtpull/24376/head
parent
532d227d11
commit
d37af1a7f5
|
@ -2519,7 +2519,6 @@ dependencies = [
|
|||
"futures",
|
||||
"generated_types",
|
||||
"hashbrown 0.12.0",
|
||||
"heappy",
|
||||
"http",
|
||||
"hyper",
|
||||
"ingester",
|
||||
|
@ -2546,8 +2545,6 @@ dependencies = [
|
|||
"tempfile",
|
||||
"test_helpers",
|
||||
"thiserror",
|
||||
"tikv-jemalloc-ctl",
|
||||
"tikv-jemalloc-sys",
|
||||
"time 0.1.0",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
|
|
|
@ -140,7 +140,7 @@ aws = ["object_store/aws"] # Optional AWS / S3 object store support
|
|||
# runtime overhead on all allocations (calls to malloc).
|
||||
# Cargo cannot currently implement mutually exclusive features so let's force every build
|
||||
# to pick either heappy or jemalloc_replacing_malloc feature at least until we figure out something better.
|
||||
jemalloc_replacing_malloc = ["tikv-jemalloc-sys", "tikv-jemalloc-ctl", "ioxd/jemalloc_replacing_malloc"]
|
||||
jemalloc_replacing_malloc = ["tikv-jemalloc-sys", "tikv-jemalloc-ctl"]
|
||||
|
||||
# Implicit feature selected when running under `clippy --all-features` to accept mutable exclusive features during
|
||||
# linting
|
||||
|
|
|
@ -1,14 +1,29 @@
|
|||
// Include the GIT_HASH, if any, in `GIT_HASH` environment variable at build
|
||||
// time
|
||||
// Include the GIT_HASH, in `GIT_HASH` environment variable at build
|
||||
// time, panic'ing if it can not be found
|
||||
//
|
||||
// https://stackoverflow.com/questions/43753491/include-git-commit-hash-as-string-into-rust-program
|
||||
use std::process::Command;
|
||||
fn main() {
|
||||
let output = Command::new("git").args(&["rev-parse", "HEAD"]).output();
|
||||
|
||||
if let Ok(output) = output {
|
||||
if let Ok(git_hash) = String::from_utf8(output.stdout) {
|
||||
println!("cargo:rustc-env=GIT_HASH={}", git_hash);
|
||||
}
|
||||
}
|
||||
fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
// Populate env!(GIT_HASH) with the current git commit
|
||||
println!("cargo:rustc-env=GIT_HASH={}", get_git_hash());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn get_git_hash() -> String {
|
||||
let out = match std::env::var("VERSION_HASH") {
|
||||
Ok(v) => v,
|
||||
Err(_) => {
|
||||
let output = Command::new("git")
|
||||
.args(&["describe", "--always", "--dirty", "--abbrev=64"])
|
||||
.output()
|
||||
.expect("failed to execute git rev-parse to read the current git hash");
|
||||
|
||||
String::from_utf8(output.stdout).expect("non-utf8 found in git hash")
|
||||
}
|
||||
};
|
||||
|
||||
assert!(!out.is_empty(), "attempting to embed empty git hash");
|
||||
out
|
||||
}
|
||||
|
|
|
@ -25,6 +25,8 @@ use query::exec::Executor;
|
|||
use thiserror::Error;
|
||||
use time::{SystemProvider, TimeProvider};
|
||||
|
||||
use super::main;
|
||||
|
||||
/// The default bind address for the Router HTTP API.
|
||||
pub const DEFAULT_ROUTER_HTTP_BIND_ADDR: &str = "127.0.0.1:8080";
|
||||
|
||||
|
@ -43,7 +45,7 @@ pub const DEFAULT_COMPACTOR_GRPC_BIND_ADDR: &str = "127.0.0.1:8084";
|
|||
#[derive(Debug, Error)]
|
||||
pub enum Error {
|
||||
#[error("Run: {0}")]
|
||||
Run(#[from] ioxd::Error),
|
||||
Run(#[from] main::Error),
|
||||
|
||||
#[error("Catalog DSN error: {0}")]
|
||||
CatalogDsn(#[from] clap_blocks::catalog_dsn::Error),
|
||||
|
@ -386,5 +388,5 @@ pub async fn command(config: Config) -> Result<()> {
|
|||
Service::create_grpc_only(querier, &querier_run_config),
|
||||
];
|
||||
|
||||
Ok(ioxd::main(common_state, services).await?)
|
||||
Ok(main::main(common_state, services).await?)
|
||||
}
|
||||
|
|
|
@ -13,10 +13,12 @@ use clap_blocks::{
|
|||
use ioxd::{self, server_type::compactor::create_compactor_server_type, Service};
|
||||
use ioxd_common::server_type::{CommonServerState, CommonServerStateError};
|
||||
|
||||
use super::main;
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub enum Error {
|
||||
#[error("Run: {0}")]
|
||||
Run(#[from] ioxd::Error),
|
||||
Run(#[from] main::Error),
|
||||
|
||||
#[error("Invalid config: {0}")]
|
||||
InvalidConfig(#[from] CommonServerStateError),
|
||||
|
@ -100,5 +102,5 @@ pub async fn command(config: Config) -> Result<(), Error> {
|
|||
info!("starting compactor");
|
||||
|
||||
let services = vec![Service::create(server_type, common_state.run_config())];
|
||||
Ok(ioxd::main(common_state, services).await?)
|
||||
Ok(main::main(common_state, services).await?)
|
||||
}
|
||||
|
|
|
@ -13,10 +13,12 @@ use ioxd_database::{
|
|||
|
||||
use thiserror::Error;
|
||||
|
||||
use super::main;
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub enum Error {
|
||||
#[error("Run: {0}")]
|
||||
Run(#[from] ioxd::Error),
|
||||
Run(#[from] main::Error),
|
||||
|
||||
#[error("Cannot setup server: {0}")]
|
||||
Setup(#[from] ioxd_database::setup::Error),
|
||||
|
@ -128,5 +130,5 @@ pub async fn command(config: Config) -> Result<()> {
|
|||
));
|
||||
|
||||
let services = vec![Service::create(server_type, common_state.run_config())];
|
||||
Ok(ioxd::main(common_state, services).await?)
|
||||
Ok(main::main(common_state, services).await?)
|
||||
}
|
||||
|
|
|
@ -12,10 +12,12 @@ use query::exec::Executor;
|
|||
use std::{convert::TryFrom, sync::Arc};
|
||||
use thiserror::Error;
|
||||
|
||||
use super::main;
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub enum Error {
|
||||
#[error("Run: {0}")]
|
||||
Run(#[from] ioxd::Error),
|
||||
Run(#[from] main::Error),
|
||||
|
||||
#[error("Invalid config: {0}")]
|
||||
InvalidConfig(#[from] CommonServerStateError),
|
||||
|
@ -98,5 +100,5 @@ pub async fn command(config: Config) -> Result<()> {
|
|||
info!("starting ingester");
|
||||
|
||||
let services = vec![Service::create(server_type, common_state.run_config())];
|
||||
Ok(ioxd::main(common_state, services).await?)
|
||||
Ok(main::main(common_state, services).await?)
|
||||
}
|
||||
|
|
|
@ -0,0 +1,173 @@
|
|||
use ioxd::Service;
|
||||
use ioxd_common::{grpc_listener, http_listener, serve, server_type::CommonServerState};
|
||||
use observability_deps::tracing::{error, info};
|
||||
use panic_logging::SendPanicsToTracing;
|
||||
use snafu::{ResultExt, Snafu};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
#[cfg(all(not(feature = "heappy"), feature = "jemalloc_replacing_malloc"))]
|
||||
mod jemalloc;
|
||||
|
||||
#[derive(Debug, Snafu)]
|
||||
pub enum Error {
|
||||
#[snafu(display("{}", source))]
|
||||
Wrapper { source: ioxd_common::Error },
|
||||
|
||||
#[snafu(display("Error joining server task: {}", source))]
|
||||
Joining { source: tokio::task::JoinError },
|
||||
}
|
||||
|
||||
pub type Result<T, E = Error> = std::result::Result<T, E>;
|
||||
|
||||
impl From<ioxd_common::Error> for Error {
|
||||
fn from(source: ioxd_common::Error) -> Self {
|
||||
Self::Wrapper { source }
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(all(not(feature = "heappy"), not(feature = "jemalloc_replacing_malloc")))]
|
||||
fn build_malloc_conf() -> String {
|
||||
"system".to_string()
|
||||
}
|
||||
|
||||
#[cfg(all(feature = "heappy", not(feature = "jemalloc_replacing_malloc")))]
|
||||
fn build_malloc_conf() -> String {
|
||||
"heappy".to_string()
|
||||
}
|
||||
|
||||
#[cfg(all(not(feature = "heappy"), feature = "jemalloc_replacing_malloc"))]
|
||||
fn build_malloc_conf() -> String {
|
||||
tikv_jemalloc_ctl::config::malloc_conf::mib()
|
||||
.unwrap()
|
||||
.read()
|
||||
.unwrap()
|
||||
.to_string()
|
||||
}
|
||||
|
||||
#[cfg(all(
|
||||
feature = "heappy",
|
||||
feature = "jemalloc_replacing_malloc",
|
||||
not(feature = "clippy")
|
||||
))]
|
||||
fn build_malloc_conf() -> String {
|
||||
compile_error!("must use exactly one memory allocator")
|
||||
}
|
||||
|
||||
#[cfg(feature = "clippy")]
|
||||
fn build_malloc_conf() -> String {
|
||||
"clippy".to_string()
|
||||
}
|
||||
|
||||
/// This is the entry point for the IOx server.
|
||||
///
|
||||
/// This entry point ensures that the given set of Services are
|
||||
/// started using best practice, e.g. that we print the GIT-hash and
|
||||
/// malloc-configs, that a panic handler is installed, etc.
|
||||
///
|
||||
/// Due to its invasive nature (install global panic handling,
|
||||
/// logging, etc) this function should not be used during unit tests.
|
||||
pub async fn main(common_state: CommonServerState, services: Vec<Service>) -> Result<()> {
|
||||
let git_hash = env!("GIT_HASH", "starting influxdb_iox server");
|
||||
let num_cpus = num_cpus::get();
|
||||
let build_malloc_conf = build_malloc_conf();
|
||||
info!(
|
||||
git_hash,
|
||||
num_cpus,
|
||||
%build_malloc_conf,
|
||||
"InfluxDB IOx server starting",
|
||||
);
|
||||
|
||||
for service in &services {
|
||||
if let Some(http_bind_address) = &service.http_bind_address {
|
||||
if (&service.grpc_bind_address == http_bind_address)
|
||||
&& (service.grpc_bind_address.port() != 0)
|
||||
{
|
||||
error!(
|
||||
grpc_bind_address=%service.grpc_bind_address,
|
||||
http_bind_address=%http_bind_address,
|
||||
"grpc and http bind addresses must differ",
|
||||
);
|
||||
std::process::exit(1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Install custom panic handler and forget about it.
|
||||
//
|
||||
// This leaks the handler and prevents it from ever being dropped during the
|
||||
// lifetime of the program - this is actually a good thing, as it prevents
|
||||
// the panic handler from being removed while unwinding a panic (which in
|
||||
// turn, causes a panic - see #548)
|
||||
let f = SendPanicsToTracing::new();
|
||||
std::mem::forget(f);
|
||||
|
||||
// Register jemalloc metrics
|
||||
#[cfg(all(not(feature = "heappy"), feature = "jemalloc_replacing_malloc"))]
|
||||
for service in &services {
|
||||
service
|
||||
.server_type
|
||||
.metric_registry()
|
||||
.register_instrument("jemalloc_metrics", jemalloc::JemallocMetrics::new);
|
||||
}
|
||||
|
||||
// Construct a token to trigger clean shutdown
|
||||
let frontend_shutdown = CancellationToken::new();
|
||||
|
||||
let mut serving_futures = Vec::new();
|
||||
for service in services {
|
||||
let common_state = common_state.clone();
|
||||
// start them all in their own tasks so the servers run at the same time
|
||||
let frontend_shutdown = frontend_shutdown.clone();
|
||||
serving_futures.push(tokio::spawn(async move {
|
||||
let trace_exporter = common_state.trace_exporter();
|
||||
let Service {
|
||||
http_bind_address,
|
||||
grpc_bind_address,
|
||||
server_type,
|
||||
} = service;
|
||||
|
||||
info!(?grpc_bind_address, "Binding gRPC services");
|
||||
let grpc_listener = grpc_listener(grpc_bind_address.into()).await?;
|
||||
|
||||
let http_listener = match http_bind_address {
|
||||
Some(http_bind_address) => {
|
||||
info!(?http_bind_address, "Completed bind of gRPC, binding http");
|
||||
Some(http_listener(http_bind_address.into()).await?)
|
||||
}
|
||||
None => {
|
||||
info!("No http server specified");
|
||||
None
|
||||
}
|
||||
};
|
||||
|
||||
let r = serve(
|
||||
common_state,
|
||||
frontend_shutdown,
|
||||
grpc_listener,
|
||||
http_listener,
|
||||
server_type,
|
||||
)
|
||||
.await;
|
||||
|
||||
info!(
|
||||
?grpc_bind_address,
|
||||
?http_bind_address,
|
||||
"done serving, draining futures"
|
||||
);
|
||||
if let Some(trace_exporter) = trace_exporter {
|
||||
if let Err(e) = trace_exporter.drain().await {
|
||||
error!(%e, "error draining trace exporter");
|
||||
}
|
||||
}
|
||||
r
|
||||
}));
|
||||
}
|
||||
|
||||
for f in serving_futures {
|
||||
// Use ?? to unwrap Result<Result<..>>
|
||||
// "I heard you like errors, so I put an error in your error...."
|
||||
f.await.context(JoiningSnafu)??;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
|
@ -6,6 +6,7 @@ mod all_in_one;
|
|||
mod compactor;
|
||||
mod database;
|
||||
mod ingester;
|
||||
mod main;
|
||||
mod querier;
|
||||
mod router;
|
||||
mod router2;
|
||||
|
|
|
@ -12,10 +12,12 @@ use ioxd::{self, Service};
|
|||
use ioxd_common::server_type::{CommonServerState, CommonServerStateError};
|
||||
use ioxd_querier::create_querier_server_type;
|
||||
|
||||
use super::main;
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub enum Error {
|
||||
#[error("Run: {0}")]
|
||||
Run(#[from] ioxd::Error),
|
||||
Run(#[from] main::Error),
|
||||
|
||||
#[error("Invalid config: {0}")]
|
||||
InvalidConfig(#[from] CommonServerStateError),
|
||||
|
@ -93,5 +95,5 @@ pub async fn command(config: Config) -> Result<(), Error> {
|
|||
info!("starting querier");
|
||||
|
||||
let services = vec![Service::create(server_type, common_state.run_config())];
|
||||
Ok(ioxd::main(common_state, services).await?)
|
||||
Ok(main::main(common_state, services).await?)
|
||||
}
|
||||
|
|
|
@ -15,10 +15,12 @@ use router::{resolver::RemoteTemplate, server::RouterServer};
|
|||
use thiserror::Error;
|
||||
use time::SystemProvider;
|
||||
|
||||
use super::main;
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub enum Error {
|
||||
#[error("Run: {0}")]
|
||||
Run(#[from] ioxd::Error),
|
||||
Run(#[from] main::Error),
|
||||
|
||||
#[error("Cannot setup server: {0}")]
|
||||
Setup(#[from] ioxd_database::setup::Error),
|
||||
|
@ -157,5 +159,5 @@ pub async fn command(config: Config) -> Result<()> {
|
|||
));
|
||||
|
||||
let services = vec![Service::create(server_type, common_state.run_config())];
|
||||
Ok(ioxd::main(common_state, services).await?)
|
||||
Ok(main::main(common_state, services).await?)
|
||||
}
|
||||
|
|
|
@ -12,10 +12,12 @@ use ioxd_router2::create_router2_server_type;
|
|||
use observability_deps::tracing::*;
|
||||
use thiserror::Error;
|
||||
|
||||
use super::main;
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub enum Error {
|
||||
#[error("Run: {0}")]
|
||||
Run(#[from] ioxd::Error),
|
||||
Run(#[from] main::Error),
|
||||
|
||||
#[error("Invalid config: {0}")]
|
||||
InvalidConfig(#[from] CommonServerStateError),
|
||||
|
@ -83,5 +85,5 @@ pub async fn command(config: Config) -> Result<()> {
|
|||
|
||||
info!("starting router2");
|
||||
let services = vec![Service::create(server_type, common_state.run_config())];
|
||||
Ok(ioxd::main(common_state, services).await?)
|
||||
Ok(main::main(common_state, services).await?)
|
||||
}
|
||||
|
|
|
@ -12,10 +12,12 @@ use ioxd_common::server_type::{CommonServerState, CommonServerStateError};
|
|||
use metric::Registry;
|
||||
use thiserror::Error;
|
||||
|
||||
use super::main;
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub enum Error {
|
||||
#[error("Run: {0}")]
|
||||
Run(#[from] ioxd::Error),
|
||||
Run(#[from] main::Error),
|
||||
|
||||
#[error("Invalid config: {0}")]
|
||||
InvalidConfig(#[from] CommonServerStateError),
|
||||
|
@ -62,5 +64,5 @@ pub async fn command(config: Config) -> Result<()> {
|
|||
));
|
||||
|
||||
let services = vec![Service::create(server_type, common_state.run_config())];
|
||||
Ok(ioxd::main(common_state, services).await?)
|
||||
Ok(main::main(common_state, services).await?)
|
||||
}
|
||||
|
|
|
@ -46,7 +46,10 @@ static VERSION_STRING: Lazy<String> = Lazy::new(|| {
|
|||
format!(
|
||||
"{}, revision {}",
|
||||
option_env!("CARGO_PKG_VERSION").unwrap_or("UNKNOWN"),
|
||||
option_env!("GIT_HASH").unwrap_or("UNKNOWN")
|
||||
env!(
|
||||
"GIT_HASH",
|
||||
"Can not find find GIT HASH in build environment"
|
||||
)
|
||||
)
|
||||
});
|
||||
|
||||
|
|
|
@ -47,16 +47,12 @@ num_cpus = "1.13.0"
|
|||
pprof = { version = "0.7", default-features = false, features = ["flamegraph", "prost-codec"], optional = true }
|
||||
snafu = "0.7"
|
||||
thiserror = "1.0.30"
|
||||
tikv-jemalloc-ctl = { version = "0.4.0", optional = true }
|
||||
tokio = { version = "1.17", features = ["macros", "net", "parking_lot", "rt-multi-thread", "signal", "sync", "time"] }
|
||||
tokio-stream = { version = "0.1", features = ["net"] }
|
||||
tokio-util = { version = "0.7.1" }
|
||||
tonic = "0.6"
|
||||
tonic-health = "0.5.0"
|
||||
tonic-reflection = "0.3.0"
|
||||
# jemalloc-sys with unprefixed_malloc_on_supported_platforms feature and heappy are mutually exclusive
|
||||
tikv-jemalloc-sys = { version = "0.4.0", optional = true, features = ["unprefixed_malloc_on_supported_platforms"] }
|
||||
heappy = { git = "https://github.com/mkmik/heappy", rev = "1770cd0cde556d121e7f017538ddda0e1778126a", features = ["enable_heap_profiler", "jemalloc_shim", "measure_free"], optional = true }
|
||||
workspace-hack = { path = "../workspace-hack"}
|
||||
|
||||
[dev-dependencies]
|
||||
|
@ -68,21 +64,3 @@ iox_tests = { path = "../iox_tests" }
|
|||
# Crates.io dependencies, in alphabetical order
|
||||
assert_cmd = "2.0.2"
|
||||
tempfile = "3.1.0"
|
||||
|
||||
[features]
|
||||
default = ["jemalloc_replacing_malloc"]
|
||||
|
||||
azure = ["object_store/azure"] # Optional Azure Object store support
|
||||
gcp = ["object_store/gcp"] # Optional GCP object store support
|
||||
aws = ["object_store/aws"] # Optional AWS / S3 object store support
|
||||
# pprof is an optional feature for pprof support
|
||||
|
||||
# heappy is an optional feature; Not on by default as it
|
||||
# runtime overhead on all allocations (calls to malloc).
|
||||
# Cargo cannot currently implement mutually exclusive features so let's force every build
|
||||
# to pick either heappy or jemalloc_replacing_malloc feature at least until we figure out something better.
|
||||
jemalloc_replacing_malloc = ["tikv-jemalloc-sys", "tikv-jemalloc-ctl"]
|
||||
|
||||
# Implicit feature selected when running under `clippy --all-features` to accept mutable exclusive features during
|
||||
# linting
|
||||
clippy = []
|
||||
|
|
183
ioxd/src/lib.rs
183
ioxd/src/lib.rs
|
@ -1,74 +1,15 @@
|
|||
use clap_blocks::run_config::RunConfig;
|
||||
use ioxd_common::{
|
||||
grpc_listener, http_listener, serve,
|
||||
server_type::{CommonServerState, ServerType},
|
||||
};
|
||||
use observability_deps::tracing::{error, info};
|
||||
use panic_logging::SendPanicsToTracing;
|
||||
use snafu::{ResultExt, Snafu};
|
||||
use std::sync::Arc;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
#[cfg(all(not(feature = "heappy"), feature = "jemalloc_replacing_malloc"))]
|
||||
mod jemalloc;
|
||||
use clap_blocks::{run_config::RunConfig, socket_addr::SocketAddr};
|
||||
use ioxd_common::server_type::ServerType;
|
||||
|
||||
pub mod server_type;
|
||||
|
||||
#[derive(Debug, Snafu)]
|
||||
pub enum Error {
|
||||
#[snafu(display("{}", source))]
|
||||
Wrapper { source: ioxd_common::Error },
|
||||
|
||||
#[snafu(display("Error joining server task: {}", source))]
|
||||
Joining { source: tokio::task::JoinError },
|
||||
}
|
||||
|
||||
pub type Result<T, E = Error> = std::result::Result<T, E>;
|
||||
|
||||
impl From<ioxd_common::Error> for Error {
|
||||
fn from(source: ioxd_common::Error) -> Self {
|
||||
Self::Wrapper { source }
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(all(not(feature = "heappy"), not(feature = "jemalloc_replacing_malloc")))]
|
||||
fn build_malloc_conf() -> String {
|
||||
"system".to_string()
|
||||
}
|
||||
|
||||
#[cfg(all(feature = "heappy", not(feature = "jemalloc_replacing_malloc")))]
|
||||
fn build_malloc_conf() -> String {
|
||||
"heappy".to_string()
|
||||
}
|
||||
|
||||
#[cfg(all(not(feature = "heappy"), feature = "jemalloc_replacing_malloc"))]
|
||||
fn build_malloc_conf() -> String {
|
||||
tikv_jemalloc_ctl::config::malloc_conf::mib()
|
||||
.unwrap()
|
||||
.read()
|
||||
.unwrap()
|
||||
.to_string()
|
||||
}
|
||||
|
||||
#[cfg(all(
|
||||
feature = "heappy",
|
||||
feature = "jemalloc_replacing_malloc",
|
||||
not(feature = "clippy")
|
||||
))]
|
||||
fn build_malloc_conf() -> String {
|
||||
compile_error!("must use exactly one memory allocator")
|
||||
}
|
||||
|
||||
#[cfg(feature = "clippy")]
|
||||
fn build_malloc_conf() -> String {
|
||||
"clippy".to_string()
|
||||
}
|
||||
|
||||
/// A service that will start on the specified addresses
|
||||
pub struct Service {
|
||||
http_bind_address: Option<clap_blocks::socket_addr::SocketAddr>,
|
||||
grpc_bind_address: clap_blocks::socket_addr::SocketAddr,
|
||||
server_type: Arc<dyn ServerType>,
|
||||
pub http_bind_address: Option<SocketAddr>,
|
||||
pub grpc_bind_address: SocketAddr,
|
||||
pub server_type: Arc<dyn ServerType>,
|
||||
}
|
||||
|
||||
impl Service {
|
||||
|
@ -88,117 +29,3 @@ impl Service {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// This is the entry point for the IOx server.
|
||||
///
|
||||
/// This entry point ensures that the given set of Services are
|
||||
/// started using best practice, e.g. that we print the GIT-hash and
|
||||
/// malloc-configs, that a panic handler is installed, etc.
|
||||
///
|
||||
/// Due to its invasive nature (install global panic handling,
|
||||
/// logging, etc) this function should not be used during unit tests.
|
||||
pub async fn main(common_state: CommonServerState, services: Vec<Service>) -> Result<()> {
|
||||
let git_hash = option_env!("GIT_HASH").unwrap_or("UNKNOWN");
|
||||
let num_cpus = num_cpus::get();
|
||||
let build_malloc_conf = build_malloc_conf();
|
||||
info!(
|
||||
git_hash,
|
||||
num_cpus,
|
||||
%build_malloc_conf,
|
||||
"InfluxDB IOx server starting",
|
||||
);
|
||||
|
||||
for service in &services {
|
||||
if let Some(http_bind_address) = &service.http_bind_address {
|
||||
if (&service.grpc_bind_address == http_bind_address)
|
||||
&& (service.grpc_bind_address.port() != 0)
|
||||
{
|
||||
error!(
|
||||
grpc_bind_address=%service.grpc_bind_address,
|
||||
http_bind_address=%http_bind_address,
|
||||
"grpc and http bind addresses must differ",
|
||||
);
|
||||
std::process::exit(1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Install custom panic handler and forget about it.
|
||||
//
|
||||
// This leaks the handler and prevents it from ever being dropped during the
|
||||
// lifetime of the program - this is actually a good thing, as it prevents
|
||||
// the panic handler from being removed while unwinding a panic (which in
|
||||
// turn, causes a panic - see #548)
|
||||
let f = SendPanicsToTracing::new();
|
||||
std::mem::forget(f);
|
||||
|
||||
// Register jemalloc metrics
|
||||
#[cfg(all(not(feature = "heappy"), feature = "jemalloc_replacing_malloc"))]
|
||||
for service in &services {
|
||||
service
|
||||
.server_type
|
||||
.metric_registry()
|
||||
.register_instrument("jemalloc_metrics", jemalloc::JemallocMetrics::new);
|
||||
}
|
||||
|
||||
// Construct a token to trigger clean shutdown
|
||||
let frontend_shutdown = CancellationToken::new();
|
||||
|
||||
let mut serving_futures = Vec::new();
|
||||
for service in services {
|
||||
let common_state = common_state.clone();
|
||||
// start them all in their own tasks so the servers run at the same time
|
||||
let frontend_shutdown = frontend_shutdown.clone();
|
||||
serving_futures.push(tokio::spawn(async move {
|
||||
let trace_exporter = common_state.trace_exporter();
|
||||
let Service {
|
||||
http_bind_address,
|
||||
grpc_bind_address,
|
||||
server_type,
|
||||
} = service;
|
||||
|
||||
info!(?grpc_bind_address, "Binding gRPC services");
|
||||
let grpc_listener = grpc_listener(grpc_bind_address.into()).await?;
|
||||
|
||||
let http_listener = match http_bind_address {
|
||||
Some(http_bind_address) => {
|
||||
info!(?http_bind_address, "Completed bind of gRPC, binding http");
|
||||
Some(http_listener(http_bind_address.into()).await?)
|
||||
}
|
||||
None => {
|
||||
info!("No http server specified");
|
||||
None
|
||||
}
|
||||
};
|
||||
|
||||
let r = serve(
|
||||
common_state,
|
||||
frontend_shutdown,
|
||||
grpc_listener,
|
||||
http_listener,
|
||||
server_type,
|
||||
)
|
||||
.await;
|
||||
|
||||
info!(
|
||||
?grpc_bind_address,
|
||||
?http_bind_address,
|
||||
"done serving, draining futures"
|
||||
);
|
||||
if let Some(trace_exporter) = trace_exporter {
|
||||
if let Err(e) = trace_exporter.drain().await {
|
||||
error!(%e, "error draining trace exporter");
|
||||
}
|
||||
}
|
||||
r
|
||||
}));
|
||||
}
|
||||
|
||||
for f in serving_futures {
|
||||
// Use ?? to unwrap Result<Result<..>>
|
||||
// "I heard you like errors, so I put an error in your error...."
|
||||
f.await.context(JoiningSnafu)??;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue