Merge pull request #4447 from influxdata/cn/aio

feat: Make all-in-one mode easier to use
pull/24376/head
kodiakhq[bot] 2022-04-28 14:00:55 +00:00 committed by GitHub
commit 3198d75f2f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 369 additions and 141 deletions

View File

@ -65,6 +65,7 @@ To compile and run InfluxDB IOx from source, you'll need the following:
* [Clang](#clang)
* [lld (on Linux)](#lld)
* [protoc (on Apple Silicon)](#protoc)
* [Postgres](#postgres)
#### Rust
@ -136,6 +137,16 @@ PROTOC_INCLUDE=/opt/homebrew/include
IOx should then build correctly.
#### Postgres
The catalog is stored in Postgres (unless you're running in ephemeral mode). Postgres can be installed via Homebrew:
```shell
brew install postgres
```
then follow the instructions for starting Postgres either at system startup or on-demand.
### Clone the repository
Clone this repository using `git`.
@ -158,7 +169,7 @@ The rest of these instructions assume you are in this directory.
InfluxDB IOx can be configured using either environment variables or a configutation file,
making it suitable for deployment in containerized environments.
For a list configuration options, run `influxdb_iox --help`.
For a list of configuration options, run `influxdb_iox --help`.
For configuration options for specific subcommands, run `influxdb_iox <subcommand> --help`.
To use a configuration file, use a `.env` file in the working directory.
@ -169,7 +180,7 @@ To use the example configuration file, run:
cp docs/env.example .env
```
### Compile and start the server
### Compiling and Running
InfluxDB IOx is built using Cargo, Rust's package manager and build tool.
@ -181,34 +192,65 @@ cargo build
This which will create a binary at `target/debug/influxdb_iox`.
To start the InfluxDB IOx server, run:
#### Ephemeral mode
To start InfluxDB IOx and store data in memory, after you've compiled for development, run:
```shell
./target/debug/influxdb_iox run database
./target/debug/influxdb_iox run all-in-one
```
By default the server will start an HTTP server on port `8080` and a gRPC server on port `8082`.
You can also compile and run with one command:
#### Local persistence mode
To start InfluxDB IOx and store the catalog in Postgres and data in the local filesystem to persist
data across restarts, after you've compiled for development, run:
```shell
cargo run -- run database
./target/debug/influxdb_iox run all-in-one --catalog-dsn postgres:///iox_shared --data-dir=~/iox_data
```
To compile for performance testing, build in release mode:
where `--catalog-dsn` is a connection URL to the Postgres database you wish to use, and
`--data-dir` is the directory you wish to use.
### Loading data in local mode
Because the services run on different gRPC ports, and because the CLI uses the gRPC write API, if
you're using `influxdb_iox database` you have to set a `--host` with the correct gRPC
```shell
influxdb_iox -vv database write my_db test_fixtures/lineproto/metrics.lp --host http://localhost:8081
```
#### Compile and run
Rather than building and running the binary in `target`, you can also compile and run with one
command:
```shell
cargo run -- run all-in-one
```
#### Release mode for performance testing
To compile for performance testing, build in release mode then use the binary in `target/release`:
```shell
cargo build --release
./target/release/influxdb_iox run database
./target/release/influxdb_iox run all-in-one
```
You can also run in release mode with one step:
You can also compile and run in release mode with one step:
```shell
cargo run --release -- run database
cargo run --release -- run all-in-one
```
To run all available tests in debug mode, you may want to set min stack size to avoid the current known stack overflow issue:
#### Running tests
To run all available tests in debug mode, you may want to set min stack size to avoid the current
known stack overflow issue:
```shell
RUST_MIN_STACK=10485760 cargo test --all
@ -234,26 +276,6 @@ DOCKER_BUILDKIT=1 docker build .
### Write and read data
Each IOx instance requires a server ID.
This can be set one of 4 ways:
* set an environment variable `INFLUXDB_IOX_ID=42`
* set a flag `--server-id 42`
* use the API (not convered here)
* use the CLI
```shell
influxdb_iox server set 42
```
To write data, you need to create a database.
You can do so via the API or using the CLI.
For example, to create a database called `company_sensors`, use this command:
```shell
influxdb_iox database create company_sensors
```
Data can be stored in InfluxDB IOx by sending it in [line protocol] format to the `/api/v2/write` endpoint or using the CLI.
For example, here is a command that will send the data in the `test_fixtures/lineproto/metrics.lp` file in this repository, assuming that you're running the server on the default port into the `company_sensors` database, you can use:

View File

@ -3,26 +3,20 @@ name = "clap_blocks"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
clap = { version = "3", features = ["derive", "env"] }
data_types = { path = "../data_types" }
futures = "0.3"
iox_catalog = { path = "../iox_catalog" }
iox_time = { path = "../iox_time" }
metric = { path = "../metric" }
object_store = { path = "../object_store" }
observability_deps = { path = "../observability_deps" }
trace_exporters = { path = "../trace_exporters" }
trace = { path = "../trace" }
iox_time = { path = "../iox_time" }
trogging = { path = "../trogging", default-features = false, features = ["clap"] }
write_buffer = { path = "../write_buffer" }
clap = { version = "3", features = ["derive", "env"] }
futures = "0.3"
snafu = "0.7"
tempfile = "3.1.0"
trace = { path = "../trace" }
trace_exporters = { path = "../trace_exporters" }
trogging = { path = "../trogging", default-features = false, features = ["clap"] }
uuid = { version = "0.8", features = ["v4"] }
workspace-hack = { path = "../workspace-hack"}
[dev-dependencies]
tempfile = "3.1.0"
write_buffer = { path = "../write_buffer" }

View File

@ -1,6 +1,7 @@
use iox_catalog::{
create_or_get_default_records, interface::Catalog, mem::MemCatalog, postgres::PostgresCatalog,
};
use observability_deps::tracing::*;
use snafu::{OptionExt, ResultExt, Snafu};
use std::{ops::DerefMut, sync::Arc};
@ -55,6 +56,31 @@ pub enum CatalogType {
}
impl CatalogDsnConfig {
/// Create a new memory instance for all-in-one mode if a catalog DSN is not specified, setting
/// the default for arguments that are irrelevant
pub fn new_memory() -> Self {
info!("Catalog: In-memory");
Self {
catalog_type_: CatalogType::Memory,
dsn: None,
max_catalog_connections: 10,
postgres_schema_name: iox_catalog::postgres::SCHEMA_NAME.to_string(),
}
}
/// Create a new Postgres instance for all-in-one mode if a catalog DSN is specified
pub fn new_postgres(dsn: String, postgres_schema_name: String) -> Self {
info!("Catalog: Postgres at `{}`", dsn);
Self {
catalog_type_: CatalogType::Postgres,
dsn: Some(dsn),
max_catalog_connections: 10,
postgres_schema_name,
}
}
pub async fn get_catalog(
&self,
app_name: &'static str,

View File

@ -1,10 +1,10 @@
//! CLI handling for object store config (via CLI arguments and environment variables).
use std::{convert::TryFrom, fs, num::NonZeroUsize, path::PathBuf, time::Duration};
use futures::TryStreamExt;
use object_store::{path::ObjectStorePath, DynObjectStore, ObjectStoreImpl, ThrottleConfig};
use observability_deps::tracing::{info, warn};
use snafu::{ResultExt, Snafu};
use std::{convert::TryFrom, fs, num::NonZeroUsize, path::PathBuf, time::Duration};
use uuid::Uuid;
#[derive(Debug, Snafu)]
@ -184,6 +184,34 @@ Possible values (case insensitive):
pub object_store_connection_limit: NonZeroUsize,
}
impl ObjectStoreConfig {
/// Create a new instance for all-in-one mode, only allowing some arguments.
pub fn new(database_directory: Option<PathBuf>) -> Self {
match &database_directory {
Some(dir) => info!("Object store: File-based in `{}`", dir.display()),
None => info!("Object store: In-memory"),
}
let object_store = database_directory.as_ref().map(|_| ObjectStoreType::File);
Self {
aws_access_key_id: Default::default(),
aws_allow_http: Default::default(),
aws_default_region: Default::default(),
aws_endpoint: Default::default(),
aws_secret_access_key: Default::default(),
aws_session_token: Default::default(),
azure_storage_access_key: Default::default(),
azure_storage_account: Default::default(),
bucket: Default::default(),
database_directory,
google_service_account: Default::default(),
object_store,
object_store_connection_limit: NonZeroUsize::new(16).unwrap(),
}
}
}
#[derive(Debug, Copy, Clone, PartialEq, clap::ArgEnum)]
pub enum ObjectStoreType {
Memory,

View File

@ -95,4 +95,26 @@ impl RunConfig {
self.grpc_bind_address = grpc_bind_address;
self
}
/// Create a new instance for all-in-one mode, only allowing some arguments.
pub fn new(
logging_config: LoggingConfig,
tracing_config: TracingConfig,
http_bind_address: SocketAddr,
grpc_bind_address: SocketAddr,
max_http_request_size: usize,
object_store_config: ObjectStoreConfig,
) -> Self {
Self {
logging_config,
tracing_config,
// TODO: server_id isn't used in NG; this field should be removed when OG is removed
// https://github.com/influxdata/influxdb_iox/issues/4451
server_id_config: ServerIdConfig { server_id: None },
http_bind_address,
grpc_bind_address,
max_http_request_size,
object_store_config,
}
}
}

View File

@ -1,6 +1,8 @@
use data_types::write_buffer::{WriteBufferConnection, WriteBufferCreationConfig};
use iox_time::SystemProvider;
use std::{collections::BTreeMap, num::NonZeroU32, sync::Arc};
use observability_deps::tracing::*;
use std::{collections::BTreeMap, num::NonZeroU32, path::PathBuf, sync::Arc};
use tempfile::TempDir;
use trace::TraceCollector;
use write_buffer::{
config::WriteBufferConfigFactory,
@ -53,6 +55,30 @@ pub struct WriteBufferConfig {
}
impl WriteBufferConfig {
/// Create a new instance for all-in-one mode, only allowing some arguments.
/// If `database_directory` is not specified, creates a new temporary directory.
pub fn new(topic: &str, database_directory: Option<PathBuf>) -> Self {
let connection_string = database_directory
.map(|pathbuf| pathbuf.display().to_string())
.unwrap_or_else(|| {
TempDir::new()
.expect("Creating a temporary directory should work")
.into_path()
.display()
.to_string()
});
info!("Write buffer: File-based in `{}`", connection_string);
Self {
type_: "file".to_string(),
connection_string,
topic: topic.to_string(),
connection_config: Default::default(),
auto_create_topics: Some(NonZeroU32::new(1).unwrap()),
}
}
/// Initialize a [`WriteBufferWriting`].
pub async fn writing(
&self,

View File

@ -1,17 +1,16 @@
//! Implementation of command line option for running all in one mode
use std::{num::NonZeroU32, sync::Arc};
use clap_blocks::compactor::CompactorConfig;
use super::main;
use clap_blocks::{
catalog_dsn::CatalogDsnConfig,
ingester::IngesterConfig,
run_config::{RunConfig, DEFAULT_API_BIND_ADDR, DEFAULT_GRPC_BIND_ADDR},
socket_addr::SocketAddr,
catalog_dsn::CatalogDsnConfig, compactor::CompactorConfig, ingester::IngesterConfig,
object_store::ObjectStoreConfig, run_config::RunConfig, socket_addr::SocketAddr,
write_buffer::WriteBufferConfig,
};
use iox_time::{SystemProvider, TimeProvider};
use ioxd_common::server_type::{CommonServerState, CommonServerStateError};
use ioxd_common::Service;
use ioxd_common::{
server_type::{CommonServerState, CommonServerStateError},
Service,
};
use ioxd_compactor::create_compactor_server_type;
use ioxd_ingester::create_ingester_server_type;
use ioxd_querier::create_querier_server_type;
@ -19,9 +18,10 @@ use ioxd_router2::create_router2_server_type;
use object_store::{DynObjectStore, ObjectStoreImpl};
use observability_deps::tracing::*;
use query::exec::Executor;
use std::{path::PathBuf, sync::Arc};
use thiserror::Error;
use super::main;
use trace_exporters::TracingConfig;
use trogging::cli::LoggingConfig;
/// The default bind address for the Router HTTP API.
pub const DEFAULT_ROUTER_HTTP_BIND_ADDR: &str = "127.0.0.1:8080";
@ -32,12 +32,15 @@ pub const DEFAULT_ROUTER_GRPC_BIND_ADDR: &str = "127.0.0.1:8081";
/// The default bind address for the Querier gRPC (chosen to match default gRPC addr)
pub const DEFAULT_QUERIER_GRPC_BIND_ADDR: &str = "127.0.0.1:8082";
/// The default bind address for the Ingester gRPC (chosen to match default gRPC addr)
/// The default bind address for the Ingester gRPC
pub const DEFAULT_INGESTER_GRPC_BIND_ADDR: &str = "127.0.0.1:8083";
/// The default bind address for the Compactor gRPC (chosen to match default gRPC addr)
/// The default bind address for the Compactor gRPC
pub const DEFAULT_COMPACTOR_GRPC_BIND_ADDR: &str = "127.0.0.1:8084";
// If you want this level of control, should be instantiating the services individually
const QUERY_POOL_NAME: &str = "iox-shared";
#[derive(Debug, Error)]
pub enum Error {
#[error("Run: {0}")]
@ -105,7 +108,7 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
///
/// Currently, the starts services on 5 ports, designed so that the
/// ports used to interact with the old architecture are the same as
/// the new architecture (8082 write endpoint and query on 8082).
/// the new architecture (8081 write endpoint and query on 8082).
///
/// Router;
/// 8080 http (overrides INFLUXDB_IOX_BIND_ADDR)
@ -137,17 +140,90 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
about = "Runs in IOx All in One mode, containing router, ingester, compactor and querier."
)]
pub struct Config {
/// logging options
#[clap(flatten)]
pub(crate) run_config: RunConfig,
pub(crate) logging_config: LoggingConfig,
/// tracing options
#[clap(flatten)]
pub(crate) catalog_dsn: CatalogDsnConfig,
pub(crate) tracing_config: TracingConfig,
#[clap(flatten)]
pub(crate) write_buffer_config: WriteBufferConfig,
/// Maximum size of HTTP requests.
#[clap(
long = "--max-http-request-size",
env = "INFLUXDB_IOX_MAX_HTTP_REQUEST_SIZE",
default_value = "10485760" // 10 MiB
)]
pub max_http_request_size: usize,
#[clap(flatten)]
pub(crate) ingester_config: IngesterConfig,
/// The location InfluxDB IOx will use to store files locally. If not specified, will run in
/// ephemeral mode.
#[clap(long = "--data-dir", env = "INFLUXDB_IOX_DB_DIR")]
pub database_directory: Option<PathBuf>,
/// Postgres connection string. If not specified, will use an in-memory catalog.
#[clap(long = "--catalog-dsn", env = "INFLUXDB_IOX_CATALOG_DSN")]
pub dsn: Option<String>,
/// Schema name for PostgreSQL-based catalogs.
#[clap(
long = "--catalog-postgres-schema-name",
env = "INFLUXDB_IOX_CATALOG_POSTGRES_SCHEMA_NAME",
default_value = iox_catalog::postgres::SCHEMA_NAME,
)]
pub postgres_schema_name: String,
/// The ingester will continue to pull data and buffer it from the write buffer
/// as long as it is below this size. If it hits this size it will pause
/// ingest from the write buffer until persistence goes below this threshold.
/// The default value is 100 GB (in bytes).
#[clap(
long = "--pause-ingest-size-bytes",
env = "INFLUXDB_IOX_PAUSE_INGEST_SIZE_BYTES",
default_value = "107374182400"
)]
pub pause_ingest_size_bytes: usize,
/// Once the ingester crosses this threshold of data buffered across
/// all sequencers, it will pick the largest partitions and persist
/// them until it falls below this threshold. An ingester running in
/// a steady state is expected to take up this much memory.
/// The default value is 1 GB (in bytes).
#[clap(
long = "--persist-memory-threshold-bytes",
env = "INFLUXDB_IOX_PERSIST_MEMORY_THRESHOLD_BYTES",
default_value = "1073741824"
)]
pub persist_memory_threshold_bytes: usize,
/// If an individual partition crosses this size threshold, it will be persisted.
/// The default value is 300MB (in bytes).
#[clap(
long = "--persist-partition-size-threshold-bytes",
env = "INFLUXDB_IOX_PERSIST_PARTITION_SIZE_THRESHOLD_BYTES",
default_value = "314572800"
)]
pub persist_partition_size_threshold_bytes: usize,
/// If a partition has had data buffered for longer than this period of time
/// it will be persisted. This puts an upper bound on how far back the
/// ingester may need to read in Kafka on restart or recovery. The default value
/// is 30 minutes (in seconds).
#[clap(
long = "--persist-partition-age-threshold-seconds",
env = "INFLUXDB_IOX_PERSIST_PARTITION_AGE_THRESHOLD_SECONDS",
default_value = "1800"
)]
pub persist_partition_age_threshold_seconds: u64,
/// If a partition has had data buffered and hasn't received a write for this
/// period of time, it will be persisted. The default value is 300 seconds (5 minutes).
#[clap(
long = "--persist-partition-cold-threshold-seconds",
env = "INFLUXDB_IOX_PERSIST_PARTITION_COLD_THRESHOLD_SECONDS",
default_value = "300"
)]
pub persist_partition_cold_threshold_seconds: u64,
/// The address on which IOx will serve Router HTTP API requests
#[clap(
@ -194,10 +270,17 @@ impl Config {
/// Get a specialized run config to use for each service
fn specialize(self) -> SpecializedConfig {
let Self {
run_config,
catalog_dsn,
write_buffer_config,
ingester_config,
logging_config,
tracing_config,
max_http_request_size,
database_directory,
dsn,
postgres_schema_name,
pause_ingest_size_bytes,
persist_memory_threshold_bytes,
persist_partition_size_threshold_bytes,
persist_partition_age_threshold_seconds,
persist_partition_cold_threshold_seconds,
router_http_bind_address,
router_grpc_bind_address,
querier_grpc_bind_address,
@ -205,35 +288,54 @@ impl Config {
compactor_grpc_bind_address,
} = self;
if run_config.http_bind_address != DEFAULT_API_BIND_ADDR.parse().unwrap() {
eprintln!("Warning: --http-bind-addr ignored in all in one mode");
}
if run_config.grpc_bind_address != DEFAULT_GRPC_BIND_ADDR.parse().unwrap() {
eprintln!("Warning: --grpc-bind-addr ignored in all in one mode");
}
let object_store_config = ObjectStoreConfig::new(database_directory.clone());
let write_buffer_config = WriteBufferConfig::new(QUERY_POOL_NAME, database_directory);
let catalog_dsn = dsn
.map(|postgres_url| CatalogDsnConfig::new_postgres(postgres_url, postgres_schema_name))
.unwrap_or_else(CatalogDsnConfig::new_memory);
let router_run_config = run_config
.clone()
.with_http_bind_address(router_http_bind_address)
.with_grpc_bind_address(router_grpc_bind_address);
let router_run_config = RunConfig::new(
logging_config,
tracing_config,
router_http_bind_address,
router_grpc_bind_address,
max_http_request_size,
object_store_config,
);
let querier_run_config = run_config
let querier_run_config = router_run_config
.clone()
.with_grpc_bind_address(querier_grpc_bind_address);
let ingester_run_config = run_config
let ingester_run_config = router_run_config
.clone()
.with_grpc_bind_address(ingester_grpc_bind_address);
let compactor_run_config = run_config.with_grpc_bind_address(compactor_grpc_bind_address);
let compactor_run_config = router_run_config
.clone()
.with_grpc_bind_address(compactor_grpc_bind_address);
// All-in-one mode only supports one write buffer partition.
let write_buffer_partition_range_start = 0;
let write_buffer_partition_range_end = 0;
let ingester_config = IngesterConfig {
write_buffer_partition_range_start,
write_buffer_partition_range_end,
pause_ingest_size_bytes,
persist_memory_threshold_bytes,
persist_partition_size_threshold_bytes,
persist_partition_age_threshold_seconds,
persist_partition_cold_threshold_seconds,
};
// create a CompactorConfig for the all in one server based on
// settings from other configs. Cant use `#clap(flatten)` as the
// parameters are redundant with ingesters
// settings from other configs. Can't use `#clap(flatten)` as the
// parameters are redundant with ingester's
let compactor_config = CompactorConfig {
topic: write_buffer_config.topic().to_string(),
write_buffer_partition_range_start: ingester_config.write_buffer_partition_range_start,
write_buffer_partition_range_end: ingester_config.write_buffer_partition_range_end,
topic: QUERY_POOL_NAME.to_string(),
write_buffer_partition_range_start,
write_buffer_partition_range_end,
split_percentage: 90,
max_concurrent_compaction_size_bytes: 100000,
compaction_max_size_bytes: 100000,
@ -275,27 +377,11 @@ pub async fn command(config: Config) -> Result<()> {
ingester_run_config,
compactor_run_config,
catalog_dsn,
mut write_buffer_config,
write_buffer_config,
ingester_config,
compactor_config,
} = config.specialize();
// Ensure at least one topic is automatically created in all in one mode
write_buffer_config.set_auto_create_topics(Some(
write_buffer_config.auto_create_topics().unwrap_or_else(|| {
let default_config = NonZeroU32::new(1).unwrap();
info!(
?default_config,
"Automatically configuring creation of a single topic"
);
default_config
}),
));
// If you want this level of control, should be instatiating the
// services individually
let query_pool_name = "iox-shared";
let metrics = Arc::new(metric::Registry::default());
let catalog = catalog_dsn
@ -312,7 +398,7 @@ pub async fn command(config: Config) -> Result<()> {
.repositories()
.await
.kafka_topics()
.create_or_get(query_pool_name)
.create_or_get(QUERY_POOL_NAME)
.await?;
let object_store: Arc<DynObjectStore> = Arc::new(
@ -338,7 +424,7 @@ pub async fn command(config: Config) -> Result<()> {
Arc::clone(&catalog),
Arc::clone(&object_store),
&write_buffer_config,
query_pool_name,
QUERY_POOL_NAME,
1_000, // max 1,000 concurrent HTTP requests
)
.await?;

View File

@ -132,16 +132,20 @@ pub async fn main(
server_type,
} = service;
info!(?grpc_bind_address, "Binding gRPC services");
info!(?grpc_bind_address, ?server_type, "Binding gRPC services");
let grpc_listener = grpc_listener(grpc_bind_address.into()).await?;
let http_listener = match http_bind_address {
Some(http_bind_address) => {
info!(?http_bind_address, "Completed bind of gRPC, binding http");
info!(
?http_bind_address,
?server_type,
"Completed bind of gRPC, binding http"
);
Some(http_listener(http_bind_address.into()).await?)
}
None => {
info!("No http server specified");
info!(?server_type, "No http server specified");
None
}
};
@ -151,13 +155,14 @@ pub async fn main(
frontend_shutdown,
grpc_listener,
http_listener,
server_type,
Arc::clone(&server_type),
)
.await;
info!(
?grpc_bind_address,
?http_bind_address,
?server_type,
"done serving, draining futures"
);
if let Some(trace_exporter) = trace_exporter {

View File

@ -1,6 +1,5 @@
use snafu::{ResultExt, Snafu};
use clap_blocks::run_config::RunConfig;
use trogging::cli::LoggingConfig;
mod all_in_one;
mod compactor;
@ -54,17 +53,17 @@ pub struct Config {
}
impl Config {
pub fn run_config(&self) -> &RunConfig {
pub fn logging_config(&self) -> &LoggingConfig {
match &self.command {
None => &self.database_config.run_config,
Some(Command::Compactor(config)) => &config.run_config,
Some(Command::Database(config)) => &config.run_config,
Some(Command::Querier(config)) => &config.run_config,
Some(Command::Router(config)) => &config.run_config,
Some(Command::Router2(config)) => &config.run_config,
Some(Command::Ingester(config)) => &config.run_config,
Some(Command::AllInOne(config)) => &config.run_config,
Some(Command::Test(config)) => &config.run_config,
None => self.database_config.run_config.logging_config(),
Some(Command::Compactor(config)) => config.run_config.logging_config(),
Some(Command::Database(config)) => config.run_config.logging_config(),
Some(Command::Querier(config)) => config.run_config.logging_config(),
Some(Command::Router(config)) => config.run_config.logging_config(),
Some(Command::Router2(config)) => config.run_config.logging_config(),
Some(Command::Ingester(config)) => config.run_config.logging_config(),
Some(Command::AllInOne(config)) => &config.logging_config,
Some(Command::Test(config)) => config.run_config.logging_config(),
}
}
}

View File

@ -21,7 +21,7 @@ pub fn init_logs_and_tracing(
log_verbose_count: u8,
config: &crate::commands::run::Config,
) -> Result<TroggingGuard, trogging::Error> {
let mut logging_config = config.run_config().logging_config().clone();
let mut logging_config = config.logging_config().clone();
// Handle the case if -v/-vv is specified both before and after the server
// command

View File

@ -119,12 +119,13 @@ pub async fn serve(
frontend_shutdown.clone(),
)
.fuse();
info!("gRPC server listening");
info!(?server_type, "gRPC server listening");
let captured_server_type = Arc::clone(&server_type);
let captured_shutdown = frontend_shutdown.clone();
let http_server = async move {
if let Some(http_listener) = http_listener {
info!(server_type=?captured_server_type, "HTTP server listening");
http::serve(
http_listener,
captured_server_type,
@ -139,10 +140,9 @@ pub async fn serve(
Ok(())
}
.fuse();
info!("HTTP server listening");
// Purposefully use log not tokio-tracing to ensure correctly hooked up
log::info!("InfluxDB IOx server ready");
log::info!("InfluxDB IOx {:?} server ready", server_type);
// Get IOx background worker join handle
let server_handle = Arc::clone(&server_type).join().fuse();
@ -192,30 +192,30 @@ pub async fn serve(
// registry, don't exit before HTTP and gRPC requests dependent on them
while !grpc_server.is_terminated() && !http_server.is_terminated() {
futures::select! {
_ = signal => info!("Shutdown requested"),
_ = signal => info!(?server_type, "Shutdown requested"),
_ = server_handle => {
error!("server worker shutdown prematurely");
error!(?server_type, "server worker shutdown prematurely");
res = res.and(Err(Error::LostServer));
},
result = grpc_server => match result {
Ok(_) if frontend_shutdown.is_cancelled() => info!("gRPC server shutdown"),
Ok(_) if frontend_shutdown.is_cancelled() => info!(?server_type, "gRPC server shutdown"),
Ok(_) => {
error!("Early gRPC server exit");
error!(?server_type, "Early gRPC server exit");
res = res.and(Err(Error::LostRpc));
}
Err(error) => {
error!(%error, "gRPC server error");
error!(%error, ?server_type, "gRPC server error");
res = res.and(Err(Error::ServingRpc{source: error}));
}
},
result = http_server => match result {
Ok(_) if frontend_shutdown.is_cancelled() => info!("HTTP server shutdown"),
Ok(_) if frontend_shutdown.is_cancelled() => info!(?server_type, "HTTP server shutdown"),
Ok(_) => {
error!("Early HTTP server exit");
error!(?server_type, "Early HTTP server exit");
res = res.and(Err(Error::LostHttp));
}
Err(error) => {
error!(%error, "HTTP server error");
error!(%error, ?server_type, "HTTP server error");
res = res.and(Err(Error::ServingHttp{source: error}));
}
},
@ -223,13 +223,13 @@ pub async fn serve(
frontend_shutdown.cancel()
}
info!("frontend shutdown completed");
info!(?server_type, "frontend shutdown completed");
server_type.shutdown();
if !server_handle.is_terminated() {
server_handle.await;
}
info!("backend shutdown completed");
info!(?server_type, "backend shutdown completed");
res
}

View File

@ -42,12 +42,17 @@ pub enum Error {
pub type Result<T, E = Error> = std::result::Result<T, E>;
#[derive(Debug)]
pub struct CompactorServerType<C: CompactorHandler> {
server: CompactorServer<C>,
trace_collector: Option<Arc<dyn TraceCollector>>,
}
impl<C: CompactorHandler> std::fmt::Debug for CompactorServerType<C> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Compactor")
}
}
impl<C: CompactorHandler> CompactorServerType<C> {
pub fn new(server: CompactorServer<C>, common_state: &CommonServerState) -> Self {
Self {

View File

@ -51,12 +51,17 @@ pub enum Error {
pub type Result<T, E = Error> = std::result::Result<T, E>;
#[derive(Debug)]
pub struct IngesterServerType<I: IngestHandler> {
server: IngesterServer<I>,
trace_collector: Option<Arc<dyn TraceCollector>>,
}
impl<I: IngestHandler> std::fmt::Debug for IngesterServerType<I> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Ingester")
}
}
impl<I: IngestHandler> IngesterServerType<I> {
pub fn new(server: IngesterServer<I>, common_state: &CommonServerState) -> Self {
Self {

View File

@ -27,13 +27,18 @@ use ioxd_common::{
mod rpc;
#[derive(Debug)]
pub struct QuerierServerType<C: QuerierHandler> {
database: Arc<QuerierDatabase>,
server: QuerierServer<C>,
trace_collector: Option<Arc<dyn TraceCollector>>,
}
impl<C: QuerierHandler> std::fmt::Debug for QuerierServerType<C> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Querier")
}
}
impl<C: QuerierHandler> QuerierServerType<C> {
pub fn new(
server: QuerierServer<C>,

View File

@ -53,7 +53,6 @@ pub enum Error {
pub type Result<T, E = Error> = std::result::Result<T, E>;
#[derive(Debug)]
pub struct RouterServerType<D> {
server: RouterServer<D>,
shutdown: CancellationToken,
@ -70,6 +69,12 @@ impl<D> RouterServerType<D> {
}
}
impl<D> std::fmt::Debug for RouterServerType<D> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Router2")
}
}
#[async_trait]
impl<D> ServerType for RouterServerType<D>
where