feat: all in 1 IOx NG mode (#3965)

* feat: Add all_in_one mode

* fix: doc

* docs: fix truncated docs

* refactor: correctly identify PG connections

* refactor: resolve failed merge

Co-authored-by: Dom Dwyer <dom@itsallbroken.com>
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Andrew Lamb 2022-03-15 12:28:37 -04:00 committed by GitHub
parent c0e0bcbb1a
commit 9b3f946c10
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 572 additions and 65 deletions

View File

@ -120,6 +120,16 @@ impl WriteBufferConfig {
pub fn topic(&self) -> &str {
self.topic.as_ref()
}
/// Get the write buffer config's auto create topics.
pub fn auto_create_topics(&self) -> Option<NonZeroU32> {
self.auto_create_topics
}
/// Set the write buffer config's auto create topics.
pub fn set_auto_create_topics(&mut self, auto_create_topics: Option<NonZeroU32>) {
self.auto_create_topics = auto_create_topics;
}
}
#[cfg(test)]

View File

@ -0,0 +1,368 @@
//! Implementation of command line option for running all in one mode
use std::{num::NonZeroU32, sync::Arc};
use clap_blocks::{
catalog_dsn::CatalogDsnConfig,
ingester::IngesterConfig,
run_config::{RunConfig, DEFAULT_API_BIND_ADDR, DEFAULT_GRPC_BIND_ADDR},
socket_addr::SocketAddr,
write_buffer::WriteBufferConfig,
};
use influxdb_ioxd::{
self,
server_type::{
common_state::{CommonServerState, CommonServerStateError},
compactor::create_compactor_server_type,
ingester::create_ingester_server_type,
querier::create_querier_server_type,
router2::create_router2_server_type,
},
Service,
};
use object_store::ObjectStore;
use observability_deps::tracing::*;
use query::exec::Executor;
use thiserror::Error;
use time::{SystemProvider, TimeProvider};
/// The default bind address for the Router HTTP API.
pub const DEFAULT_ROUTER_HTTP_BIND_ADDR: &str = "127.0.0.1:8080";
/// The default bind address for the Router gRPC.
pub const DEFAULT_ROUTER_GRPC_BIND_ADDR: &str = "127.0.0.1:8081";
/// The default bind address for the Querier gRPC (chosen to match default gRPC addr)
pub const DEFAULT_QUERIER_GRPC_BIND_ADDR: &str = "127.0.0.1:8082";
/// The default bind address for the Ingester gRPC (chosen to match default gRPC addr)
pub const DEFAULT_INGESTER_GRPC_BIND_ADDR: &str = "127.0.0.1:8083";
/// The default bind address for the Compactor gRPC (chosen to match default gRPC addr)
pub const DEFAULT_COMPACTOR_GRPC_BIND_ADDR: &str = "127.0.0.1:8084";
#[derive(Debug, Error)]
pub enum Error {
#[error("Run: {0}")]
Run(#[from] influxdb_ioxd::Error),
#[error("Catalog DSN error: {0}")]
CatalogDsn(#[from] clap_blocks::catalog_dsn::Error),
#[error("Catalog error: {0}")]
Catalog(#[from] iox_catalog::interface::Error),
#[error("Cannot parse object store config: {0}")]
ObjectStoreParsing(#[from] clap_blocks::object_store::ParseError),
#[error("Router2 error: {0}")]
Router2(#[from] influxdb_ioxd::server_type::router2::Error),
#[error("Ingester error: {0}")]
Ingester(#[from] influxdb_ioxd::server_type::ingester::Error),
#[error("Invalid config: {0}")]
InvalidConfig(#[from] CommonServerStateError),
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// The intention is to keep the number of options on this Config
/// object as small as possible. For more complex configurations and
/// deployments the individual services (e.g. Router and Compactor)
/// should be instantiated and configured individually.
///
/// This creates the following four services, configured to talk to a
/// common catalog, objectstore and write buffer.
///
/// ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐
/// │ Router │ │ Ingester │ │ Querier │ │ Compactor │
/// └─────────────┘ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘
/// │ │ │ │
/// │ │ │ │
/// │ │ │ │
/// └────┬───────────┴─────┬───────────┴─────────┬───────┘
/// │ │ │
/// │ │ │
/// .──▼──. .──▼──. .──▼──.
/// ( ) ( ) ( )
/// │`─────'│ │`─────'│ │`─────'│
/// │ │ │ │ │ │
/// │.─────.│ │.─────.│ │.─────.│
/// ( ) ( ) ( )
/// `─────' `─────' `─────'
///
/// Existing Object Store kafka / redpanda
/// Postgres (file, mem, or Object Store
/// (started by pre-existing) (file, mem, or
/// user) configured with pre-existing)
/// --object-store configured with
///
/// Ideally all the gRPC services would listen on the same port, but
/// due to challenges with tonic this effort was postponed. See
/// <https://github.com/influxdata/influxdb_iox/issues/3926#issuecomment-1063231779>
/// for more details
///
/// Currently, the starts services on 5 ports, designed so that the
/// ports used to interact with the old architecture are the same as
/// the new architecture (8082 write endpoint and query on 8082).
///
/// Router;
/// 8080 http (overrides INFLUXDB_IOX_BIND_ADDR)
/// 8081 grpc (overrides INFLUXDB_IOX_GRPC_BIND_ADDR)
///
/// Querier:
/// 8082 grpc
///
/// Ingester
/// 8083 grpc
///
/// Compactor
/// 8084 grpc
///
/// The reason it would be nicer to run all 4 sets of gRPC services
/// (router, compactor, querier, ingester) on the same ports is:
///
/// 1. It reduces the number of listening ports required
/// 2. It probably enforces good hygiene around API design
///
/// Some downsides are that different services would have to implement
/// distinct APIs (aka there can't be multiple 'write' apis) and some
/// general-purpose services (like the google job API or the health
/// endpoint) that are provided by multiple server modes need to be
/// specially handled.
#[derive(Debug, clap::Parser)]
#[clap(
name = "all-in-one",
about = "Runs in IOx All in One mode, containing router, ingester, compactor and querier."
)]
pub struct Config {
#[clap(flatten)]
pub(crate) run_config: RunConfig,
#[clap(flatten)]
pub(crate) catalog_dsn: CatalogDsnConfig,
#[clap(flatten)]
pub(crate) write_buffer_config: WriteBufferConfig,
#[clap(flatten)]
pub(crate) ingester_config: IngesterConfig,
/// The address on which IOx will serve Router HTTP API requests
#[clap(
long = "--router-http-bind",
env = "INFLUXDB_IOX_ROUTER_HTTP_BIND_ADDR",
default_value = DEFAULT_ROUTER_HTTP_BIND_ADDR,
)]
pub router_http_bind_address: SocketAddr,
/// The address on which IOx will serve Router gRPC API requests
#[clap(
long = "--router-grpc-bind",
env = "INFLUXDB_IOX_ROUTER_GRPC_BIND_ADDR",
default_value = DEFAULT_ROUTER_GRPC_BIND_ADDR,
)]
pub router_grpc_bind_address: SocketAddr,
/// The address on which IOx will serve Querier gRPC API requests
#[clap(
long = "--querier-grpc-bind",
env = "INFLUXDB_IOX_QUERIER_GRPC_BIND_ADDR",
default_value = DEFAULT_QUERIER_GRPC_BIND_ADDR,
)]
pub querier_grpc_bind_address: SocketAddr,
/// The address on which IOx will serve Router Ingester API requests
#[clap(
long = "--ingester-grpc-bind",
env = "INFLUXDB_IOX_INGESTER_GRPC_BIND_ADDR",
default_value = DEFAULT_INGESTER_GRPC_BIND_ADDR,
)]
pub ingester_grpc_bind_address: SocketAddr,
/// The address on which IOx will serve Router Compactor API requests
#[clap(
long = "--compactor-grpc-bind",
env = "INFLUXDB_IOX_COMPACTOR_GRPC_BIND_ADDR",
default_value = DEFAULT_COMPACTOR_GRPC_BIND_ADDR,
)]
pub compactor_grpc_bind_address: SocketAddr,
}
impl Config {
/// Get a specialized run config to use for each service
fn specialize(self) -> SpecializedConfig {
if self.run_config.http_bind_address != DEFAULT_API_BIND_ADDR.parse().unwrap() {
eprintln!("Warning: --http-bind-addr ignored in all in one mode");
}
if self.run_config.grpc_bind_address != DEFAULT_GRPC_BIND_ADDR.parse().unwrap() {
eprintln!("Warning: --grpc-bind-addr ignored in all in one mode");
}
let router_run_config = self
.run_config
.clone()
.with_http_bind_address(self.router_http_bind_address)
.with_grpc_bind_address(self.router_grpc_bind_address);
let querier_run_config = self
.run_config
.clone()
.with_grpc_bind_address(self.querier_grpc_bind_address);
let ingester_run_config = self
.run_config
.clone()
.with_grpc_bind_address(self.ingester_grpc_bind_address);
let compactor_run_config = self
.run_config
.clone()
.with_grpc_bind_address(self.compactor_grpc_bind_address);
SpecializedConfig {
router_run_config,
querier_run_config,
ingester_run_config,
compactor_run_config,
catalog_dsn: self.catalog_dsn,
write_buffer_config: self.write_buffer_config,
ingester_config: self.ingester_config,
}
}
}
/// Different run configs for the different services (needed as they
/// listen on different ports)
struct SpecializedConfig {
router_run_config: RunConfig,
querier_run_config: RunConfig,
ingester_run_config: RunConfig,
compactor_run_config: RunConfig,
catalog_dsn: CatalogDsnConfig,
write_buffer_config: WriteBufferConfig,
ingester_config: IngesterConfig,
}
pub async fn command(config: Config) -> Result<()> {
let SpecializedConfig {
router_run_config,
querier_run_config,
ingester_run_config,
compactor_run_config,
catalog_dsn,
mut write_buffer_config,
ingester_config,
} = config.specialize();
// Ensure at least one topic is automatically created in all in one mode
write_buffer_config.set_auto_create_topics(Some(
write_buffer_config.auto_create_topics().unwrap_or_else(|| {
let default_config = NonZeroU32::new(1).unwrap();
info!(
?default_config,
"Automatically configuring creation of a single topic"
);
default_config
}),
));
// If you want this level of control, should be instatiating the
// services individually
let query_pool_name = "iox-shared";
let metrics = Arc::new(metric::Registry::default());
let catalog = catalog_dsn
.get_catalog("iox-all-in-one", Arc::clone(&metrics))
.await?;
// In the name of ease of use, automatically run db migrations in
// all in one mode to ensure the database is ready.
info!("running db migrations");
catalog.setup().await?;
// Create a topic
catalog
.repositories()
.await
.kafka_topics()
.create_or_get(query_pool_name)
.await?;
let object_store = Arc::new(
ObjectStore::try_from(router_run_config.object_store_config())
.map_err(Error::ObjectStoreParsing)?,
);
let time_provider: Arc<dyn TimeProvider> = Arc::new(SystemProvider::new());
// create common state from the router and use it below
let common_state = CommonServerState::from_config(router_run_config.clone())?;
// TODO: make num_threads a parameter (other modes have it
// configured by a command line)
let num_threads = num_cpus::get();
info!(%num_threads, "Creating shared query executor");
let exec = Arc::new(Executor::new(num_threads));
info!("starting router2");
let router2 = create_router2_server_type(
&common_state,
Arc::clone(&metrics),
Arc::clone(&catalog),
&write_buffer_config,
query_pool_name,
)
.await?;
info!("starting ingester");
let ingester = create_ingester_server_type(
&common_state,
Arc::clone(&metrics),
Arc::clone(&catalog),
Arc::clone(&object_store),
Arc::clone(&exec),
&write_buffer_config,
ingester_config,
)
.await?;
info!("starting compactor");
let sequencers = vec![]; // TODO sequencers
let compactor = create_compactor_server_type(
&common_state,
Arc::clone(&metrics),
Arc::clone(&catalog),
Arc::clone(&object_store),
Arc::clone(&exec),
Arc::clone(&time_provider),
sequencers,
)
.await;
info!("starting querier");
let querier = create_querier_server_type(
&common_state,
metrics,
catalog,
object_store,
time_provider,
exec,
)
.await;
info!("starting all in one server");
let services = vec![
Service::create(router2, &router_run_config),
Service::create_grpc_only(ingester, &ingester_run_config),
Service::create_grpc_only(compactor, &compactor_run_config),
Service::create_grpc_only(querier, &querier_run_config),
];
Ok(influxdb_ioxd::main(common_state, services).await?)
}

View File

@ -15,6 +15,7 @@ use influxdb_ioxd::{
common_state::{CommonServerState, CommonServerStateError},
compactor::create_compactor_server_type,
},
Service,
};
#[derive(Debug, Error)]
@ -106,5 +107,6 @@ pub async fn command(config: Config) -> Result<(), Error> {
info!("starting compactor");
Ok(influxdb_ioxd::main(common_state, server_type).await?)
let services = vec![Service::create(server_type, common_state.run_config())];
Ok(influxdb_ioxd::main(common_state, services).await?)
}

View File

@ -13,6 +13,7 @@ use influxdb_ioxd::{
DatabaseServerType,
},
},
Service,
};
use thiserror::Error;
@ -130,5 +131,6 @@ pub async fn command(config: Config) -> Result<()> {
config.config_file.is_some(),
));
Ok(influxdb_ioxd::main(common_state, server_type).await?)
let services = vec![Service::create(server_type, common_state.run_config())];
Ok(influxdb_ioxd::main(common_state, services).await?)
}

View File

@ -10,6 +10,7 @@ use influxdb_ioxd::{
common_state::{CommonServerState, CommonServerStateError},
ingester::create_ingester_server_type,
},
Service,
};
use object_store::ObjectStore;
use observability_deps::tracing::*;
@ -101,5 +102,6 @@ pub async fn command(config: Config) -> Result<()> {
info!("starting ingester");
Ok(influxdb_ioxd::main(common_state, server_type).await?)
let services = vec![Service::create(server_type, common_state.run_config())];
Ok(influxdb_ioxd::main(common_state, services).await?)
}

View File

@ -2,13 +2,14 @@ use snafu::{ResultExt, Snafu};
use clap_blocks::run_config::RunConfig;
pub mod compactor;
pub mod database;
pub mod ingester;
pub mod querier;
pub mod router;
pub mod router2;
pub mod test;
mod all_in_one;
mod compactor;
mod database;
mod ingester;
mod querier;
mod router;
mod router2;
mod test;
#[derive(Debug, Snafu)]
#[allow(clippy::enum_variant_names)]
@ -31,6 +32,9 @@ pub enum Error {
#[snafu(display("Error in ingester subcommand: {}", source))]
IngesterError { source: ingester::Error },
#[snafu(display("Error in all in one subcommand: {}", source))]
AllInOneError { source: all_in_one::Error },
#[snafu(display("Error in test subcommand: {}", source))]
TestError { source: test::Error },
}
@ -58,6 +62,7 @@ impl Config {
Some(Command::Router(config)) => &config.run_config,
Some(Command::Router2(config)) => &config.run_config,
Some(Command::Ingester(config)) => &config.run_config,
Some(Command::AllInOne(config)) => &config.run_config,
Some(Command::Test(config)) => &config.run_config,
}
}
@ -68,13 +73,13 @@ enum Command {
/// Run the server in compactor mode
Compactor(compactor::Config),
/// Run the server in database mode
/// Run the server in database mode (Deprecated)
Database(database::Config),
/// Run the server in querier mode
Querier(querier::Config),
/// Run the server in routing mode
/// Run the server in routing mode (Deprecated)
Router(router::Config),
/// Run the server in router2 mode
@ -83,6 +88,9 @@ enum Command {
/// Run the server in ingester mode
Ingester(ingester::Config),
/// Run the server in "all in one" mode
AllInOne(all_in_one::Config),
/// Run the server in test mode
Test(test::Config),
}
@ -105,6 +113,7 @@ pub async fn command(config: Config) -> Result<()> {
Some(Command::Router(config)) => router::command(config).await.context(RouterSnafu),
Some(Command::Router2(config)) => router2::command(config).await.context(Router2Snafu),
Some(Command::Ingester(config)) => ingester::command(config).await.context(IngesterSnafu),
Some(Command::AllInOne(config)) => all_in_one::command(config).await.context(AllInOneSnafu),
Some(Command::Test(config)) => test::command(config).await.context(TestSnafu),
}
}

View File

@ -14,6 +14,7 @@ use influxdb_ioxd::{
common_state::{CommonServerState, CommonServerStateError},
querier::create_querier_server_type,
},
Service,
};
#[derive(Debug, Error)]
@ -95,5 +96,6 @@ pub async fn command(config: Config) -> Result<(), Error> {
info!("starting querier");
Ok(influxdb_ioxd::main(common_state, server_type).await?)
let services = vec![Service::create(server_type, common_state.run_config())];
Ok(influxdb_ioxd::main(common_state, services).await?)
}

View File

@ -13,6 +13,7 @@ use influxdb_ioxd::{
common_state::{CommonServerState, CommonServerStateError},
router::RouterServerType,
},
Service,
};
use observability_deps::tracing::warn;
use router::{resolver::RemoteTemplate, server::RouterServer};
@ -160,5 +161,6 @@ pub async fn command(config: Config) -> Result<()> {
config_immutable,
));
Ok(influxdb_ioxd::main(common_state, server_type).await?)
let services = vec![Service::create(server_type, common_state.run_config())];
Ok(influxdb_ioxd::main(common_state, services).await?)
}

View File

@ -11,6 +11,7 @@ use influxdb_ioxd::{
common_state::{CommonServerState, CommonServerStateError},
router2::create_router2_server_type,
},
Service,
};
use observability_deps::tracing::*;
use thiserror::Error;
@ -85,5 +86,6 @@ pub async fn command(config: Config) -> Result<()> {
.await?;
info!("starting router2");
Ok(influxdb_ioxd::main(common_state, server_type).await?)
let services = vec![Service::create(server_type, common_state.run_config())];
Ok(influxdb_ioxd::main(common_state, services).await?)
}

View File

@ -9,6 +9,7 @@ use influxdb_ioxd::{
common_state::{CommonServerState, CommonServerStateError},
test::{TestAction, TestServerType},
},
Service,
};
use metric::Registry;
use thiserror::Error;
@ -62,5 +63,6 @@ pub async fn command(config: Config) -> Result<()> {
config.test_action,
));
Ok(influxdb_ioxd::main(common_state, server_type).await?)
let services = vec![Service::create(server_type, common_state.run_config())];
Ok(influxdb_ioxd::main(common_state, services).await?)
}

View File

@ -1,10 +1,12 @@
use crate::server_type::{common_state::CommonServerState, ServerType};
use clap_blocks::run_config::RunConfig;
use futures::{future::FusedFuture, pin_mut, FutureExt};
use hyper::server::conn::AddrIncoming;
use observability_deps::tracing::{error, info};
use panic_logging::SendPanicsToTracing;
use snafu::{ResultExt, Snafu};
use std::{net::SocketAddr, sync::Arc};
use tokio_util::sync::CancellationToken;
use trace_http::ctx::TraceHeaderParser;
mod http;
@ -36,6 +38,9 @@ pub enum Error {
#[snafu(display("Error serving RPC: {}", source))]
ServingRpc { source: server_type::RpcError },
#[snafu(display("Error joining server task: {}", source))]
Joining { source: tokio::task::JoinError },
#[snafu(display("Early Http shutdown"))]
LostHttp,
@ -102,13 +107,40 @@ fn build_malloc_conf() -> String {
"clippy".to_string()
}
/// A service that will start on the specified addresses
pub struct Service {
http_bind_address: Option<clap_blocks::socket_addr::SocketAddr>,
grpc_bind_address: clap_blocks::socket_addr::SocketAddr,
server_type: Arc<dyn ServerType>,
}
impl Service {
pub fn create(server_type: Arc<dyn ServerType>, run_config: &RunConfig) -> Self {
Self {
http_bind_address: Some(run_config.http_bind_address),
grpc_bind_address: run_config.grpc_bind_address,
server_type,
}
}
pub fn create_grpc_only(server_type: Arc<dyn ServerType>, run_config: &RunConfig) -> Self {
Self {
http_bind_address: None,
grpc_bind_address: run_config.grpc_bind_address,
server_type,
}
}
}
/// This is the entry point for the IOx server.
///
/// The precise server type depends on `T`. This entry point ensures that the given `server_type` is started using best
/// practice, e.g. that we print the GIT-hash and malloc-configs, that a panic handler is installed, etc.
/// This entry point ensures that the given set of Services are
/// started using best practice, e.g. that we print the GIT-hash and
/// malloc-configs, that a panic handler is installed, etc.
///
/// Due to the invasive nature of the setup routine, this should not be used during unit tests.
pub async fn main(common_state: CommonServerState, server_type: Arc<dyn ServerType>) -> Result<()> {
/// Due to its invasive nature (install global panic handling,
/// logging, etc) this function should not be used during unit tests.
pub async fn main(common_state: CommonServerState, services: Vec<Service>) -> Result<()> {
let git_hash = option_env!("GIT_HASH").unwrap_or("UNKNOWN");
let num_cpus = num_cpus::get();
let build_malloc_conf = build_malloc_conf();
@ -119,15 +151,19 @@ pub async fn main(common_state: CommonServerState, server_type: Arc<dyn ServerTy
"InfluxDB IOx server starting",
);
if (common_state.run_config().grpc_bind_address == common_state.run_config().http_bind_address)
&& (common_state.run_config().grpc_bind_address.port() != 0)
{
error!(
grpc_bind_address=%common_state.run_config().grpc_bind_address,
http_bind_address=%common_state.run_config().http_bind_address,
"grpc and http bind addresses must differ",
);
std::process::exit(1);
for service in &services {
if let Some(http_bind_address) = &service.http_bind_address {
if (&service.grpc_bind_address == http_bind_address)
&& (service.grpc_bind_address.port() != 0)
{
error!(
grpc_bind_address=%service.grpc_bind_address,
http_bind_address=%http_bind_address,
"grpc and http bind addresses must differ",
);
std::process::exit(1);
}
}
}
// Install custom panic handler and forget about it.
@ -141,22 +177,73 @@ pub async fn main(common_state: CommonServerState, server_type: Arc<dyn ServerTy
// Register jemalloc metrics
#[cfg(all(not(feature = "heappy"), feature = "jemalloc_replacing_malloc"))]
server_type
.metric_registry()
.register_instrument("jemalloc_metrics", jemalloc::JemallocMetrics::new);
let grpc_listener = grpc_listener(common_state.run_config().grpc_bind_address.into()).await?;
let http_listener = http_listener(common_state.run_config().http_bind_address.into()).await?;
let trace_exporter = common_state.trace_exporter();
let r = serve(common_state, grpc_listener, http_listener, server_type).await;
if let Some(trace_exporter) = trace_exporter {
if let Err(e) = trace_exporter.drain().await {
error!(%e, "error draining trace exporter");
}
for service in &services {
service
.server_type
.metric_registry()
.register_instrument("jemalloc_metrics", jemalloc::JemallocMetrics::new);
}
r
// Construct a token to trigger clean shutdown
let frontend_shutdown = CancellationToken::new();
let mut serving_futures = Vec::new();
for service in services {
let common_state = common_state.clone();
// start them all in their own tasks so the servers run at the same time
let frontend_shutdown = frontend_shutdown.clone();
serving_futures.push(tokio::spawn(async move {
let trace_exporter = common_state.trace_exporter();
let Service {
http_bind_address,
grpc_bind_address,
server_type,
} = service;
info!(?grpc_bind_address, "Binding gRPC services");
let grpc_listener = grpc_listener(grpc_bind_address.into()).await?;
let http_listener = match http_bind_address {
Some(http_bind_address) => {
info!(?http_bind_address, "Completed bind of gRPC, binding http");
Some(http_listener(http_bind_address.into()).await?)
}
None => {
info!("No http server specified");
None
}
};
let r = serve(
common_state,
frontend_shutdown,
grpc_listener,
http_listener,
server_type,
)
.await;
info!(
?grpc_bind_address,
?http_bind_address,
"done serving, draining futures"
);
if let Some(trace_exporter) = trace_exporter {
if let Err(e) = trace_exporter.drain().await {
error!(%e, "error draining trace exporter");
}
}
r
}));
}
for f in serving_futures {
// Use ?? to unwrap Result<Result<..>>
// "I heard you like errors, so I put an error in your error...."
f.await.context(JoiningSnafu)??;
}
Ok(())
}
pub async fn grpc_listener(addr: SocketAddr) -> Result<tokio::net::TcpListener> {
@ -179,19 +266,16 @@ pub async fn http_listener(addr: SocketAddr) -> Result<AddrIncoming> {
Ok(listener)
}
/// Instantiates the gRPC and HTTP listeners and returns a Future that completes when
/// these listeners, the Server, Databases, etc... have all exited.
///
/// This is effectively the "main loop" for influxdb_iox
/// Instantiates the gRPC and optional HTTP listeners and returns a
/// Future that completes when these listeners, the Server, Databases,
/// etc... have all exited or the frontend_shutdown token is called.
async fn serve(
common_state: CommonServerState,
frontend_shutdown: CancellationToken,
grpc_listener: tokio::net::TcpListener,
http_listener: AddrIncoming,
http_listener: Option<AddrIncoming>,
server_type: Arc<dyn ServerType>,
) -> Result<()> {
// Construct a token to trigger shutdown of API services
let frontend_shutdown = tokio_util::sync::CancellationToken::new();
let trace_header_parser = TraceHeaderParser::new()
.with_jaeger_trace_context_header_name(
&common_state
@ -207,7 +291,6 @@ async fn serve(
);
// Construct and start up gRPC server
let grpc_server = rpc::serve(
grpc_listener,
Arc::clone(&server_type),
@ -215,15 +298,25 @@ async fn serve(
frontend_shutdown.clone(),
)
.fuse();
info!("gRPC server listening");
let http_server = http::serve(
http_listener,
Arc::clone(&server_type),
frontend_shutdown.clone(),
trace_header_parser,
)
let captured_server_type = Arc::clone(&server_type);
let captured_shutdown = frontend_shutdown.clone();
let http_server = async move {
if let Some(http_listener) = http_listener {
http::serve(
http_listener,
captured_server_type,
captured_shutdown,
trace_header_parser,
)
.await?
} else {
// don't resolve otherwise will cause server to shutdown
captured_shutdown.cancelled().await
}
Ok(())
}
.fuse();
info!("HTTP server listening");

View File

@ -12,7 +12,7 @@ pub enum CommonServerStateError {
}
/// Common state used by all server types (e.g. `Database` and `Router`)
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct CommonServerState {
run_config: RunConfig,
trace_exporter: Option<Arc<trace_exporters::export::AsyncExporter>>,

View File

@ -220,7 +220,14 @@ mod tests {
false,
));
serve(common_state, grpc_listener, http_listener, server_type).await
serve(
common_state,
CancellationToken::new(),
grpc_listener,
Some(http_listener),
server_type,
)
.await
}
#[tokio::test]
@ -413,7 +420,13 @@ mod tests {
false,
));
let fut = serve(common_state, grpc_listener, http_listener, server_type);
let fut = serve(
common_state,
CancellationToken::new(),
grpc_listener,
Some(http_listener),
server_type,
);
let join = tokio::spawn(fut);
(addr, server, join)

View File

@ -1,5 +1,5 @@
//! IOx ingester implementation.
//! Design doc: https://docs.google.com/document/d/14NlzBiWwn0H37QxnE0k3ybTU58SKyUZmdgYpVw6az0Q/edit#
//! Design doc: <https://docs.google.com/document/d/14NlzBiWwn0H37QxnE0k3ybTU58SKyUZmdgYpVw6az0Q/edit#>
//!
#![deny(rustdoc::broken_intra_doc_links, rust_2018_idioms)]

View File

@ -616,7 +616,7 @@ async fn maybe_auto_create_directories(
continue;
}
} else {
return Err("no sequencers initialized".to_string().into());
return Err("no file sequencers initialized".to_string().into());
}
}
}