Merge branch 'main' into crepererum/remove_snapshotting
commit
aac00d4fa6
|
@ -48,10 +48,14 @@ impl<E> From<Error> for UpdateError<E> {
|
|||
}
|
||||
|
||||
impl Config {
|
||||
pub(crate) fn new(jobs: Arc<JobRegistry>, metric_registry: Arc<MetricRegistry>) -> Self {
|
||||
pub(crate) fn new(
|
||||
jobs: Arc<JobRegistry>,
|
||||
metric_registry: Arc<MetricRegistry>,
|
||||
remote_template: Option<RemoteTemplate>,
|
||||
) -> Self {
|
||||
Self {
|
||||
shutdown: Default::default(),
|
||||
state: Default::default(),
|
||||
state: RwLock::new(ConfigState::new(remote_template)),
|
||||
jobs,
|
||||
metric_registry,
|
||||
}
|
||||
|
@ -120,7 +124,11 @@ impl Config {
|
|||
|
||||
pub(crate) fn resolve_remote(&self, id: ServerId) -> Option<GRpcConnectionString> {
|
||||
let state = self.state.read().expect("mutex poisoned");
|
||||
state.remotes.get(&id).cloned()
|
||||
state
|
||||
.remotes
|
||||
.get(&id)
|
||||
.cloned()
|
||||
.or_else(|| state.remote_template.as_ref().map(|t| t.get(&id)))
|
||||
}
|
||||
|
||||
fn commit(
|
||||
|
@ -233,6 +241,36 @@ struct ConfigState {
|
|||
databases: BTreeMap<DatabaseName<'static>, DatabaseState>,
|
||||
/// Map between remote IOx server IDs and management API connection strings.
|
||||
remotes: BTreeMap<ServerId, GRpcConnectionString>,
|
||||
/// Static map between remote server IDs and hostnames based on a template
|
||||
remote_template: Option<RemoteTemplate>,
|
||||
}
|
||||
|
||||
impl ConfigState {
|
||||
fn new(remote_template: Option<RemoteTemplate>) -> Self {
|
||||
Self {
|
||||
remote_template,
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A RemoteTemplate string is a remote connection template string.
|
||||
/// Occurrences of the substring "{id}" in the template will be replaced
|
||||
/// by the server ID.
|
||||
#[derive(Debug)]
|
||||
pub struct RemoteTemplate {
|
||||
template: String,
|
||||
}
|
||||
|
||||
impl RemoteTemplate {
|
||||
pub fn new(template: impl Into<String>) -> Self {
|
||||
let template = template.into();
|
||||
Self { template }
|
||||
}
|
||||
|
||||
fn get(&self, id: &ServerId) -> GRpcConnectionString {
|
||||
self.template.replace("{id}", &format!("{}", id.get_u32()))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
|
@ -316,12 +354,17 @@ mod test {
|
|||
use crate::db::load_preserved_catalog;
|
||||
|
||||
use super::*;
|
||||
use std::num::NonZeroU32;
|
||||
|
||||
#[tokio::test]
|
||||
async fn create_db() {
|
||||
let name = DatabaseName::new("foo").unwrap();
|
||||
let metric_registry = Arc::new(metrics::MetricRegistry::new());
|
||||
let config = Config::new(Arc::new(JobRegistry::new()), Arc::clone(&metric_registry));
|
||||
let config = Config::new(
|
||||
Arc::new(JobRegistry::new()),
|
||||
Arc::clone(&metric_registry),
|
||||
None,
|
||||
);
|
||||
let rules = DatabaseRules::new(name.clone());
|
||||
|
||||
{
|
||||
|
@ -363,7 +406,11 @@ mod test {
|
|||
async fn test_db_drop() {
|
||||
let name = DatabaseName::new("foo").unwrap();
|
||||
let metric_registry = Arc::new(metrics::MetricRegistry::new());
|
||||
let config = Config::new(Arc::new(JobRegistry::new()), Arc::clone(&metric_registry));
|
||||
let config = Config::new(
|
||||
Arc::new(JobRegistry::new()),
|
||||
Arc::clone(&metric_registry),
|
||||
None,
|
||||
);
|
||||
let rules = DatabaseRules::new(name.clone());
|
||||
|
||||
let db_reservation = config.create_db(rules).unwrap();
|
||||
|
@ -412,4 +459,28 @@ mod test {
|
|||
|
||||
assert_eq!(rules_path, expected_path);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn resolve_remote() {
|
||||
let metric_registry = Arc::new(metrics::MetricRegistry::new());
|
||||
let config = Config::new(
|
||||
Arc::new(JobRegistry::new()),
|
||||
Arc::clone(&metric_registry),
|
||||
Some(RemoteTemplate::new("http://iox-query-{id}:8082")),
|
||||
);
|
||||
|
||||
let server_id = ServerId::new(NonZeroU32::new(42).unwrap());
|
||||
let remote = config.resolve_remote(server_id);
|
||||
assert_eq!(
|
||||
remote,
|
||||
Some(GRpcConnectionString::from("http://iox-query-42:8082"))
|
||||
);
|
||||
|
||||
let server_id = ServerId::new(NonZeroU32::new(24).unwrap());
|
||||
let remote = config.resolve_remote(server_id);
|
||||
assert_eq!(
|
||||
remote,
|
||||
Some(GRpcConnectionString::from("http://iox-query-24:8082"))
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -93,6 +93,7 @@ use object_store::{path::ObjectStorePath, ObjectStore, ObjectStoreApi};
|
|||
use query::{exec::Executor, DatabaseStore};
|
||||
use tracker::{TaskId, TaskRegistration, TaskRegistryWithHistory, TaskTracker, TrackedFutureExt};
|
||||
|
||||
pub use crate::config::RemoteTemplate;
|
||||
use crate::{
|
||||
config::{
|
||||
object_store_path_for_database_config, Config, GRpcConnectionString, DB_RULES_FILE_NAME,
|
||||
|
@ -228,15 +229,22 @@ pub struct ServerConfig {
|
|||
object_store: Arc<ObjectStore>,
|
||||
|
||||
metric_registry: Arc<MetricRegistry>,
|
||||
|
||||
remote_template: Option<RemoteTemplate>,
|
||||
}
|
||||
|
||||
impl ServerConfig {
|
||||
/// Create a new config using the specified store.
|
||||
pub fn new(object_store: Arc<ObjectStore>, metric_registry: Arc<MetricRegistry>) -> Self {
|
||||
pub fn new(
|
||||
object_store: Arc<ObjectStore>,
|
||||
metric_registry: Arc<MetricRegistry>,
|
||||
remote_template: Option<RemoteTemplate>,
|
||||
) -> Self {
|
||||
Self {
|
||||
num_worker_threads: None,
|
||||
object_store,
|
||||
metric_registry,
|
||||
remote_template,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -390,12 +398,17 @@ impl<M: ConnectionManager> Server<M> {
|
|||
object_store,
|
||||
// to test the metrics provide a different registry to the `ServerConfig`.
|
||||
metric_registry,
|
||||
remote_template,
|
||||
} = config;
|
||||
let num_worker_threads = num_worker_threads.unwrap_or_else(num_cpus::get);
|
||||
|
||||
Self {
|
||||
id: Default::default(),
|
||||
config: Arc::new(Config::new(Arc::clone(&jobs), Arc::clone(&metric_registry))),
|
||||
config: Arc::new(Config::new(
|
||||
Arc::clone(&jobs),
|
||||
Arc::clone(&metric_registry),
|
||||
remote_template,
|
||||
)),
|
||||
store: object_store,
|
||||
connection_manager: Arc::new(connection_manager),
|
||||
exec: Arc::new(Executor::new(num_worker_threads)),
|
||||
|
@ -1057,11 +1070,7 @@ mod tests {
|
|||
let test_registry = metrics::TestMetricRegistry::new(Arc::clone(®istry));
|
||||
(
|
||||
test_registry,
|
||||
ServerConfig::new(
|
||||
Arc::new(object_store),
|
||||
registry, // new registry ensures test isolation of metrics
|
||||
)
|
||||
.with_num_worker_threads(1),
|
||||
ServerConfig::new(Arc::new(object_store), registry, None).with_num_worker_threads(1),
|
||||
)
|
||||
}
|
||||
|
||||
|
@ -1160,8 +1169,8 @@ mod tests {
|
|||
store.list_with_delimiter(&store.new_path()).await.unwrap();
|
||||
|
||||
let manager = TestConnectionManager::new();
|
||||
let config2 =
|
||||
ServerConfig::new(store, Arc::new(MetricRegistry::new())).with_num_worker_threads(1);
|
||||
let config2 = ServerConfig::new(store, Arc::new(MetricRegistry::new()), Option::None)
|
||||
.with_num_worker_threads(1);
|
||||
let server2 = Server::new(manager, config2);
|
||||
server2.set_id(ServerId::try_from(1).unwrap()).unwrap();
|
||||
server2.load_database_configs().await.unwrap();
|
||||
|
|
|
@ -397,6 +397,15 @@ Possible values (case insensitive):
|
|||
/// environments.
|
||||
#[structopt(long = "--azure-storage-access-key", env = "AZURE_STORAGE_ACCESS_KEY")]
|
||||
pub azure_storage_access_key: Option<String>,
|
||||
|
||||
/// When IOx nodes need to talk to remote peers they consult an internal remote address
|
||||
/// mapping. This mapping is populated via API calls. If the mapping doesn't produce
|
||||
/// a result, this config entry allows to generate a hostname from at template:
|
||||
/// occurrences of the "{id}" substring will be replaced with the remote Server ID.
|
||||
///
|
||||
/// Example: http://node-{id}.ioxmydomain.com:8082
|
||||
#[structopt(long = "--remote-template", env = "INFLUXDB_IOX_REMOTE_TEMPLATE")]
|
||||
pub remote_template: Option<String>,
|
||||
}
|
||||
|
||||
pub async fn command(config: Config) -> Result<()> {
|
||||
|
|
|
@ -7,7 +7,7 @@ use object_store::{
|
|||
use observability_deps::tracing::{self, error, info, warn, Instrument};
|
||||
use panic_logging::SendPanicsToTracing;
|
||||
use server::{
|
||||
ConnectionManagerImpl as ConnectionManager, Server as AppServer,
|
||||
ConnectionManagerImpl as ConnectionManager, RemoteTemplate, Server as AppServer,
|
||||
ServerConfig as AppServerConfig,
|
||||
};
|
||||
use snafu::{ResultExt, Snafu};
|
||||
|
@ -123,7 +123,8 @@ pub async fn main(config: Config) -> Result<()> {
|
|||
let object_store = ObjectStore::try_from(&config)?;
|
||||
let object_storage = Arc::new(object_store);
|
||||
let metric_registry = Arc::new(metrics::MetricRegistry::new());
|
||||
let server_config = AppServerConfig::new(object_storage, metric_registry);
|
||||
let remote_template = config.remote_template.map(RemoteTemplate::new);
|
||||
let server_config = AppServerConfig::new(object_storage, metric_registry, remote_template);
|
||||
|
||||
let server_config = if let Some(n) = config.num_worker_threads {
|
||||
info!(
|
||||
|
|
|
@ -868,6 +868,7 @@ mod tests {
|
|||
AppServerConfig::new(
|
||||
Arc::new(ObjectStore::new_in_memory(InMemory::new())),
|
||||
registry,
|
||||
None,
|
||||
)
|
||||
.with_num_worker_threads(1),
|
||||
)
|
||||
|
|
Loading…
Reference in New Issue