feat: Add remote_template for simpler remote configuration
parent
8c3e597cf3
commit
aa90329c1f
|
@ -48,10 +48,14 @@ impl<E> From<Error> for UpdateError<E> {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Config {
|
impl Config {
|
||||||
pub(crate) fn new(jobs: Arc<JobRegistry>, metric_registry: Arc<MetricRegistry>) -> Self {
|
pub(crate) fn new(
|
||||||
|
jobs: Arc<JobRegistry>,
|
||||||
|
metric_registry: Arc<MetricRegistry>,
|
||||||
|
remote_template: Option<RemoteTemplate>,
|
||||||
|
) -> Self {
|
||||||
Self {
|
Self {
|
||||||
shutdown: Default::default(),
|
shutdown: Default::default(),
|
||||||
state: Default::default(),
|
state: RwLock::new(ConfigState::new(remote_template)),
|
||||||
jobs,
|
jobs,
|
||||||
metric_registry,
|
metric_registry,
|
||||||
}
|
}
|
||||||
|
@ -120,7 +124,11 @@ impl Config {
|
||||||
|
|
||||||
pub(crate) fn resolve_remote(&self, id: ServerId) -> Option<GRpcConnectionString> {
|
pub(crate) fn resolve_remote(&self, id: ServerId) -> Option<GRpcConnectionString> {
|
||||||
let state = self.state.read().expect("mutex poisoned");
|
let state = self.state.read().expect("mutex poisoned");
|
||||||
state.remotes.get(&id).cloned()
|
state
|
||||||
|
.remotes
|
||||||
|
.get(&id)
|
||||||
|
.cloned()
|
||||||
|
.or_else(|| state.remote_template.as_ref().map(|t| t.get(&id)))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn commit(
|
fn commit(
|
||||||
|
@ -233,6 +241,36 @@ struct ConfigState {
|
||||||
databases: BTreeMap<DatabaseName<'static>, DatabaseState>,
|
databases: BTreeMap<DatabaseName<'static>, DatabaseState>,
|
||||||
/// Map between remote IOx server IDs and management API connection strings.
|
/// Map between remote IOx server IDs and management API connection strings.
|
||||||
remotes: BTreeMap<ServerId, GRpcConnectionString>,
|
remotes: BTreeMap<ServerId, GRpcConnectionString>,
|
||||||
|
/// Static map between remote server IDs and hostnames based on a template
|
||||||
|
remote_template: Option<RemoteTemplate>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ConfigState {
|
||||||
|
fn new(remote_template: Option<RemoteTemplate>) -> Self {
|
||||||
|
Self {
|
||||||
|
remote_template,
|
||||||
|
..Default::default()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A RemoteTemplate string is a remote connection template string.
|
||||||
|
/// Occurrences of the substring "{id}" in the template will be replaced
|
||||||
|
/// by the server ID.
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct RemoteTemplate {
|
||||||
|
template: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl RemoteTemplate {
|
||||||
|
pub fn new(template: impl Into<String>) -> Self {
|
||||||
|
let template = template.into();
|
||||||
|
Self { template }
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get(&self, id: &ServerId) -> GRpcConnectionString {
|
||||||
|
self.template.replace("{id}", &format!("{}", id.get_u32()))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
|
@ -316,12 +354,17 @@ mod test {
|
||||||
use crate::db::load_preserved_catalog;
|
use crate::db::load_preserved_catalog;
|
||||||
|
|
||||||
use super::*;
|
use super::*;
|
||||||
|
use std::num::NonZeroU32;
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn create_db() {
|
async fn create_db() {
|
||||||
let name = DatabaseName::new("foo").unwrap();
|
let name = DatabaseName::new("foo").unwrap();
|
||||||
let metric_registry = Arc::new(metrics::MetricRegistry::new());
|
let metric_registry = Arc::new(metrics::MetricRegistry::new());
|
||||||
let config = Config::new(Arc::new(JobRegistry::new()), Arc::clone(&metric_registry));
|
let config = Config::new(
|
||||||
|
Arc::new(JobRegistry::new()),
|
||||||
|
Arc::clone(&metric_registry),
|
||||||
|
None,
|
||||||
|
);
|
||||||
let rules = DatabaseRules::new(name.clone());
|
let rules = DatabaseRules::new(name.clone());
|
||||||
|
|
||||||
{
|
{
|
||||||
|
@ -363,7 +406,11 @@ mod test {
|
||||||
async fn test_db_drop() {
|
async fn test_db_drop() {
|
||||||
let name = DatabaseName::new("foo").unwrap();
|
let name = DatabaseName::new("foo").unwrap();
|
||||||
let metric_registry = Arc::new(metrics::MetricRegistry::new());
|
let metric_registry = Arc::new(metrics::MetricRegistry::new());
|
||||||
let config = Config::new(Arc::new(JobRegistry::new()), Arc::clone(&metric_registry));
|
let config = Config::new(
|
||||||
|
Arc::new(JobRegistry::new()),
|
||||||
|
Arc::clone(&metric_registry),
|
||||||
|
None,
|
||||||
|
);
|
||||||
let rules = DatabaseRules::new(name.clone());
|
let rules = DatabaseRules::new(name.clone());
|
||||||
|
|
||||||
let db_reservation = config.create_db(rules).unwrap();
|
let db_reservation = config.create_db(rules).unwrap();
|
||||||
|
@ -412,4 +459,28 @@ mod test {
|
||||||
|
|
||||||
assert_eq!(rules_path, expected_path);
|
assert_eq!(rules_path, expected_path);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn resolve_remote() {
|
||||||
|
let metric_registry = Arc::new(metrics::MetricRegistry::new());
|
||||||
|
let config = Config::new(
|
||||||
|
Arc::new(JobRegistry::new()),
|
||||||
|
Arc::clone(&metric_registry),
|
||||||
|
Some(RemoteTemplate::new("http://iox-query-{id}:8082")),
|
||||||
|
);
|
||||||
|
|
||||||
|
let server_id = ServerId::new(NonZeroU32::new(42).unwrap());
|
||||||
|
let remote = config.resolve_remote(server_id);
|
||||||
|
assert_eq!(
|
||||||
|
remote,
|
||||||
|
Some(GRpcConnectionString::from("http://iox-query-42:8082"))
|
||||||
|
);
|
||||||
|
|
||||||
|
let server_id = ServerId::new(NonZeroU32::new(24).unwrap());
|
||||||
|
let remote = config.resolve_remote(server_id);
|
||||||
|
assert_eq!(
|
||||||
|
remote,
|
||||||
|
Some(GRpcConnectionString::from("http://iox-query-24:8082"))
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -93,6 +93,7 @@ use object_store::{path::ObjectStorePath, ObjectStore, ObjectStoreApi};
|
||||||
use query::{exec::Executor, DatabaseStore};
|
use query::{exec::Executor, DatabaseStore};
|
||||||
use tracker::{TaskId, TaskRegistration, TaskRegistryWithHistory, TaskTracker, TrackedFutureExt};
|
use tracker::{TaskId, TaskRegistration, TaskRegistryWithHistory, TaskTracker, TrackedFutureExt};
|
||||||
|
|
||||||
|
pub use crate::config::RemoteTemplate;
|
||||||
use crate::{
|
use crate::{
|
||||||
config::{
|
config::{
|
||||||
object_store_path_for_database_config, Config, GRpcConnectionString, DB_RULES_FILE_NAME,
|
object_store_path_for_database_config, Config, GRpcConnectionString, DB_RULES_FILE_NAME,
|
||||||
|
@ -229,15 +230,22 @@ pub struct ServerConfig {
|
||||||
object_store: Arc<ObjectStore>,
|
object_store: Arc<ObjectStore>,
|
||||||
|
|
||||||
metric_registry: Arc<MetricRegistry>,
|
metric_registry: Arc<MetricRegistry>,
|
||||||
|
|
||||||
|
remote_template: Option<RemoteTemplate>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ServerConfig {
|
impl ServerConfig {
|
||||||
/// Create a new config using the specified store.
|
/// Create a new config using the specified store.
|
||||||
pub fn new(object_store: Arc<ObjectStore>, metric_registry: Arc<MetricRegistry>) -> Self {
|
pub fn new(
|
||||||
|
object_store: Arc<ObjectStore>,
|
||||||
|
metric_registry: Arc<MetricRegistry>,
|
||||||
|
remote_template: Option<RemoteTemplate>,
|
||||||
|
) -> Self {
|
||||||
Self {
|
Self {
|
||||||
num_worker_threads: None,
|
num_worker_threads: None,
|
||||||
object_store,
|
object_store,
|
||||||
metric_registry,
|
metric_registry,
|
||||||
|
remote_template,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -391,12 +399,17 @@ impl<M: ConnectionManager> Server<M> {
|
||||||
object_store,
|
object_store,
|
||||||
// to test the metrics provide a different registry to the `ServerConfig`.
|
// to test the metrics provide a different registry to the `ServerConfig`.
|
||||||
metric_registry,
|
metric_registry,
|
||||||
|
remote_template,
|
||||||
} = config;
|
} = config;
|
||||||
let num_worker_threads = num_worker_threads.unwrap_or_else(num_cpus::get);
|
let num_worker_threads = num_worker_threads.unwrap_or_else(num_cpus::get);
|
||||||
|
|
||||||
Self {
|
Self {
|
||||||
id: Default::default(),
|
id: Default::default(),
|
||||||
config: Arc::new(Config::new(Arc::clone(&jobs), Arc::clone(&metric_registry))),
|
config: Arc::new(Config::new(
|
||||||
|
Arc::clone(&jobs),
|
||||||
|
Arc::clone(&metric_registry),
|
||||||
|
remote_template,
|
||||||
|
)),
|
||||||
store: object_store,
|
store: object_store,
|
||||||
connection_manager: Arc::new(connection_manager),
|
connection_manager: Arc::new(connection_manager),
|
||||||
exec: Arc::new(Executor::new(num_worker_threads)),
|
exec: Arc::new(Executor::new(num_worker_threads)),
|
||||||
|
@ -1058,11 +1071,8 @@ mod tests {
|
||||||
let test_registry = metrics::TestMetricRegistry::new(Arc::clone(®istry));
|
let test_registry = metrics::TestMetricRegistry::new(Arc::clone(®istry));
|
||||||
(
|
(
|
||||||
test_registry,
|
test_registry,
|
||||||
ServerConfig::new(
|
ServerConfig::new(Arc::new(object_store), registry, Option::None)
|
||||||
Arc::new(object_store),
|
.with_num_worker_threads(1),
|
||||||
registry, // new registry ensures test isolation of metrics
|
|
||||||
)
|
|
||||||
.with_num_worker_threads(1),
|
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1161,8 +1171,8 @@ mod tests {
|
||||||
store.list_with_delimiter(&store.new_path()).await.unwrap();
|
store.list_with_delimiter(&store.new_path()).await.unwrap();
|
||||||
|
|
||||||
let manager = TestConnectionManager::new();
|
let manager = TestConnectionManager::new();
|
||||||
let config2 =
|
let config2 = ServerConfig::new(store, Arc::new(MetricRegistry::new()), Option::None)
|
||||||
ServerConfig::new(store, Arc::new(MetricRegistry::new())).with_num_worker_threads(1);
|
.with_num_worker_threads(1);
|
||||||
let server2 = Server::new(manager, config2);
|
let server2 = Server::new(manager, config2);
|
||||||
server2.set_id(ServerId::try_from(1).unwrap()).unwrap();
|
server2.set_id(ServerId::try_from(1).unwrap()).unwrap();
|
||||||
server2.load_database_configs().await.unwrap();
|
server2.load_database_configs().await.unwrap();
|
||||||
|
|
|
@ -397,6 +397,15 @@ Possible values (case insensitive):
|
||||||
/// environments.
|
/// environments.
|
||||||
#[structopt(long = "--azure-storage-access-key", env = "AZURE_STORAGE_ACCESS_KEY")]
|
#[structopt(long = "--azure-storage-access-key", env = "AZURE_STORAGE_ACCESS_KEY")]
|
||||||
pub azure_storage_access_key: Option<String>,
|
pub azure_storage_access_key: Option<String>,
|
||||||
|
|
||||||
|
/// When IOx nodes need to talk to remote peers they consult an internal remote address
|
||||||
|
/// mapping. This mapping is populated via API calls. If the mapping doesn't produce
|
||||||
|
/// a result, this config entry allows to generate a hostname from at template:
|
||||||
|
/// occurrences of the "{id}" substring will be replaced with the remote Server ID.
|
||||||
|
///
|
||||||
|
/// Example: http://node-{id}.ioxmydomain.com:8082
|
||||||
|
#[structopt(long = "--remote-template", env = "INFLUXDB_IOX_REMOTE_TEMPLATE")]
|
||||||
|
pub remote_template: Option<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn command(config: Config) -> Result<()> {
|
pub async fn command(config: Config) -> Result<()> {
|
||||||
|
|
|
@ -7,7 +7,7 @@ use object_store::{
|
||||||
use observability_deps::tracing::{self, error, info, warn, Instrument};
|
use observability_deps::tracing::{self, error, info, warn, Instrument};
|
||||||
use panic_logging::SendPanicsToTracing;
|
use panic_logging::SendPanicsToTracing;
|
||||||
use server::{
|
use server::{
|
||||||
ConnectionManagerImpl as ConnectionManager, Server as AppServer,
|
ConnectionManagerImpl as ConnectionManager, RemoteTemplate, Server as AppServer,
|
||||||
ServerConfig as AppServerConfig,
|
ServerConfig as AppServerConfig,
|
||||||
};
|
};
|
||||||
use snafu::{ResultExt, Snafu};
|
use snafu::{ResultExt, Snafu};
|
||||||
|
@ -123,7 +123,8 @@ pub async fn main(config: Config) -> Result<()> {
|
||||||
let object_store = ObjectStore::try_from(&config)?;
|
let object_store = ObjectStore::try_from(&config)?;
|
||||||
let object_storage = Arc::new(object_store);
|
let object_storage = Arc::new(object_store);
|
||||||
let metric_registry = Arc::new(metrics::MetricRegistry::new());
|
let metric_registry = Arc::new(metrics::MetricRegistry::new());
|
||||||
let server_config = AppServerConfig::new(object_storage, metric_registry);
|
let remote_template = config.remote_template.map(RemoteTemplate::new);
|
||||||
|
let server_config = AppServerConfig::new(object_storage, metric_registry, remote_template);
|
||||||
|
|
||||||
let server_config = if let Some(n) = config.num_worker_threads {
|
let server_config = if let Some(n) = config.num_worker_threads {
|
||||||
info!(
|
info!(
|
||||||
|
|
|
@ -943,6 +943,7 @@ mod tests {
|
||||||
AppServerConfig::new(
|
AppServerConfig::new(
|
||||||
Arc::new(ObjectStore::new_in_memory(InMemory::new())),
|
Arc::new(ObjectStore::new_in_memory(InMemory::new())),
|
||||||
registry,
|
registry,
|
||||||
|
Option::None,
|
||||||
)
|
)
|
||||||
.with_num_worker_threads(1),
|
.with_num_worker_threads(1),
|
||||||
)
|
)
|
||||||
|
|
Loading…
Reference in New Issue