feat: Formalizes the config system for IOx, including tests (#608)

* feat: Create configuration system, port IOx to use it

* docs: Apply suggestions from code review

Co-authored-by: Paul Dix <paul@influxdata.com>

* fix: fix test for setting values

Co-authored-by: Paul Dix <paul@influxdata.com>
pull/24376/head
Andrew Lamb 2020-12-31 07:02:31 -05:00 committed by GitHub
parent db6ce0503c
commit 9f0ff678f1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 1480 additions and 192 deletions

14
Cargo.lock generated
View File

@ -440,6 +440,16 @@ dependencies = [
"winapi 0.3.9",
]
[[package]]
name = "config"
version = "0.1.0"
dependencies = [
"dirs 3.0.1",
"dotenv",
"snafu",
"test_helpers",
]
[[package]]
name = "const_fn"
version = "0.4.4"
@ -1349,11 +1359,11 @@ dependencies = [
"byteorder",
"bytes",
"clap",
"config",
"criterion",
"csv",
"data_types",
"dirs 3.0.1",
"dotenv",
"env_logger",
"flate2",
"futures",
"generated_types",

View File

@ -9,6 +9,7 @@ default-run = "influxdb_iox"
members = [
"arrow_deps",
"server",
"config",
"data_types",
"generated_types",
"ingest",
@ -32,6 +33,7 @@ debug = true
debug = true
[dependencies]
config = { path = "config" }
data_types = { path = "data_types" }
arrow_deps = { path = "arrow_deps" }
generated_types = { path = "generated_types" }
@ -53,8 +55,6 @@ routerify = "1.1"
tokio = { version = "0.2", features = ["full"] }
clap = "2.33.1"
dotenv = "0.15.0"
dirs = "3.0.1"
futures = "0.3.1"
serde_json = "1.0.44"
@ -66,6 +66,7 @@ byteorder = "1.3.4"
tonic = "0.3.1"
prost = "0.6.1"
prost-types = "0.6.1"
env_logger = "0.7.1"
tracing = { version = "0.1", features = ["release_max_level_debug"] }
tracing-futures="0.2.4"

View File

@ -115,25 +115,19 @@ InstalledDir: /Library/Developer/CommandLineTools/usr/bin
### Specifying Configuration
**OPTIONAL:** There are a number of configuration variables you can choose to customize by
specifying values for environment variables in a `.env` file. To get an example file to start from,
run:
IOx is designed for running in modern containerized environments. As
such, it takes its configuration as enviroment variables.
```shell
cp docs/env.example .env
```
You can see a list of the current configuration values by running `influxdb_iox config show`.
then edit the newly-created `.env` file.
For development purposes, the most relevant environment variables are the `INFLUXDB_IOX_DB_DIR` and
`TEST_INFLUXDB_IOX_DB_DIR` variables that configure where files are stored on disk. The default
values are shown in the comments in the example file; to change them, uncomment the relevant lines
and change the values to the directories in which you'd like to store the files instead:
You can see a list of all available configuration items using the `influxdb_iox config help`
command.
Should you desire specifying config via a file, you can do so using a `.env` formatted file in `$HOME/.influxdb_iox/config`.
Note that you can save the current config values by saving the output of
`influxdb_iox config show` into `$HOME/.influxdb_iox/config`
```shell
INFLUXDB_IOX_DB_DIR=/some/place/else
TEST_INFLUXDB_IOX_DB_DIR=/another/place
```
### Compiling and Starting the Server

17
config/Cargo.toml Normal file
View File

@ -0,0 +1,17 @@
[package]
name = "config"
description = "InfluxDB IOx configuration management"
version = "0.1.0"
authors = ["Andrew Lamb <andrew@nerdnetworks.org>"]
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
snafu = "0.6"
dotenv = "0.15.0"
dirs = "3.0.1"
[dev-dependencies]
test_helpers = { path = "../test_helpers" }

495
config/src/item.rs Normal file
View File

@ -0,0 +1,495 @@
//! Contains individual config item definitions
use std::{
net::SocketAddr,
path::{Path, PathBuf},
};
/// Represents a single typed configuration item, specified as a
/// name=value pair.
///
/// This metadata is present so that IOx can programatically create
/// readable config files and provide useful help and debugging
/// information
///
/// The type parameter `T` is the type of the value of the
/// configuration item
pub(crate) trait ConfigItem<T> {
/// Return the name of the config value (environment variable name)
fn name(&self) -> &'static str;
/// A one sentence description
fn short_description(&self) -> String;
/// Default value, if any
fn default(&self) -> Option<String> {
None
}
/// An optional example value
fn example(&self) -> Option<String> {
// (if default is provided, there is often no need for an
// additional example)
self.default()
}
/// An optional longer form description,
fn long_description(&self) -> Option<String> {
None
}
/// Parses an instance of this `ConfigItem` item from an optional
/// string representation, the value of `default()` is passed.
///
/// If an empty value is not valid for this item, an error should
/// be returned.
///
/// If an empty value is valid, then the config item should return
/// a value that encodes an empty value correctly.
fn parse(&self, val: Option<&str>) -> std::result::Result<T, String>;
/// Convert a parsed value of this config item (that came from a
/// call to `parse`) back into a String, for display purposes
fn unparse(&self, val: &T) -> String;
/// Display the value of an individual ConfigItem. If verbose is
/// true, shows full help information, if false, shows minimal
/// name=value env variable form
fn display(&self, f: &mut std::fmt::Formatter<'_>, val: &T, verbose: bool) -> std::fmt::Result {
let val = self.unparse(val);
if verbose {
writeln!(f, "-----------------")?;
writeln!(f, "{}: {}", self.name(), self.short_description())?;
writeln!(f, " current value: {}", val)?;
if let Some(default) = self.default() {
writeln!(f, " default value: {}", default)?;
}
if let Some(example) = self.example() {
if self.default() != self.example() {
writeln!(f, " example value: {}", example)?;
}
}
if let Some(long_description) = self.long_description() {
writeln!(f)?;
writeln!(f, "{}", long_description)?;
}
} else if !val.is_empty() {
write!(f, "{}={}", self.name(), val)?;
// also add a note if it is different than the default value
if let Some(default) = self.default() {
if default != val {
write!(f, " # (default {})", default)?;
}
}
writeln!(f)?;
}
Ok(())
}
}
pub(crate) struct HttpBindAddr {}
impl ConfigItem<SocketAddr> for HttpBindAddr {
fn name(&self) -> &'static str {
"INFLUXDB_IOX_BIND_ADDR"
}
fn short_description(&self) -> String {
"HTTP bind address".into()
}
fn default(&self) -> Option<String> {
Some("127.0.0.1:8080".into())
}
fn long_description(&self) -> Option<String> {
Some("The address on which IOx will serve HTTP API requests".into())
}
fn parse(&self, val: Option<&str>) -> std::result::Result<SocketAddr, String> {
let addr: &str = val.ok_or_else(|| String::from("Empty value is not valid"))?;
addr.parse()
.map_err(|e| format!("Error parsing as SocketAddress address: {}", e))
}
fn unparse(&self, val: &SocketAddr) -> String {
format!("{:?}", val)
}
}
pub(crate) struct GrpcBindAddr {}
impl ConfigItem<SocketAddr> for GrpcBindAddr {
fn name(&self) -> &'static str {
"INFLUXDB_IOX_GRPC_BIND_ADDR"
}
fn short_description(&self) -> String {
"gRPC bind address".into()
}
fn default(&self) -> Option<String> {
Some("127.0.0.1:8082".into())
}
fn long_description(&self) -> Option<String> {
Some("The address on which IOx will serve Storage gRPC API requests".into())
}
fn parse(&self, val: Option<&str>) -> std::result::Result<SocketAddr, String> {
let addr: &str = val.ok_or_else(|| String::from("Empty value is not valid"))?;
addr.parse()
.map_err(|e| format!("Error parsing as SocketAddress address: {}", e))
}
fn unparse(&self, val: &SocketAddr) -> String {
format!("{:?}", val)
}
}
pub(crate) struct DBDir {}
impl ConfigItem<PathBuf> for DBDir {
fn name(&self) -> &'static str {
"INFLUXDB_IOX_DB_DIR"
}
fn short_description(&self) -> String {
"Where to store files on disk:".into()
}
// default database path is $HOME/.influxdb_iox
fn default(&self) -> Option<String> {
dirs::home_dir()
.map(|mut path| {
path.push(".influxdb_iox");
path
})
.and_then(|dir| dir.to_str().map(|s| s.to_string()))
}
fn long_description(&self) -> Option<String> {
Some("The location InfluxDB IOx will use to store files locally".into())
}
fn parse(&self, val: Option<&str>) -> std::result::Result<PathBuf, String> {
let location: &str = val.ok_or_else(|| String::from("database directory not specified"))?;
Ok(Path::new(location).into())
}
fn unparse(&self, val: &PathBuf) -> String {
// path came from a string, so it should be able to go back
val.as_path().to_str().unwrap().to_string()
}
}
pub(crate) struct WriterID {}
impl ConfigItem<Option<u32>> for WriterID {
fn name(&self) -> &'static str {
"INFLUXDB_IOX_ID"
}
fn short_description(&self) -> String {
"The identifier for the server".into()
}
// There is no default datbase ID (on purpose)
fn long_description(&self) -> Option<String> {
Some(
"The identifier for the server. Used for writing to object storage and as\
an identifier that is added to replicated writes, WAL segments and Chunks. \
Must be unique in a group of connected or semi-connected IOx servers. \
Must be a number that can be represented by a 32-bit unsigned integer."
.into(),
)
}
fn parse(&self, val: Option<&str>) -> std::result::Result<Option<u32>, String> {
val.map(|val| {
val.parse::<u32>()
.map_err(|e| format!("Error parsing {} as a u32:: {}", val, e))
})
.transpose()
}
fn unparse(&self, val: &Option<u32>) -> String {
if let Some(val) = val.as_ref() {
format!("{}", val)
} else {
"".into()
}
}
}
pub(crate) struct GCPBucket {}
impl ConfigItem<Option<String>> for GCPBucket {
fn name(&self) -> &'static str {
"INFLUXDB_IOX_GCP_BUCKET"
}
fn short_description(&self) -> String {
"The bucket name, if using Google Cloud Storage as an object store".into()
}
fn example(&self) -> Option<String> {
Some("bucket_name".into())
}
fn long_description(&self) -> Option<String> {
Some(
"If using Google Cloud Storage for the object store, this item, \
as well as SERVICE_ACCOUNT must be set."
.into(),
)
}
fn parse(&self, val: Option<&str>) -> std::result::Result<Option<String>, String> {
Ok(val.map(|s| s.to_string()))
}
fn unparse(&self, val: &Option<String>) -> String {
if let Some(val) = val.as_ref() {
val.to_string()
} else {
"".into()
}
}
}
/// This value simply passed into the environment and used by the
/// various loggign / tracing libraries. It has its own structure here
/// for documentation purposes and so it can be loaded from config file.
pub(crate) struct RustLog {}
impl ConfigItem<Option<String>> for RustLog {
fn name(&self) -> &'static str {
"RUST_LOG"
}
fn short_description(&self) -> String {
"Rust logging level".into()
}
fn default(&self) -> Option<String> {
Some("warn".into())
}
fn example(&self) -> Option<String> {
Some("debug,hyper::proto::h1=info".into())
}
fn long_description(&self) -> Option<String> {
Some(
"This controls the IOx server logging level, as described in \
https://crates.io/crates/env_logger. Levels for different modules can \
be specified as well. For example `debug,hyper::proto::h1=info` \
specifies debug logging for all modules except for the `hyper::proto::h1' module \
which will only display info level logging."
.into(),
)
}
fn parse(&self, val: Option<&str>) -> std::result::Result<Option<String>, String> {
Ok(val.map(|s| s.to_string()))
}
fn unparse(&self, val: &Option<String>) -> String {
if let Some(val) = val.as_ref() {
val.to_string()
} else {
"".into()
}
}
}
/// This value simply passed into the environment and used by open
/// telemetry create. It has its own structure here
/// for documentation purposes and so it can be loaded from config file.
pub(crate) struct OTJaegerAgentHost {}
impl ConfigItem<Option<String>> for OTJaegerAgentHost {
fn name(&self) -> &'static str {
"OTEL_EXPORTER_JAEGER_AGENT_HOST"
}
fn short_description(&self) -> String {
"Open Telemetry Jaeger Host".into()
}
fn example(&self) -> Option<String> {
Some("jaeger.influxdata.net".into())
}
fn long_description(&self) -> Option<String> {
Some("If set, Jaeger traces are emitted to this host \
using the OpenTelemetry tracer.\n\n\
\
NOTE: The OpenTelemetry agent CAN ONLY be \
configured using environment variables. It CAN NOT be configured \
using the IOx config file at this time. Some useful variables:\n \
* OTEL_SERVICE_NAME: emitter service name (iox by default)\n \
* OTEL_EXPORTER_JAEGER_AGENT_HOST: hostname/address of the collector\n \
* OTEL_EXPORTER_JAEGER_AGENT_PORT: listening port of the collector.\n\n\
\
The entire list of variables can be found in \
https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/sdk-environment-variables.md#jaeger-exporter".into())
}
fn parse(&self, val: Option<&str>) -> std::result::Result<Option<String>, String> {
Ok(val.map(|s| s.to_string()))
}
fn unparse(&self, val: &Option<String>) -> String {
if let Some(val) = val.as_ref() {
val.to_string()
} else {
"".into()
}
}
}
#[cfg(test)]
mod test {
use test_helpers::{assert_contains, assert_not_contains};
use super::*;
use std::fmt;
#[test]
fn test_display() {
let item: TestConfigItem = Default::default();
let val = String::from("current_value");
assert_eq!(
item.convert_to_string(&val),
"TEST_CONFIG_ITEM=current_value\n"
);
let verbose_string = item.convert_to_verbose_string(&val);
assert_contains!(&verbose_string, "TEST_CONFIG_ITEM: short_description");
assert_contains!(&verbose_string, "current value: current_value");
assert_not_contains!(&verbose_string, "default");
assert_not_contains!(&verbose_string, "example");
}
#[test]
fn test_display_verbose_with_default() {
let item = TestConfigItem {
default: Some("the_default_value".into()),
..Default::default()
};
let val = String::from("current_value");
assert_eq!(
item.convert_to_string(&val),
"TEST_CONFIG_ITEM=current_value # (default the_default_value)\n"
);
let verbose_string = item.convert_to_verbose_string(&val);
assert_contains!(&verbose_string, "TEST_CONFIG_ITEM: short_description");
assert_contains!(&verbose_string, "current value: current_value");
assert_contains!(&verbose_string, "default value: the_default_value");
assert_not_contains!(&verbose_string, "example");
}
#[test]
fn test_display_verbose_with_default_and_different_example() {
let item = TestConfigItem {
default: Some("the_default_value".into()),
example: Some("the_example_value".into()),
..Default::default()
};
let val = String::from("current_value");
assert_eq!(
item.convert_to_string(&val),
"TEST_CONFIG_ITEM=current_value # (default the_default_value)\n"
);
let verbose_string = item.convert_to_verbose_string(&val);
assert_contains!(&verbose_string, "TEST_CONFIG_ITEM: short_description");
assert_contains!(&verbose_string, "current value: current_value");
assert_contains!(&verbose_string, "default value: the_default_value");
assert_contains!(&verbose_string, "example value: the_example_value");
}
#[test]
fn test_display_verbose_with_same_default_and_example() {
let item = TestConfigItem {
default: Some("the_value".into()),
example: Some("the_value".into()),
..Default::default()
};
let val = String::from("current_value");
assert_eq!(
item.convert_to_string(&val),
"TEST_CONFIG_ITEM=current_value # (default the_value)\n"
);
let verbose_string = item.convert_to_verbose_string(&val);
assert_contains!(&verbose_string, "TEST_CONFIG_ITEM: short_description");
assert_contains!(&verbose_string, "current value: current_value");
assert_contains!(&verbose_string, "default value: the_value");
assert_not_contains!(&verbose_string, "example");
}
#[test]
fn test_display_verbose_with_long_description() {
let item = TestConfigItem {
long_description: Some("this is a long description".into()),
..Default::default()
};
let val = String::from("current_value");
assert_eq!(
item.convert_to_string(&val),
"TEST_CONFIG_ITEM=current_value\n"
);
let verbose_string = item.convert_to_verbose_string(&val);
assert_contains!(&verbose_string, "TEST_CONFIG_ITEM: short_description");
assert_contains!(&verbose_string, "current value: current_value");
assert_contains!(&verbose_string, "this is a long description\n");
assert_not_contains!(&verbose_string, "example");
}
#[derive(Debug, Default)]
struct TestConfigItem {
pub default: Option<String>,
pub example: Option<String>,
pub long_description: Option<String>,
}
impl ConfigItem<String> for TestConfigItem {
fn name(&self) -> &'static str {
"TEST_CONFIG_ITEM"
}
fn short_description(&self) -> String {
"short_description".into()
}
fn parse(&self, val: Option<&str>) -> Result<String, String> {
Ok(val.map(|s| s.to_string()).unwrap_or_else(|| "".into()))
}
fn unparse(&self, val: &String) -> String {
val.to_string()
}
fn default(&self) -> Option<String> {
self.default.clone()
}
fn example(&self) -> Option<String> {
self.example.clone()
}
fn long_description(&self) -> Option<String> {
self.long_description.clone()
}
}
impl TestConfigItem {
/// convert the value to a string using the specified value
fn convert_to_string(&self, val: &str) -> String {
let val: String = val.to_string();
struct Wrapper<'a>(&'a TestConfigItem, &'a String);
impl<'a> fmt::Display for Wrapper<'a> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.0.display(f, self.1, false)
}
}
format!("{}", Wrapper(self, &val))
}
/// convert the value to a verbose string using the specified value
fn convert_to_verbose_string(&self, val: &str) -> String {
let val: String = val.to_string();
struct Wrapper<'a>(&'a TestConfigItem, &'a String);
impl<'a> fmt::Display for Wrapper<'a> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.0.display(f, self.1, true)
}
}
format!("{}", Wrapper(self, &val))
}
}
}

596
config/src/lib.rs Normal file
View File

@ -0,0 +1,596 @@
//! This crate contains the IOx "configuration management system" such
//! as it is.
//!
//! IOx follows [The 12 Factor
//! Application Guidance](https://12factor.net/config) in respect to
//! configuration, and thus expects to read its configuration from the
//! environment.
//!
//! To facilitate local development and testing, configuration values
//! can also be stored in a "config" file, which defaults to
//! `$HOME/.influxdb_iox/config`
//!
//! Configuration values are name=value pairs in the style of
//! [dotenv](https://crates.io/crates/dotenv), meant to closely mirror
//! environment variables.
//!
//! If there is a value specified both in the environment *and* in the
//! config file, the value in the environment will take precidence
//! over the value in the config file, and a warning will be displayed.
//!
//! Note that even though IOx reads all configuration values from the
//! environment, *internally* IOx code should use this structure
//! rather than reading on the environment (which are process-wide
//! global variables) directly. Not only does this consolidate the
//! configuration in a single location, it avoids all the bad side
//! effects of global variables.
use std::{
collections::HashMap,
fmt,
net::SocketAddr,
path::{Path, PathBuf},
};
use snafu::{ResultExt, Snafu};
mod item;
use item::ConfigItem;
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Error loading env config file '{:?}': {}", config_path, source))]
LoadingConfigFile {
config_path: PathBuf,
source: dotenv::Error,
},
#[snafu(display("Error parsing env config file '{:?}': {}", config_path, source))]
ParsingConfigFile {
config_path: PathBuf,
source: dotenv::Error,
},
#[snafu(display(
"Error setting config '{}' to value '{}': {}",
config_name,
config_value,
message
))]
ValidationError {
config_name: String,
config_value: String,
message: String,
},
#[snafu(display(
"Error setting config '{}'. Expected value like '{}'. Got value '{}': : {}",
config_name,
example,
config_value,
message
))]
ValidationErrorWithExample {
config_name: String,
example: String,
config_value: String,
message: String,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// The InfluxDB application configuration. This struct provides typed
/// access to all of IOx's configuration values.
#[derive(Debug)]
pub struct Config {
//--- Logging ---
/// basic logging level
pub rust_log: Option<String>,
/// Open Telemetry Jaeger hostname
pub otel_jaeger_host: Option<String>,
/// Database Writer ID (TODO make this mandatory)
pub writer_id: Option<u32>,
/// port to listen for HTTP API
pub http_bind_address: SocketAddr,
/// port to listen for gRPC API
pub grpc_bind_address: SocketAddr,
/// Directory to store local database files
pub database_directory: PathBuf,
// --- GCP fields ---
/// GCP object store bucekt
pub gcp_bucket: Option<String>,
}
impl Config {
/// Create the configuration object from a set of
/// name=value pairs
///
/// ADD NEW CONFIG VALUES HERE
fn new_from_map(name_values: HashMap<String, String>) -> Result<Self> {
use item::*;
Ok(Config {
rust_log: Self::parse_config(&name_values, &RustLog {})?,
otel_jaeger_host: Self::parse_config(&name_values, &OTJaegerAgentHost {})?,
writer_id: Self::parse_config(&name_values, &WriterID {})?,
http_bind_address: Self::parse_config(&name_values, &HttpBindAddr {})?,
grpc_bind_address: Self::parse_config(&name_values, &GrpcBindAddr {})?,
database_directory: Self::parse_config(&name_values, &DBDir {})?,
gcp_bucket: Self::parse_config(&name_values, &GCPBucket {})?,
})
}
/// Displays this config, item by item.
///
/// ADD NEW CONFIG VALUES HERE
fn display_items(&self, f: &mut fmt::Formatter<'_>, verbose: bool) -> fmt::Result {
use item::*;
RustLog {}.display(f, &self.rust_log, verbose)?;
OTJaegerAgentHost {}.display(f, &self.otel_jaeger_host, verbose)?;
WriterID {}.display(f, &self.writer_id, verbose)?;
HttpBindAddr {}.display(f, &self.http_bind_address, verbose)?;
GrpcBindAddr {}.display(f, &self.grpc_bind_address, verbose)?;
DBDir {}.display(f, &self.database_directory, verbose)?;
GCPBucket {}.display(f, &self.gcp_bucket, verbose)?;
Ok(())
}
/// returns the location of the default config file: ~/.influxdb_iox/config
pub fn default_config_file() -> PathBuf {
dirs::home_dir()
.map(|a| a.join(".influxdb_iox").join("config"))
.expect("Can not find home directory")
}
/// Creates a new Config object by reading from specified file, in
/// dotenv format, and from the the provided values.
///
/// Any values in `env_values` override the values in the file
///
/// Returns an error if there is a problem reading the specified
/// file or any validation fails
fn try_from_path_then_map(
config_path: &Path,
env_values: HashMap<String, String>,
) -> Result<Self> {
// load initial values from file
//
// Note, from_filename_iter method got "undeprecated" but that change is not yet
// released: https://github.com/dotenv-rs/dotenv/pull/54
#[allow(deprecated)]
let parsed_values: Vec<(String, String)> = dotenv::from_filename_iter(config_path)
.context(LoadingConfigFile { config_path })?
.map(|item| item.context(ParsingConfigFile { config_path }))
.collect::<Result<_>>()?;
let mut name_values: HashMap<String, String> = parsed_values.into_iter().collect();
// Apply values in `env_values` as an override to anything
// found in config file, warning if so
for (name, env_val) in env_values.iter() {
let file_value = name_values.get(name);
if let Some(file_value) = file_value {
if file_value != env_val {
eprintln!(
"WARNING value for configuration item {} in file, '{}' hidden by value in environment '{}'",
name, file_value, env_val
);
}
}
}
// Now, mash in the values
for (name, env_val) in env_values.into_iter() {
name_values.insert(name, env_val);
}
Self::new_from_map(name_values)
}
/// creates a new Config object by reading from specified file, in
/// dotenv format, and from the the provided values.
///
/// Any values in `name_values` override the values in the file
///
/// Returns an error if there is a problem reading the specified
/// file or any validation fails
pub fn try_from_path(config_path: &Path) -> Result<Self> {
let name_values: HashMap<String, String> = std::env::vars().collect();
Self::try_from_path_then_map(config_path, name_values)
}
/// creates a new Config object by reading from the environment variables
/// only.
///
/// Returns an error if any validation fails
pub fn new_from_env() -> Result<Self> {
// get all name/value pairs into a map and feed the config values one by one
let name_values: HashMap<String, String> = std::env::vars().collect();
Self::new_from_map(name_values)
}
/// Parse a single configuration item described with item and
/// returns the value parsed
fn parse_config<T>(
name_values: &HashMap<String, String>,
item: &impl ConfigItem<T>,
) -> Result<T> {
let config_name = item.name();
let config_value = name_values
.get(config_name)
.map(|s| s.to_owned())
// If no config value was specified in the map, use the default from the item
.or_else(|| item.default());
item.parse(config_value.as_ref().map(|s| s.as_ref()))
.map_err(|message| {
let config_value = config_value.unwrap_or_else(|| "".into());
let example = item.example().or_else(|| item.default());
if let Some(example) = example {
Error::ValidationErrorWithExample {
config_name: config_name.into(),
config_value,
example,
message,
}
} else {
Error::ValidationError {
config_name: config_name.into(),
config_value,
message,
}
}
})
}
/// return something which can be formatted using "{}" that gives
/// detailed information about each config value
pub fn verbose_display(&self) -> impl fmt::Display + '_ {
struct Wrapper<'a>(&'a Config);
impl<'a> fmt::Display for Wrapper<'a> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.0.display_items(f, true)
}
}
Wrapper(self)
}
}
impl fmt::Display for Config {
/// Default display is the minimal configuration
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.display_items(f, false)
}
}
#[cfg(test)]
mod test {
use test_helpers::{assert_contains, make_temp_file};
use super::*;
// End to end positive test case for all valid config values to validate they
// are hooked up
#[test]
fn test_all_values() {
let name_values: HashMap<String, String> = vec![
("INFLUXDB_IOX_BIND_ADDR".into(), "127.0.0.1:1010".into()),
(
"INFLUXDB_IOX_GRPC_BIND_ADDR".into(),
"127.0.0.2:2020".into(),
),
("INFLUXDB_IOX_DB_DIR".into(), "/foo/bar".into()),
("INFLUXDB_IOX_ID".into(), "42".into()),
("INFLUXDB_IOX_GCP_BUCKET".into(), "my_bucket".into()),
("RUST_LOG".into(), "rust_log_level".into()),
(
"OTEL_EXPORTER_JAEGER_AGENT_HOST".into(),
"example.com".into(),
),
]
.into_iter()
.collect();
let config = Config::new_from_map(name_values).unwrap();
assert_eq!(config.rust_log, Some("rust_log_level".into()));
assert_eq!(config.otel_jaeger_host, Some("example.com".into()));
assert_eq!(config.writer_id, Some(42));
assert_eq!(config.http_bind_address.to_string(), "127.0.0.1:1010");
assert_eq!(config.grpc_bind_address.to_string(), "127.0.0.2:2020");
assert_eq!(config.gcp_bucket, Some("my_bucket".into()));
}
#[test]
fn test_display() {
let config = Config::new_from_map(HashMap::new()).expect("IOx can work with just defaults");
let config_display = normalize(&format!("{}", config));
// test that the basic output is connected
assert_contains!(&config_display, "INFLUXDB_IOX_DB_DIR=$HOME/.influxdb_iox");
assert_contains!(&config_display, "INFLUXDB_IOX_BIND_ADDR=127.0.0.1:8080");
}
#[test]
fn test_verbose_display() {
let config = Config::new_from_map(HashMap::new()).expect("IOx can work with just defaults");
let config_display = normalize(&format!("{}", config.verbose_display()));
// test that the basic output is working and connected
assert_contains!(
&config_display,
r#"INFLUXDB_IOX_BIND_ADDR: HTTP bind address
current value: 127.0.0.1:8080
default value: 127.0.0.1:8080
The address on which IOx will serve HTTP API requests
"#
);
}
#[test]
fn test_default_config_file() {
assert_eq!(
&normalize(&Config::default_config_file().to_string_lossy()),
"$HOME/.influxdb_iox/config"
);
}
#[test]
fn test_default() {
// Since the actual implementation uses env variables
// directly, the tests use a different source
let config = Config::new_from_map(HashMap::new()).expect("IOx can work with just defaults");
// Spot some values to make sure they look good
assert_eq!(
&normalize(&config.database_directory.to_string_lossy()),
"$HOME/.influxdb_iox"
);
assert_eq!(&config.http_bind_address.to_string(), "127.0.0.1:8080");
// config items without default shouldn't be set
assert_eq!(config.otel_jaeger_host, None);
}
#[test]
fn test_default_override() {
let name_values: HashMap<String, String> = vec![
("INFLUXDB_IOX_BIND_ADDR".into(), "127.0.0.1:1010".into()),
(
"INFLUXDB_IOX_GRPC_BIND_ADDR".into(),
"127.0.0.2:2020".into(),
),
]
.into_iter()
.collect();
let config = Config::new_from_map(name_values).unwrap();
assert_eq!(&config.http_bind_address.to_string(), "127.0.0.1:1010");
assert_eq!(&config.grpc_bind_address.to_string(), "127.0.0.2:2020");
}
#[test]
fn test_vars_from_file() {
let config_file = make_temp_file(
"INFLUXDB_IOX_BIND_ADDR=127.0.0.3:3030\n\
INFLUXDB_IOX_GRPC_BIND_ADDR=127.0.0.4:4040",
);
let config = Config::try_from_path_then_map(config_file.path(), HashMap::new()).unwrap();
assert_eq!(&config.http_bind_address.to_string(), "127.0.0.3:3030");
assert_eq!(&config.grpc_bind_address.to_string(), "127.0.0.4:4040");
}
#[test]
fn test_vars_from_env_take_precidence() {
// Given variables specified in both the config file and the
// environment, the ones in the environment take precidence
let config_file = make_temp_file(
"INFLUXDB_IOX_BIND_ADDR=127.0.0.3:3030\n\
INFLUXDB_IOX_GRPC_BIND_ADDR=127.0.0.4:4040",
);
let name_values: HashMap<String, String> = vec![
("INFLUXDB_IOX_BIND_ADDR".into(), "127.0.0.1:1010".into()),
(
"INFLUXDB_IOX_GRPC_BIND_ADDR".into(),
"127.0.0.2:2020".into(),
),
]
.into_iter()
.collect();
let config = Config::try_from_path_then_map(config_file.path(), name_values).unwrap();
assert_eq!(&config.http_bind_address.to_string(), "127.0.0.1:1010");
assert_eq!(&config.grpc_bind_address.to_string(), "127.0.0.2:2020");
}
#[test]
fn test_vars_from_non_existent_file() {
let config_file = make_temp_file(
"INFLUXDB_IOX_BIND_ADDR=127.0.0.3:3030\n\
INFLUXDB_IOX_GRPC_BIND_ADDR=127.0.0.4:4040",
);
let dangling_path: PathBuf = config_file.path().into();
std::mem::drop(config_file); // force the temp file to be removed
let result = Config::try_from_path_then_map(&dangling_path, HashMap::new());
let error_message = format!("{}", result.unwrap_err());
assert_contains!(&error_message, "Error loading env config file");
assert_contains!(&error_message, dangling_path.to_string_lossy());
assert_contains!(&error_message, "path not found");
}
/// test for using variable substitution in config file (.env style)
/// it is really a test for .env but I use this test to document
/// the expected behavior of iox config files.
#[test]
fn test_var_substitution_in_file() {
let config_file = make_temp_file(
"THE_HOSTNAME=127.0.0.1\n\
INFLUXDB_IOX_BIND_ADDR=${THE_HOSTNAME}:3030\n\
INFLUXDB_IOX_GRPC_BIND_ADDR=${THE_HOSTNAME}:4040",
);
let config = Config::try_from_path_then_map(config_file.path(), HashMap::new()).unwrap();
assert_eq!(&config.http_bind_address.to_string(), "127.0.0.1:3030");
assert_eq!(&config.grpc_bind_address.to_string(), "127.0.0.1:4040");
}
/// test for using variable substitution in config file with
/// existing environment. Again, this is really a test for .env
/// but I use this test to document the expected behavior of iox
/// config files.
#[test]
fn test_var_substitution_in_file_from_env() {
std::env::set_var("MY_AWESOME_HOST", "192.100.100.42");
let config_file = make_temp_file("INFLUXDB_IOX_BIND_ADDR=${MY_AWESOME_HOST}:3030\n");
let config = Config::try_from_path_then_map(config_file.path(), HashMap::new()).unwrap();
assert_eq!(&config.http_bind_address.to_string(), "192.100.100.42:3030");
std::env::remove_var("MY_AWESOME_HOST");
}
/// test for using comments in config file (.env style)
/// it is really a test for .env but I use this test to document
/// the expected behavior of iox config files.
#[test]
fn test_comments_in_config_file() {
let config_file = make_temp_file(
"#INFLUXDB_IOX_BIND_ADDR=127.0.0.3:3030\n\
INFLUXDB_IOX_GRPC_BIND_ADDR=127.0.0.4:4040",
);
let config = Config::try_from_path_then_map(config_file.path(), HashMap::new()).unwrap();
// Should have the default value as the config file's value is commented out
assert_eq!(&config.http_bind_address.to_string(), "127.0.0.1:8080");
// Should have the value in the config file
assert_eq!(&config.grpc_bind_address.to_string(), "127.0.0.4:4040");
}
#[test]
fn test_invalid_config_data() {
let config_file = make_temp_file(
"HOSTNAME=127.0.0.1\n\
INFLUXDB_IOX_BIND_ADDR=${HO",
);
let error_message = Config::try_from_path_then_map(config_file.path(), HashMap::new())
.unwrap_err()
.to_string();
assert_contains!(&error_message, "Error parsing env config file");
assert_contains!(&error_message, config_file.path().to_string_lossy());
assert_contains!(&error_message, "'${HO', error at line index: 3");
}
#[test]
fn test_parse_config_value() {
let empty_values = HashMap::<String, String>::new();
let name_values: HashMap<String, String> = vec![
("TEST_CONFIG".into(), "THE_VALUE".into()),
("SOMETHING ELSE".into(), "QUE PASA?".into()),
]
.into_iter()
.collect();
let item_without_default = ConfigItemWithoutDefault {};
let error_message: String = Config::parse_config(&empty_values, &item_without_default)
.unwrap_err()
.to_string();
assert_eq!(error_message, "Error setting config 'TEST_CONFIG' to value '': no value specified for ConfigItemWithoutDefault");
assert_eq!(
Config::parse_config(&name_values, &item_without_default).unwrap(),
"THE_VALUE"
);
let item_with_default = ConfigItemWithDefault {};
assert_eq!(
Config::parse_config(&empty_values, &item_with_default).unwrap(),
"THE_DEFAULT"
);
assert_eq!(
Config::parse_config(&name_values, &item_without_default).unwrap(),
"THE_VALUE"
);
let item_without_default = ConfigItemWithoutDefaultButWithExample {};
let error_message: String = Config::parse_config(&empty_values, &item_without_default)
.unwrap_err()
.to_string();
assert_eq!(error_message, "Error setting config 'TEST_CONFIG'. Expected value like 'THE_EXAMPLE'. Got value '': : no value specified for ConfigItemWithoutDefaultButWithExample");
assert_eq!(
Config::parse_config(&name_values, &item_without_default).unwrap(),
"THE_VALUE"
);
}
/// normalizes things in a config file that change in different
/// environments (e.g. a home directory)
fn normalize(v: &str) -> String {
let home_dir: String = dirs::home_dir().unwrap().to_string_lossy().into();
v.replace(&home_dir, "$HOME")
}
struct ConfigItemWithDefault {}
impl ConfigItem<String> for ConfigItemWithDefault {
fn name(&self) -> &'static str {
"TEST_CONFIG"
}
fn short_description(&self) -> String {
"A test config".into()
}
fn default(&self) -> Option<String> {
Some("THE_DEFAULT".into())
}
fn parse(&self, val: Option<&str>) -> std::result::Result<String, String> {
val.map(|s| s.to_string())
.ok_or_else(|| String::from("no value specified for ConfigItemWithDefault"))
}
fn unparse(&self, val: &String) -> String {
val.to_string()
}
}
struct ConfigItemWithoutDefault {}
impl ConfigItem<String> for ConfigItemWithoutDefault {
fn name(&self) -> &'static str {
"TEST_CONFIG"
}
fn short_description(&self) -> String {
"A test config".into()
}
fn parse(&self, val: Option<&str>) -> std::result::Result<String, String> {
val.map(|s| s.to_string())
.ok_or_else(|| String::from("no value specified for ConfigItemWithoutDefault"))
}
fn unparse(&self, val: &String) -> String {
val.to_string()
}
}
struct ConfigItemWithoutDefaultButWithExample {}
impl ConfigItem<String> for ConfigItemWithoutDefaultButWithExample {
fn name(&self) -> &'static str {
"TEST_CONFIG"
}
fn short_description(&self) -> String {
"A test config".into()
}
fn example(&self) -> Option<String> {
Some("THE_EXAMPLE".into())
}
fn parse(&self, val: Option<&str>) -> std::result::Result<String, String> {
val.map(|s| s.to_string()).ok_or_else(|| {
String::from("no value specified for ConfigItemWithoutDefaultButWithExample")
})
}
fn unparse(&self, val: &String) -> String {
val.to_string()
}
}
}

View File

@ -1,31 +0,0 @@
# This is an example .env file showing all of the environment variables that can
# be configured within the project.
#
# The identifier for the server. Used for writing to object storage and as
# an identifier that is added to replicated writes, WAL segments and Chunks.
# Must be unique in a group of connected or semi-connected IOx servers.
# Must be a number that can be represented by a 32-bit unsigned integer.
# INFLUXDB_IOX_ID=1
#
# Where to store files on disk:
# INFLUXDB_IOX_DB_DIR=$HOME/.influxdb_iox
# TEST_INFLUXDB_IOX_DB_DIR=$HOME/.influxdb_iox
#
# Addresses for the server processes:
# INFLUXDB_IOX_BIND_ADDR=127.0.0.1:8080
# INFLUXDB_IOX_GRPC_BIND_ADDR=127.0.0.1:8082
#
# If using Amazon S3 as an object store:
# AWS_ACCESS_KEY_ID=access_key_value
# AWS_SECRET_ACCESS_KEY=secret_access_key_value
# AWS_DEFAULT_REGION=us-east-2
# AWS_S3_BUCKET_NAME=bucket-name
#
# If using Google Cloud Storage as an object store:
# GCS_BUCKET_NAME=bucket_name
# SERVICE_ACCOUNT=/path/to/auth/info.json
#
# To enable Jaeger tracing:
# OTEL_SERVICE_NAME="iox" # defaults to iox
# OTEL_EXPORTER_JAEGER_AGENT_HOST="jaeger.influxdata.net"
# OTEL_EXPORTER_JAEGER_AGENT_PORT="6831"

82
src/commands/config.rs Normal file
View File

@ -0,0 +1,82 @@
//! Implementation of command line option for manipulating and showing server
//! config
use config::Config;
pub fn show_config(ignore_config_file: bool) {
let verbose = false;
let config = load_config(verbose, ignore_config_file);
println!("{}", config);
}
pub fn describe_config(ignore_config_file: bool) {
let verbose = true;
let config = load_config(verbose, ignore_config_file);
println!("InfluxDB IOx Configuration:");
println!("{}", config.verbose_display());
}
/// Loads the configuration information for IOx, and `abort`s the
/// process on error.
///
/// The rationale for panic is that anything wrong with the config is
/// should be fixed now rather then when it is used subsequently
///
/// If verbose is true, then messages are printed to stdout
pub fn load_config(verbose: bool, ignore_config_file: bool) -> Config {
// Default configuraiton file is ~/.influxdb_iox/config
let default_config_file = Config::default_config_file();
// Try and create a useful error message / warnings
let read_from_file = match (ignore_config_file, default_config_file.exists()) {
// config exists but we got told to ignore if
(true, true) => {
println!("WARNING: Ignoring config file {:?}", default_config_file);
false
}
// we got told to ignore the config file, but it didn't exist anyways
(true, false) => {
if verbose {
println!(
"Loading config from environment (ignoring non existent file {:?})",
default_config_file
);
}
false
}
(false, true) => {
if verbose {
println!(
"Loading config from file and environment (file: {:?})",
default_config_file
);
}
true
}
(false, false) => {
if verbose {
println!(
"Loading config from environment (file: {:?} not found)",
default_config_file
);
}
false
}
};
//
let config = if read_from_file {
Config::try_from_path(&default_config_file)
} else {
Config::new_from_env()
};
match config {
Err(e) => {
eprintln!("FATAL Error loading config: {}", e);
eprintln!("Aborting");
std::process::exit(1);
}
Ok(config) => config,
}
}

122
src/commands/logging.rs Normal file
View File

@ -0,0 +1,122 @@
//! Logging initization and setup
use config::Config;
use tracing_subscriber::{prelude::*, EnvFilter};
/// Handles setting up logging levels
#[derive(Debug)]
pub enum LoggingLevel {
// Default log level is warn level for all components
Default,
// Verbose log level is info level for all components
Verbose,
// Debug log level is debug for everything except
// some especially noisy low level libraries
Debug,
}
impl LoggingLevel {
/// Creates a logging level usig the following rules.
///
/// 1. if `-vv` (multiple instances of verbose), use Debug
/// 2. if `-v` (single instances of verbose), use Verbose
/// 3. Otherwise use Default
pub fn new(num_verbose: u64) -> Self {
match num_verbose {
0 => Self::Default,
1 => Self::Verbose,
_ => Self::Debug,
}
}
/// set RUST_LOG to the level represented by self, unless RUST_LOG
/// is already set
fn set_rust_log_if_needed(&self) {
let rust_log_env = std::env::var("RUST_LOG");
/// Default debug level is debug for everything except
/// some especially noisy low level libraries
const DEFAULT_DEBUG_LOG_LEVEL: &str = "debug,hyper::proto::h1=info,h2=info";
// Default verbose log level is info level for all components
const DEFAULT_VERBOSE_LOG_LEVEL: &str = "info";
// Default log level is warn level for all components
const DEFAULT_LOG_LEVEL: &str = "warn";
match rust_log_env {
Ok(lvl) => {
if !matches!(self, Self::Default) {
eprintln!(
"WARNING: Using RUST_LOG='{}' environment, ignoring -v command line",
lvl
);
}
}
Err(_) => {
match self {
Self::Default => std::env::set_var("RUST_LOG", DEFAULT_LOG_LEVEL),
Self::Verbose => std::env::set_var("RUST_LOG", DEFAULT_VERBOSE_LOG_LEVEL),
Self::Debug => std::env::set_var("RUST_LOG", DEFAULT_DEBUG_LOG_LEVEL),
};
}
}
}
/// Configures basic logging for 'simple' command line tools. Note
/// this does not setup tracing or open telemetry
pub fn setup_basic_logging(&self) {
self.set_rust_log_if_needed();
env_logger::init();
}
/// Configures logging and tracing, based on the configuration
/// values, for the IOx server (the whole enchalada)
pub fn setup_logging(&self, config: &Config) -> Option<opentelemetry_jaeger::Uninstall> {
// Copy anything from the config to the rust log environment
if let Some(rust_log) = &config.rust_log {
println!("Setting RUST_LOG: {}", rust_log);
std::env::set_var("RUST_LOG", rust_log);
}
self.set_rust_log_if_needed();
// Configure the OpenTelemetry tracer, if requested.
let (opentelemetry, drop_handle) = if config.otel_jaeger_host.is_some() {
// For now, configure open telemetry directly from the
// environment. Eventually it would be cool to document
// all of the open telemetry options in IOx and pass them
// explicitly to opentelemetry for additional visibility
let (tracer, drop_handle) = opentelemetry_jaeger::new_pipeline()
.with_service_name("iox")
.from_env()
.install()
.expect("failed to initialise the Jaeger tracing sink");
// Initialise the opentelemetry tracing layer, giving it the jaeger emitter
let opentelemetry = tracing_opentelemetry::layer().with_tracer(tracer);
(Some(opentelemetry), Some(drop_handle))
} else {
(None, None)
};
// Configure the logger to write to stderr
let logger = tracing_subscriber::fmt::layer().with_writer(std::io::stderr);
// Register the chain of event subscribers:
//
// - Jaeger tracing emitter
// - Env filter (using RUST_LOG as the filter env)
// - A stdout logger
//
tracing_subscriber::registry()
.with(opentelemetry)
.with(EnvFilter::from_default_env())
.with(logger)
.init();
drop_handle
}
}

View File

@ -2,8 +2,8 @@ use tracing::{info, warn};
use std::fs;
use std::net::SocketAddr;
use std::path::PathBuf;
use std::sync::Arc;
use std::{env::VarError, path::PathBuf};
use crate::server::http_routes;
use crate::server::rpc::service;
@ -15,6 +15,10 @@ use query::exec::Executor as QueryExecutor;
use snafu::{ResultExt, Snafu};
use crate::panic::SendPanicsToTracing;
use super::{config::load_config, logging::LoggingLevel};
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Unable to create database directory {:?}: {}", path, source))]
@ -40,11 +44,21 @@ pub enum Error {
bind_addr,
source
))]
StartListening {
StartListeningHttp {
bind_addr: SocketAddr,
source: hyper::error::Error,
},
#[snafu(display(
"Unable to bind to listen for gRPC requests on {}: {}",
grpc_bind_addr,
source
))]
StartListeningGrpc {
grpc_bind_addr: SocketAddr,
source: std::io::Error,
},
#[snafu(display("Error serving HTTP: {}", source))]
ServingHttp { source: hyper::error::Error },
@ -56,26 +70,33 @@ pub enum Error {
pub type Result<T, E = Error> = std::result::Result<T, E>;
pub async fn main() -> Result<()> {
dotenv::dotenv().ok();
/// This is the entry point for the IOx server -- it handles
/// instantiating all state and getting things ready
pub async fn main(logging_level: LoggingLevel, ignore_config_file: bool) -> Result<()> {
// try to load the configuration before doing anything else
let verbose = false;
let config = load_config(verbose, ignore_config_file);
let db_dir = match std::env::var("INFLUXDB_IOX_DB_DIR") {
Ok(val) => val,
Err(_) => {
// default database path is $HOME/.influxdb_iox
let mut path = dirs::home_dir().unwrap();
path.push(".influxdb_iox/");
path.into_os_string().into_string().unwrap()
}
};
let _drop_handle = logging_level.setup_logging(&config);
fs::create_dir_all(&db_dir).context(CreatingDatabaseDirectory { path: &db_dir })?;
// Install custom panic handler and forget about it.
//
// This leaks the handler and prevents it from ever being dropped during the
// lifetime of the program - this is actually a good thing, as it prevents
// the panic handler from being removed while unwinding a panic (which in
// turn, causes a panic - see #548)
let f = SendPanicsToTracing::new();
std::mem::forget(f);
let object_store = if let Ok(bucket) = std::env::var("INFLUXDB_IOX_GCP_BUCKET") {
info!("Using GCP bucket {} for storage", &bucket);
ObjectStore::new_google_cloud_storage(GoogleCloudStorage::new(bucket))
let db_dir = &config.database_directory;
fs::create_dir_all(db_dir).context(CreatingDatabaseDirectory { path: db_dir })?;
let object_store = if let Some(bucket_name) = &config.gcp_bucket {
info!("Using GCP bucket {} for storage", bucket_name);
ObjectStore::new_google_cloud_storage(GoogleCloudStorage::new(bucket_name))
} else {
info!("Using local dir {} for storage", &db_dir);
info!("Using local dir {:?} for storage", db_dir);
ObjectStore::new_file(object_store::File::new(&db_dir))
};
let object_storage = Arc::new(object_store);
@ -85,14 +106,10 @@ pub async fn main() -> Result<()> {
// if this ID isn't set the server won't be usable until this is set via an API
// call
if let Ok(id) = std::env::var("INFLUXDB_IOX_ID") {
let id = id
.parse::<u32>()
.expect("INFLUXDB_IOX_ID must be a u32 integer");
info!("setting server ID to {}", id);
if let Some(id) = config.writer_id {
app_server.set_id(id).await;
} else {
warn!("server ID not set. ID must be set via the INFLUXDB_IOX_ID environment variable or via API before writing or querying data.");
warn!("server ID not set. ID must be set via the INFLUXDB_IOX_ID config or API before writing or querying data.");
}
// Fire up the query executor
@ -100,19 +117,10 @@ pub async fn main() -> Result<()> {
// Construct and start up gRPC server
let grpc_bind_addr: SocketAddr = match std::env::var("INFLUXDB_IOX_GRPC_BIND_ADDR") {
Ok(addr) => addr
.parse()
.expect("INFLUXDB_IOX_GRPC_BIND_ADDR environment variable not a valid SocketAddr"),
Err(VarError::NotPresent) => "127.0.0.1:8082".parse().unwrap(),
Err(VarError::NotUnicode(_)) => {
panic!("INFLUXDB_IOX_GRPC_BIND_ADDR environment variable not a valid unicode string")
}
};
let grpc_bind_addr = config.grpc_bind_address;
let socket = tokio::net::TcpListener::bind(grpc_bind_addr)
.await
.expect("failed to bind server");
.context(StartListeningGrpc { grpc_bind_addr })?;
let grpc_server = service::make_server(socket, app_server.clone(), executor);
@ -120,20 +128,11 @@ pub async fn main() -> Result<()> {
// Construct and start up HTTP server
let bind_addr: SocketAddr = match std::env::var("INFLUXDB_IOX_BIND_ADDR") {
Ok(addr) => addr
.parse()
.expect("INFLUXDB_IOX_BIND_ADDR environment variable not a valid SocketAddr"),
Err(VarError::NotPresent) => "127.0.0.1:8080".parse().unwrap(),
Err(VarError::NotUnicode(_)) => {
panic!("INFLUXDB_IOX_BIND_ADDR environment variable not a valid unicode string")
}
};
let router_service = http_routes::router_service(app_server.clone());
let bind_addr = config.http_bind_address;
let http_server = Server::try_bind(&bind_addr)
.context(StartListening { bind_addr })?
.context(StartListeningHttp { bind_addr })?
.serve(router_service);
info!(bind_address=?bind_addr, "HTTP server listening");

View File

@ -16,15 +16,16 @@ mod panic;
pub mod server;
mod commands {
pub mod config;
pub mod convert;
pub mod file_meta;
mod input;
pub mod logging;
pub mod server;
pub mod stats;
}
use panic::SendPanicsToTracing;
use tracing_subscriber::{prelude::*, EnvFilter};
use commands::logging::LoggingLevel;
enum ReturnCode {
ConversionFailed = 1,
@ -40,6 +41,9 @@ Examples:
# Run the InfluxDB IOx server:
influxdb_iox
# Display all current config settings
influxdb_iox config show
# Run the InfluxDB IOx server with extra verbose logging
influxdb_iox -v
@ -118,6 +122,12 @@ Examples:
),
)
.subcommand(
SubCommand::with_name("config")
.about("Configuration display and manipulation")
.subcommand(SubCommand::with_name("show").help("show current configuration information"))
.subcommand(SubCommand::with_name("help").help("explain detailed configuration options"))
)
.subcommand(
SubCommand::with_name("server")
.about("Runs in server mode (default)")
)
@ -128,6 +138,9 @@ Examples:
.arg(Arg::with_name("num-threads").long("num-threads").takes_value(true).help(
"Set the maximum number of threads to use. Defaults to the number of cores on the system",
))
.arg(Arg::with_name("ignore-config-file").long("ignore-config-file").takes_value(false).help(
"If specified, ignores the default configuration file, if any. Configuration is read from the environment only",
))
.get_matches();
let mut tokio_runtime = get_runtime(matches.value_of("num-threads"))?;
@ -138,19 +151,18 @@ Examples:
}
async fn dispatch_args(matches: ArgMatches<'_>) {
let _drop_handle = setup_logging(matches.occurrences_of("verbose"));
// Logging level is determined via:
// 1. If RUST_LOG environment variable is set, use that value
// 2. if `-vv` (multiple instances of verbose), use DEFAULT_DEBUG_LOG_LEVEL
// 2. if `-v` (single instances of verbose), use DEFAULT_VERBOSE_LOG_LEVEL
// 3. Otherwise use DEFAULT_LOG_LEVEL
let logging_level = LoggingLevel::new(matches.occurrences_of("verbose"));
// Install custom panic handler and forget about it.
//
// This leaks the handler and prevents it from ever being dropped during the
// lifetime of the program - this is actually a good thing, as it prevents
// the panic handler from being removed while unwinding a panic (which in
// turn, causes a panic - see #548)
let f = SendPanicsToTracing::new();
std::mem::forget(f);
let ignore_config_file = matches.occurrences_of("ignore-config-file") > 0;
match matches.subcommand() {
("convert", Some(sub_matches)) => {
logging_level.setup_basic_logging();
let input_path = sub_matches.value_of("INPUT").unwrap();
let output_path = sub_matches.value_of("OUTPUT").unwrap();
let compression_level =
@ -164,6 +176,7 @@ async fn dispatch_args(matches: ArgMatches<'_>) {
}
}
("meta", Some(sub_matches)) => {
logging_level.setup_basic_logging();
let input_filename = sub_matches.value_of("INPUT").unwrap();
match commands::file_meta::dump_meta(&input_filename) {
Ok(()) => debug!("Metadata dump completed successfully"),
@ -174,6 +187,7 @@ async fn dispatch_args(matches: ArgMatches<'_>) {
}
}
("stats", Some(sub_matches)) => {
logging_level.setup_basic_logging();
let config = commands::stats::StatsConfig {
input_path: sub_matches.value_of("INPUT").unwrap().into(),
per_file: sub_matches.is_present("per-file"),
@ -188,9 +202,19 @@ async fn dispatch_args(matches: ArgMatches<'_>) {
}
}
}
("config", Some(sub_matches)) => {
logging_level.setup_basic_logging();
match sub_matches.subcommand() {
("show", _) => commands::config::show_config(ignore_config_file),
("help", _) => commands::config::describe_config(ignore_config_file),
(command, _) => panic!("Unknown subcommand for config: {}", command),
}
}
("server", Some(_)) | (_, _) => {
// Note don't set up basic logging here, different logging rules appy in server
// mode
println!("InfluxDB IOx server starting");
match commands::server::main().await {
match commands::server::main(logging_level, ignore_config_file).await {
Ok(()) => eprintln!("Shutdown OK"),
Err(e) => {
error!("Server shutdown with error: {}", e);
@ -201,88 +225,6 @@ async fn dispatch_args(matches: ArgMatches<'_>) {
}
}
/// Default debug level is debug for everything except
/// some especially noisy low level libraries
const DEFAULT_DEBUG_LOG_LEVEL: &str = "debug,hyper::proto::h1=info,h2=info";
// Default verbose log level is info level for all components
const DEFAULT_VERBOSE_LOG_LEVEL: &str = "info";
// Default log level is warn level for all components
const DEFAULT_LOG_LEVEL: &str = "warn";
/// Configures logging in the following precedence:
///
/// 1. If RUST_LOG environment variable is set, use that value
/// 2. if `-vv` (multiple instances of verbose), use DEFAULT_DEBUG_LOG_LEVEL
/// 2. if `-v` (single instances of verbose), use DEFAULT_VERBOSE_LOG_LEVEL
/// 3. Otherwise use DEFAULT_LOG_LEVEL
fn setup_logging(num_verbose: u64) -> Option<opentelemetry_jaeger::Uninstall> {
let rust_log_env = std::env::var("RUST_LOG");
match rust_log_env {
Ok(lvl) => {
if num_verbose > 0 {
eprintln!(
"WARNING: Using RUST_LOG='{}' environment, ignoring -v command line",
lvl
);
}
}
Err(_) => match num_verbose {
0 => std::env::set_var("RUST_LOG", DEFAULT_LOG_LEVEL),
1 => std::env::set_var("RUST_LOG", DEFAULT_VERBOSE_LOG_LEVEL),
_ => std::env::set_var("RUST_LOG", DEFAULT_DEBUG_LOG_LEVEL),
},
}
// Configure the OpenTelemetry tracer, if requested.
//
// To enable the tracing functionality, set OTEL_EXPORTER_JAEGER_AGENT_HOST
// env to some suitable value (see below).
//
// The Jaeger layer emits traces under the service name of "iox" if not
// overwrote by the user by setting the env below. All configuration is
// sourced from the environment:
//
// - OTEL_SERVICE_NAME: emitter service name (iox by default)
// - OTEL_EXPORTER_JAEGER_AGENT_HOST: hostname/address of the collector
// - OTEL_EXPORTER_JAEGER_AGENT_PORT: listening port of the collector
//
let (opentelemetry, drop_handle) = if std::env::var("OTEL_EXPORTER_JAEGER_AGENT_HOST").is_ok() {
// Initialise the jaeger event emitter
let (tracer, drop_handle) = opentelemetry_jaeger::new_pipeline()
.with_service_name("iox")
.from_env()
.install()
.expect("failed to initialise the Jaeger tracing sink");
// Initialise the opentelemetry tracing layer, giving it the jaeger emitter
let opentelemetry = tracing_opentelemetry::layer().with_tracer(tracer);
(Some(opentelemetry), Some(drop_handle))
} else {
(None, None)
};
// Configure the logger to write to stderr
let logger = tracing_subscriber::fmt::layer().with_writer(std::io::stderr);
// Register the chain of event subscribers:
//
// - Jaeger tracing emitter
// - Env filter (using RUST_LOG as the filter env)
// - A stdout logger
//
tracing_subscriber::registry()
.with(opentelemetry)
.with(EnvFilter::from_default_env())
.with(logger)
.init();
drop_handle
}
/// Creates the tokio runtime for executing IOx
///
/// if nthreads is none, uses the default scheduler

View File

@ -25,6 +25,7 @@ pub fn all_approximately_equal(f1: &[f64], f2: &[f64]) -> bool {
f1.len() == f2.len() && f1.iter().zip(f2).all(|(&a, &b)| approximately_equal(a, b))
}
/// Return a temporary directory that is deleted when the object is dropped
pub fn tmp_dir() -> Result<tempfile::TempDir> {
let _ = dotenv::dotenv();
@ -35,6 +36,25 @@ pub fn tmp_dir() -> Result<tempfile::TempDir> {
.tempdir_in(root)?)
}
pub fn tmp_file() -> Result<tempfile::NamedTempFile> {
let _ = dotenv::dotenv();
let root = env::var_os("TEST_INFLUXDB_IOX_DB_DIR").unwrap_or_else(|| env::temp_dir().into());
Ok(tempfile::Builder::new()
.prefix("influxdb_iox")
.tempfile_in(root)?)
}
/// Writes the specified string to a new temporary file, returning the Path to
/// the file
pub fn make_temp_file<C: AsRef<[u8]>>(contents: C) -> tempfile::NamedTempFile {
let file = tmp_file().expect("creating temp file");
std::fs::write(&file, contents).expect("writing data to temp file");
file
}
/// convert form that is easier to type in tests to what some code needs
pub fn str_vec_to_arc_vec(str_vec: &[&str]) -> Arc<Vec<Arc<String>>> {
Arc::new(str_vec.iter().map(|s| Arc::new(String::from(*s))).collect())
@ -64,3 +84,41 @@ pub fn enable_logging() {
std::env::set_var("RUST_LOG", "debug");
env_logger::init();
}
#[macro_export]
/// A macro to assert that one string is contained within another with
/// a nice error message if they are not. Is a macro so test error
/// messages are on the same line as the failure;
///
/// Both arguments must be convertable into Strings (Into<String>)
macro_rules! assert_contains {
($ACTUAL: expr, $EXPECTED: expr) => {
let actual_value: String = $ACTUAL.into();
let expected_value: String = $EXPECTED.into();
assert!(
actual_value.contains(&expected_value),
"Can not find expected in actual.\n\nExpected:\n{}\n\nActual:\n{}",
expected_value,
actual_value
);
};
}
#[macro_export]
/// A macro to assert that one string is NOT contained within another with
/// a nice error message if that check fails. Is a macro so test error
/// messages are on the same line as the failure;
///
/// Both arguments must be convertable into Strings (Into<String>)
macro_rules! assert_not_contains {
($ACTUAL: expr, $UNEXPECTED: expr) => {
let actual_value: String = $ACTUAL.into();
let unexpected_value: String = $UNEXPECTED.into();
assert!(
!actual_value.contains(&unexpected_value),
"Found unexpected value in actual.\n\nUnexpected:\n{}\n\nActual:\n{}",
unexpected_value,
actual_value
);
};
}

View File

@ -842,13 +842,13 @@ struct TestServer {
impl TestServer {
fn new() -> Result<Self> {
let _ = dotenv::dotenv(); // load .env file if present
let dir = test_helpers::tmp_dir()?;
let server_process = Command::cargo_bin("influxdb_iox")?
// Can enable for debbugging
//.arg("-vv")
// ignore any config file in the user's home directory
.arg("--ignore-config-file")
.env("INFLUXDB_IOX_DB_DIR", dir.path())
.env("INFLUXDB_IOX_ID", "1")
.spawn()?;
@ -866,7 +866,10 @@ impl TestServer {
self.server_process = Command::cargo_bin("influxdb_iox")?
// Can enable for debbugging
//.arg("-vv")
// ignore any config file in the user's home directory
.arg("--ignore-config-file")
.env("INFLUXDB_IOX_DB_DIR", self.dir.path())
.env("INFLUXDB_IOX_ID", "1")
.spawn()?;
Ok(())
}