diff --git a/server/src/config.rs b/server/src/config.rs index fc4e2d6f8e..521c98b03e 100644 --- a/server/src/config.rs +++ b/server/src/config.rs @@ -48,10 +48,14 @@ impl From for UpdateError { } impl Config { - pub(crate) fn new(jobs: Arc, metric_registry: Arc) -> Self { + pub(crate) fn new( + jobs: Arc, + metric_registry: Arc, + remote_template: Option, + ) -> Self { Self { shutdown: Default::default(), - state: Default::default(), + state: RwLock::new(ConfigState::new(remote_template)), jobs, metric_registry, } @@ -120,7 +124,11 @@ impl Config { pub(crate) fn resolve_remote(&self, id: ServerId) -> Option { let state = self.state.read().expect("mutex poisoned"); - state.remotes.get(&id).cloned() + state + .remotes + .get(&id) + .cloned() + .or_else(|| state.remote_template.as_ref().map(|t| t.get(&id))) } fn commit( @@ -233,6 +241,36 @@ struct ConfigState { databases: BTreeMap, DatabaseState>, /// Map between remote IOx server IDs and management API connection strings. remotes: BTreeMap, + /// Static map between remote server IDs and hostnames based on a template + remote_template: Option, +} + +impl ConfigState { + fn new(remote_template: Option) -> Self { + Self { + remote_template, + ..Default::default() + } + } +} + +/// A RemoteTemplate string is a remote connection template string. +/// Occurrences of the substring "{id}" in the template will be replaced +/// by the server ID. +#[derive(Debug)] +pub struct RemoteTemplate { + template: String, +} + +impl RemoteTemplate { + pub fn new(template: impl Into) -> Self { + let template = template.into(); + Self { template } + } + + fn get(&self, id: &ServerId) -> GRpcConnectionString { + self.template.replace("{id}", &format!("{}", id.get_u32())) + } } #[derive(Debug)] @@ -316,12 +354,17 @@ mod test { use crate::db::load_preserved_catalog; use super::*; + use std::num::NonZeroU32; #[tokio::test] async fn create_db() { let name = DatabaseName::new("foo").unwrap(); let metric_registry = Arc::new(metrics::MetricRegistry::new()); - let config = Config::new(Arc::new(JobRegistry::new()), Arc::clone(&metric_registry)); + let config = Config::new( + Arc::new(JobRegistry::new()), + Arc::clone(&metric_registry), + None, + ); let rules = DatabaseRules::new(name.clone()); { @@ -363,7 +406,11 @@ mod test { async fn test_db_drop() { let name = DatabaseName::new("foo").unwrap(); let metric_registry = Arc::new(metrics::MetricRegistry::new()); - let config = Config::new(Arc::new(JobRegistry::new()), Arc::clone(&metric_registry)); + let config = Config::new( + Arc::new(JobRegistry::new()), + Arc::clone(&metric_registry), + None, + ); let rules = DatabaseRules::new(name.clone()); let db_reservation = config.create_db(rules).unwrap(); @@ -412,4 +459,28 @@ mod test { assert_eq!(rules_path, expected_path); } + + #[test] + fn resolve_remote() { + let metric_registry = Arc::new(metrics::MetricRegistry::new()); + let config = Config::new( + Arc::new(JobRegistry::new()), + Arc::clone(&metric_registry), + Some(RemoteTemplate::new("http://iox-query-{id}:8082")), + ); + + let server_id = ServerId::new(NonZeroU32::new(42).unwrap()); + let remote = config.resolve_remote(server_id); + assert_eq!( + remote, + Some(GRpcConnectionString::from("http://iox-query-42:8082")) + ); + + let server_id = ServerId::new(NonZeroU32::new(24).unwrap()); + let remote = config.resolve_remote(server_id); + assert_eq!( + remote, + Some(GRpcConnectionString::from("http://iox-query-24:8082")) + ); + } } diff --git a/server/src/lib.rs b/server/src/lib.rs index 10bb5cee3f..04735ee697 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -93,6 +93,7 @@ use object_store::{path::ObjectStorePath, ObjectStore, ObjectStoreApi}; use query::{exec::Executor, DatabaseStore}; use tracker::{TaskId, TaskRegistration, TaskRegistryWithHistory, TaskTracker, TrackedFutureExt}; +pub use crate::config::RemoteTemplate; use crate::{ config::{ object_store_path_for_database_config, Config, GRpcConnectionString, DB_RULES_FILE_NAME, @@ -229,15 +230,22 @@ pub struct ServerConfig { object_store: Arc, metric_registry: Arc, + + remote_template: Option, } impl ServerConfig { /// Create a new config using the specified store. - pub fn new(object_store: Arc, metric_registry: Arc) -> Self { + pub fn new( + object_store: Arc, + metric_registry: Arc, + remote_template: Option, + ) -> Self { Self { num_worker_threads: None, object_store, metric_registry, + remote_template, } } @@ -391,12 +399,17 @@ impl Server { object_store, // to test the metrics provide a different registry to the `ServerConfig`. metric_registry, + remote_template, } = config; let num_worker_threads = num_worker_threads.unwrap_or_else(num_cpus::get); Self { id: Default::default(), - config: Arc::new(Config::new(Arc::clone(&jobs), Arc::clone(&metric_registry))), + config: Arc::new(Config::new( + Arc::clone(&jobs), + Arc::clone(&metric_registry), + remote_template, + )), store: object_store, connection_manager: Arc::new(connection_manager), exec: Arc::new(Executor::new(num_worker_threads)), @@ -1058,11 +1071,8 @@ mod tests { let test_registry = metrics::TestMetricRegistry::new(Arc::clone(®istry)); ( test_registry, - ServerConfig::new( - Arc::new(object_store), - registry, // new registry ensures test isolation of metrics - ) - .with_num_worker_threads(1), + ServerConfig::new(Arc::new(object_store), registry, Option::None) + .with_num_worker_threads(1), ) } @@ -1161,8 +1171,8 @@ mod tests { store.list_with_delimiter(&store.new_path()).await.unwrap(); let manager = TestConnectionManager::new(); - let config2 = - ServerConfig::new(store, Arc::new(MetricRegistry::new())).with_num_worker_threads(1); + let config2 = ServerConfig::new(store, Arc::new(MetricRegistry::new()), Option::None) + .with_num_worker_threads(1); let server2 = Server::new(manager, config2); server2.set_id(ServerId::try_from(1).unwrap()).unwrap(); server2.load_database_configs().await.unwrap(); diff --git a/src/commands/run.rs b/src/commands/run.rs index 1ea077e940..56fa12ed7a 100644 --- a/src/commands/run.rs +++ b/src/commands/run.rs @@ -397,6 +397,15 @@ Possible values (case insensitive): /// environments. #[structopt(long = "--azure-storage-access-key", env = "AZURE_STORAGE_ACCESS_KEY")] pub azure_storage_access_key: Option, + + /// When IOx nodes need to talk to remote peers they consult an internal remote address + /// mapping. This mapping is populated via API calls. If the mapping doesn't produce + /// a result, this config entry allows to generate a hostname from at template: + /// occurrences of the "{id}" substring will be replaced with the remote Server ID. + /// + /// Example: http://node-{id}.ioxmydomain.com:8082 + #[structopt(long = "--remote-template", env = "INFLUXDB_IOX_REMOTE_TEMPLATE")] + pub remote_template: Option, } pub async fn command(config: Config) -> Result<()> { diff --git a/src/influxdb_ioxd.rs b/src/influxdb_ioxd.rs index 9856b73a35..590df2a72c 100644 --- a/src/influxdb_ioxd.rs +++ b/src/influxdb_ioxd.rs @@ -7,7 +7,7 @@ use object_store::{ use observability_deps::tracing::{self, error, info, warn, Instrument}; use panic_logging::SendPanicsToTracing; use server::{ - ConnectionManagerImpl as ConnectionManager, Server as AppServer, + ConnectionManagerImpl as ConnectionManager, RemoteTemplate, Server as AppServer, ServerConfig as AppServerConfig, }; use snafu::{ResultExt, Snafu}; @@ -123,7 +123,8 @@ pub async fn main(config: Config) -> Result<()> { let object_store = ObjectStore::try_from(&config)?; let object_storage = Arc::new(object_store); let metric_registry = Arc::new(metrics::MetricRegistry::new()); - let server_config = AppServerConfig::new(object_storage, metric_registry); + let remote_template = config.remote_template.map(RemoteTemplate::new); + let server_config = AppServerConfig::new(object_storage, metric_registry, remote_template); let server_config = if let Some(n) = config.num_worker_threads { info!( diff --git a/src/influxdb_ioxd/http.rs b/src/influxdb_ioxd/http.rs index 9bebdfe420..ee3c2d064b 100644 --- a/src/influxdb_ioxd/http.rs +++ b/src/influxdb_ioxd/http.rs @@ -943,6 +943,7 @@ mod tests { AppServerConfig::new( Arc::new(ObjectStore::new_in_memory(InMemory::new())), registry, + Option::None, ) .with_num_worker_threads(1), )