diff --git a/server/src/config.rs b/server/src/config.rs index d4dbe43f65..04b025f34d 100644 --- a/server/src/config.rs +++ b/server/src/config.rs @@ -68,7 +68,6 @@ impl Config { exec: Arc, server_id: ServerId, metric_registry: Arc, - remote_template: Option, ) -> Self { Self { jobs, @@ -77,7 +76,7 @@ impl Config { server_id, metric_registry, shutdown: Default::default(), - state: RwLock::new(ConfigState::new(remote_template)), + state: Default::default(), } } @@ -228,34 +227,6 @@ impl Config { db.update_rules(update).map_err(UpdateError::Closure) } - /// Get all registered remote servers. - pub(crate) fn remotes_sorted(&self) -> Vec<(ServerId, String)> { - let state = self.state.read().expect("mutex poisoned"); - state.remotes.iter().map(|(&a, b)| (a, b.clone())).collect() - } - - /// Update given remote server. - pub(crate) fn update_remote(&self, id: ServerId, addr: GRpcConnectionString) { - let mut state = self.state.write().expect("mutex poisoned"); - state.remotes.insert(id, addr); - } - - /// Delete remote server by ID. - pub(crate) fn delete_remote(&self, id: ServerId) -> Option { - let mut state = self.state.write().expect("mutex poisoned"); - state.remotes.remove(&id) - } - - /// Get remote server by ID. - pub(crate) fn resolve_remote(&self, id: ServerId) -> Option { - let state = self.state.read().expect("mutex poisoned"); - state - .remotes - .get(&id) - .cloned() - .or_else(|| state.remote_template.as_ref().map(|t| t.get(&id))) - } - /// Commit new or unchanged database state. fn commit_db(&self, db_state: Arc) { let mut state = self.state.write().expect("mutex poisoned"); @@ -336,9 +307,6 @@ pub fn object_store_path_for_database_config( path } -/// A gRPC connection string. -pub type GRpcConnectionString = String; - /// Inner config state that is protected by a lock. #[derive(Default, Debug)] struct ConfigState { @@ -347,40 +315,6 @@ struct ConfigState { /// Databases in different states. databases: BTreeMap, Arc>, - - /// Map between remote IOx server IDs and management API connection strings. - remotes: BTreeMap, - - /// Static map between remote server IDs and hostnames based on a template - remote_template: Option, -} - -impl ConfigState { - fn new(remote_template: Option) -> Self { - Self { - remote_template, - ..Default::default() - } - } -} - -/// A RemoteTemplate string is a remote connection template string. -/// Occurrences of the substring "{id}" in the template will be replaced -/// by the server ID. -#[derive(Debug)] -pub struct RemoteTemplate { - template: String, -} - -impl RemoteTemplate { - pub fn new(template: impl Into) -> Self { - let template = template.into(); - Self { template } - } - - fn get(&self, id: &ServerId) -> GRpcConnectionString { - self.template.replace("{id}", &format!("{}", id.get_u32())) - } } /// Internal representation of the different database states. @@ -703,9 +637,8 @@ mod test { use crate::db::load::load_or_create_preserved_catalog; use super::*; - use std::num::NonZeroU32; - fn make_config(remote_template: Option) -> Config { + fn make_config() -> Config { let store = Arc::new(ObjectStore::new_in_memory()); let server_id = ServerId::try_from(1).unwrap(); let metric_registry = Arc::new(metrics::MetricRegistry::new()); @@ -715,7 +648,6 @@ mod test { Arc::new(Executor::new(1)), server_id, Arc::clone(&metric_registry), - remote_template, ) } @@ -723,7 +655,7 @@ mod test { async fn create_db() { // setup let name = DatabaseName::new("foo").unwrap(); - let config = make_config(None); + let config = make_config(); let rules = DatabaseRules::new(name.clone()); // getting handle while DB is reserved => fails @@ -827,7 +759,7 @@ mod test { async fn recover_db() { // setup let name = DatabaseName::new("foo").unwrap(); - let config = make_config(None); + let config = make_config(); let rules = DatabaseRules::new(name.clone()); // create DB but don't continue with rules loaded (e.g. because the rules file is broken) @@ -914,7 +846,7 @@ mod test { async fn block_db() { // setup let name = DatabaseName::new("foo").unwrap(); - let config = make_config(None); + let config = make_config(); // block DB let handle = config.block_db(name.clone()).unwrap(); @@ -943,7 +875,7 @@ mod test { async fn test_db_drop() { // setup let name = DatabaseName::new("foo").unwrap(); - let config = make_config(None); + let config = make_config(); let rules = DatabaseRules::new(name.clone()); let (preserved_catalog, catalog, replay_plan) = load_or_create_preserved_catalog( &name, @@ -1000,23 +932,4 @@ mod test { assert_eq!(rules_path, expected_path); } - - #[test] - fn resolve_remote() { - let config = make_config(Some(RemoteTemplate::new("http://iox-query-{id}:8082"))); - - let server_id = ServerId::new(NonZeroU32::new(42).unwrap()); - let remote = config.resolve_remote(server_id); - assert_eq!( - remote, - Some(GRpcConnectionString::from("http://iox-query-42:8082")) - ); - - let server_id = ServerId::new(NonZeroU32::new(24).unwrap()); - let remote = config.resolve_remote(server_id); - assert_eq!( - remote, - Some(GRpcConnectionString::from("http://iox-query-24:8082")) - ); - } } diff --git a/server/src/lib.rs b/server/src/lib.rs index 14bdbfa716..e6e879ad93 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -74,7 +74,7 @@ use std::sync::Arc; use async_trait::async_trait; use bytes::BytesMut; -use config::{object_store_path_for_database_config, Config, GRpcConnectionString}; +use config::{object_store_path_for_database_config, Config}; use data_types::database_rules::ShardConfig; use data_types::{ database_rules::{ @@ -97,18 +97,20 @@ use observability_deps::tracing::{error, info, warn}; use parking_lot::{Mutex, RwLock, RwLockUpgradableReadGuard}; use query::{exec::Executor, DatabaseStore}; use rand::seq::SliceRandom; +use resolver::Resolver; use snafu::{OptionExt, ResultExt, Snafu}; use tracker::{TaskId, TaskRegistration, TaskRegistryWithHistory, TaskTracker, TrackedFutureExt}; use write_buffer::config::WriteBufferConfig; -pub use config::RemoteTemplate; pub use connection::{ConnectionManager, ConnectionManagerImpl, RemoteServer}; pub use db::Db; +pub use resolver::{GrpcConnectionString, RemoteTemplate}; mod config; mod connection; pub mod db; mod init; +mod resolver; /// Utility modules used by benchmarks and tests pub mod utils; @@ -210,7 +212,7 @@ pub enum Error { #[snafu(display("all remotes failed connecting: {:?}", errors))] NoRemoteReachable { - errors: HashMap, + errors: HashMap, }, #[snafu(display("remote error: {}", source))] @@ -429,6 +431,9 @@ pub struct Server { /// and populates the endpoint with this data. registry: Arc, + /// Resolver for mapping ServerId to gRPC connection strings + resolver: RwLock, + /// The state machine for server startup stage: Arc>, } @@ -448,10 +453,7 @@ pub struct Server { #[derive(Debug)] enum ServerStage { /// Server has started but doesn't have a server id yet - Startup { - remote_template: Option, - wipe_catalog_on_error: bool, - }, + Startup { wipe_catalog_on_error: bool }, /// Server can be initialized InitReady { @@ -513,8 +515,8 @@ where jobs, metrics: Arc::new(ServerMetrics::new(Arc::clone(&metric_registry))), registry: Arc::clone(&metric_registry), + resolver: RwLock::new(Resolver::new(remote_template)), stage: Arc::new(RwLock::new(ServerStage::Startup { - remote_template, wipe_catalog_on_error, })), } @@ -528,11 +530,8 @@ where let mut stage = self.stage.write(); match &mut *stage { ServerStage::Startup { - remote_template, wipe_catalog_on_error, } => { - let remote_template = remote_template.take(); - *stage = ServerStage::InitReady { wipe_catalog_on_error: *wipe_catalog_on_error, config: Arc::new(Config::new( @@ -541,7 +540,6 @@ where Arc::clone(&self.exec), id, Arc::clone(&self.registry), - remote_template, )), last_error: None, }; @@ -907,12 +905,16 @@ where entry: Entry, ) -> Result<()> { // Return an error if this server is not yet ready - let config = self.config()?; + self.require_initialized()?; + + let addrs: Vec<_> = { + let resolver = self.resolver.read(); + node_group + .iter() + .filter_map(|&node| resolver.resolve_remote(node)) + .collect() + }; - let addrs: Vec<_> = node_group - .iter() - .filter_map(|&node| config.resolve_remote(node)) - .collect(); if addrs.is_empty() { return NoRemoteConfigured { node_group }.fail(); } @@ -1018,23 +1020,16 @@ where Ok(rules) } - pub fn remotes_sorted(&self) -> Result> { - // TODO: Should these be on ConnectionManager and not Config - let config = self.config()?; - Ok(config.remotes_sorted()) + pub fn remotes_sorted(&self) -> Vec<(ServerId, String)> { + self.resolver.read().remotes_sorted() } - pub fn update_remote(&self, id: ServerId, addr: GRpcConnectionString) -> Result<()> { - // TODO: Should these be on ConnectionManager and not Config - let config = self.config()?; - config.update_remote(id, addr); - Ok(()) + pub fn update_remote(&self, id: ServerId, addr: GrpcConnectionString) { + self.resolver.write().update_remote(id, addr) } - pub fn delete_remote(&self, id: ServerId) -> Result> { - // TODO: Should these be on ConnectionManager and not Config - let config = self.config()?; - Ok(config.delete_remote(id)) + pub fn delete_remote(&self, id: ServerId) -> Option { + self.resolver.write().delete_remote(id) } pub fn spawn_dummy_job(&self, nanos: Vec) -> TaskTracker { @@ -1686,9 +1681,7 @@ mod tests { ); // one remote is configured but it's down and we'll get connection error - server - .update_remote(bad_remote_id, BAD_REMOTE_ADDR.into()) - .unwrap(); + server.update_remote(bad_remote_id, BAD_REMOTE_ADDR.into()); let err = server .write_lines(&db_name, &lines, ARBITRARY_DEFAULT_TIME) .await @@ -1705,12 +1698,8 @@ mod tests { // We configure the address for the other remote, this time connection will succeed // despite the bad remote failing to connect. - server - .update_remote(good_remote_id_1, GOOD_REMOTE_ADDR_1.into()) - .unwrap(); - server - .update_remote(good_remote_id_2, GOOD_REMOTE_ADDR_2.into()) - .unwrap(); + server.update_remote(good_remote_id_1, GOOD_REMOTE_ADDR_1.into()); + server.update_remote(good_remote_id_2, GOOD_REMOTE_ADDR_2.into()); // Remotes are tried in random order, so we need to repeat the test a few times to have a reasonable // probability both the remotes will get hit. diff --git a/server/src/resolver.rs b/server/src/resolver.rs new file mode 100644 index 0000000000..19ed62ac97 --- /dev/null +++ b/server/src/resolver.rs @@ -0,0 +1,91 @@ +use data_types::server_id::ServerId; +use std::collections::BTreeMap; + +/// A RemoteTemplate string is a remote connection template string. +/// Occurrences of the substring "{id}" in the template will be replaced +/// by the server ID. +#[derive(Debug)] +pub struct RemoteTemplate { + template: String, +} + +impl RemoteTemplate { + pub fn new(template: impl Into) -> Self { + let template = template.into(); + Self { template } + } + + fn get(&self, id: &ServerId) -> GrpcConnectionString { + self.template.replace("{id}", &format!("{}", id.get_u32())) + } +} + +/// A gRPC connection string. +pub type GrpcConnectionString = String; + +/// The Resolver provides a mapping between ServerId and GRpcConnectionString +#[derive(Debug)] +pub struct Resolver { + /// Map between remote IOx server IDs and management API connection strings. + remotes: BTreeMap, + + /// Static map between remote server IDs and hostnames based on a template + remote_template: Option, +} + +impl Resolver { + pub fn new(remote_template: Option) -> Self { + Self { + remotes: Default::default(), + remote_template, + } + } + + /// Get all registered remote servers. + pub fn remotes_sorted(&self) -> Vec<(ServerId, String)> { + self.remotes.iter().map(|(&a, b)| (a, b.clone())).collect() + } + + /// Update given remote server. + pub fn update_remote(&mut self, id: ServerId, addr: GrpcConnectionString) { + self.remotes.insert(id, addr); + } + + /// Delete remote server by ID. + pub fn delete_remote(&mut self, id: ServerId) -> Option { + self.remotes.remove(&id) + } + + /// Get remote server by ID. + pub fn resolve_remote(&self, id: ServerId) -> Option { + self.remotes + .get(&id) + .cloned() + .or_else(|| self.remote_template.as_ref().map(|t| t.get(&id))) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::num::NonZeroU32; + + #[test] + fn resolve_remote() { + let resolver = Resolver::new(Some(RemoteTemplate::new("http://iox-query-{id}:8082"))); + + let server_id = ServerId::new(NonZeroU32::new(42).unwrap()); + let remote = resolver.resolve_remote(server_id); + assert_eq!( + remote, + Some(GrpcConnectionString::from("http://iox-query-42:8082")) + ); + + let server_id = ServerId::new(NonZeroU32::new(24).unwrap()); + let remote = resolver.resolve_remote(server_id); + assert_eq!( + remote, + Some(GrpcConnectionString::from("http://iox-query-24:8082")) + ); + } +} diff --git a/src/influxdb_ioxd/rpc/management.rs b/src/influxdb_ioxd/rpc/management.rs index 1ba5c326a5..31bedc284e 100644 --- a/src/influxdb_ioxd/rpc/management.rs +++ b/src/influxdb_ioxd/rpc/management.rs @@ -198,17 +198,15 @@ where &self, _: Request, ) -> Result, Status> { - let result = self.server.remotes_sorted(); - let remotes = match result { - Ok(remotes) => remotes - .into_iter() - .map(|(id, connection_string)| Remote { - id: id.get_u32(), - connection_string, - }) - .collect(), - Err(e) => return Err(default_server_error_handler(e)), - }; + let remotes = self + .server + .remotes_sorted() + .into_iter() + .map(|(id, connection_string)| Remote { + id: id.get_u32(), + connection_string, + }) + .collect(); Ok(Response::new(ListRemotesResponse { remotes })) } @@ -224,15 +222,9 @@ where let remote_id = ServerId::try_from(remote.id) .map_err(|_| FieldViolation::required("id").scope("remote"))?; - let result = self - .server + self.server .update_remote(remote_id, remote.connection_string); - match result { - Ok(_) => {} - Err(e) => return Err(default_server_error_handler(e)), - } - Ok(Response::new(UpdateRemoteResponse {})) } @@ -245,9 +237,8 @@ where ServerId::try_from(request.id).map_err(|_| FieldViolation::required("id"))?; match self.server.delete_remote(remote_id) { - Ok(Some(_)) => {} - Ok(None) => return Err(NotFound::default().into()), - Err(e) => return Err(default_server_error_handler(e)), + Some(_) => {} + None => return Err(NotFound::default().into()), } Ok(Response::new(DeleteRemoteResponse {})) diff --git a/tests/end_to_end_cases/management_api.rs b/tests/end_to_end_cases/management_api.rs index 07d8eb1df3..dc5d9fa9e2 100644 --- a/tests/end_to_end_cases/management_api.rs +++ b/tests/end_to_end_cases/management_api.rs @@ -67,8 +67,6 @@ async fn test_list_update_remotes() { const TEST_REMOTE_ADDR_2: &str = "4.3.2.1:4321"; const TEST_REMOTE_ADDR_2_UPDATED: &str = "40.30.20.10:4321"; - client.update_server_id(123).await.unwrap(); - let res = client.list_remotes().await.expect("list remotes failed"); assert_eq!(res.len(), 0);