feat: expose catalog timeouts via CLI/env (#4472)

This is useful for local instances that run against a prod system,
because port forwarding can lead to long connection delays.

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Marco Neumann 2022-04-29 13:14:15 +02:00 committed by GitHub
parent ed1ad858c0
commit 0a20086a58
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 194 additions and 89 deletions

1
Cargo.lock generated
View File

@ -687,6 +687,7 @@ dependencies = [
"clap 3.1.12",
"data_types",
"futures",
"humantime",
"iox_catalog",
"iox_time",
"metric",

View File

@ -7,6 +7,7 @@ edition = "2021"
clap = { version = "3", features = ["derive", "env"] }
data_types = { path = "../data_types" }
futures = "0.3"
humantime = "2.1.0"
iox_catalog = { path = "../iox_catalog" }
iox_time = { path = "../iox_time" }
metric = { path = "../metric" }

View File

@ -1,9 +1,12 @@
use iox_catalog::{
create_or_get_default_records, interface::Catalog, mem::MemCatalog, postgres::PostgresCatalog,
create_or_get_default_records,
interface::Catalog,
mem::MemCatalog,
postgres::{PostgresCatalog, PostgresConnectionOptions},
};
use observability_deps::tracing::*;
use snafu::{OptionExt, ResultExt, Snafu};
use std::{ops::DerefMut, sync::Arc};
use std::{ops::DerefMut, sync::Arc, time::Duration};
#[derive(Debug, Snafu)]
pub enum Error {
@ -16,6 +19,29 @@ pub enum Error {
},
}
fn default_max_connections() -> &'static str {
let s = PostgresConnectionOptions::DEFAULT_MAX_CONNS.to_string();
Box::leak(Box::new(s))
}
fn default_connect_timeout() -> &'static str {
let s =
humantime::format_duration(PostgresConnectionOptions::DEFAULT_CONNECT_TIMETOUT).to_string();
Box::leak(Box::new(s))
}
fn default_idle_timeout() -> &'static str {
let s =
humantime::format_duration(PostgresConnectionOptions::DEFAULT_IDLE_TIMETOUT).to_string();
Box::leak(Box::new(s))
}
fn default_hotswap_poll_interval_timeout() -> &'static str {
let s = humantime::format_duration(PostgresConnectionOptions::DEFAULT_HOTSWAP_POLL_INTERVAL)
.to_string();
Box::leak(Box::new(s))
}
/// CLI config for catalog DSN.
#[derive(Debug, Clone, clap::Parser)]
pub struct CatalogDsnConfig {
@ -36,7 +62,7 @@ pub struct CatalogDsnConfig {
#[clap(
long = "--catalog-max-connections",
env = "INFLUXDB_IOX_CATALOG_MAX_CONNECTIONS",
default_value = "10"
default_value = default_max_connections(),
)]
pub max_catalog_connections: u32,
@ -44,9 +70,39 @@ pub struct CatalogDsnConfig {
#[clap(
long = "--catalog-postgres-schema-name",
env = "INFLUXDB_IOX_CATALOG_POSTGRES_SCHEMA_NAME",
default_value = iox_catalog::postgres::SCHEMA_NAME,
default_value = PostgresConnectionOptions::DEFAULT_SCHEMA_NAME,
)]
pub postgres_schema_name: String,
/// Set the amount of time to attempt connecting to the database.
#[clap(
long = "--catalog-connect-timeout",
env = "INFLUXDB_IOX_CATALOG_CONNECT_TIMEOUT",
default_value = default_connect_timeout(),
parse(try_from_str = humantime::parse_duration),
)]
pub connect_timeout: Duration,
/// Set a maximum idle duration for individual connections.
#[clap(
long = "--catalog-idle-timeout",
env = "INFLUXDB_IOX_CATALOG_IDLE_TIMEOUT",
default_value = default_idle_timeout(),
parse(try_from_str = humantime::parse_duration),
)]
pub idle_timeout: Duration,
/// If the DSN points to a file (i.e. starts with `dsn-file://`), this sets the interval how often the the file
/// should be polled for updates.
///
/// If an update is encountered, the underlying connection pool will be hot-swapped.
#[clap(
long = "--catalog-hotswap-poll-interval",
env = "INFLUXDB_IOX_CATALOG_HOTSWAP_POLL_INTERVAL",
default_value = default_hotswap_poll_interval_timeout(),
parse(try_from_str = humantime::parse_duration),
)]
pub hotswap_poll_interval: Duration,
}
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, clap::ArgEnum)]
@ -64,8 +120,11 @@ impl CatalogDsnConfig {
Self {
catalog_type_: CatalogType::Memory,
dsn: None,
max_catalog_connections: 10,
postgres_schema_name: iox_catalog::postgres::SCHEMA_NAME.to_string(),
max_catalog_connections: PostgresConnectionOptions::DEFAULT_MAX_CONNS,
postgres_schema_name: PostgresConnectionOptions::DEFAULT_SCHEMA_NAME.to_string(),
connect_timeout: PostgresConnectionOptions::DEFAULT_CONNECT_TIMETOUT,
idle_timeout: PostgresConnectionOptions::DEFAULT_IDLE_TIMETOUT,
hotswap_poll_interval: PostgresConnectionOptions::DEFAULT_HOTSWAP_POLL_INTERVAL,
}
}
@ -76,8 +135,11 @@ impl CatalogDsnConfig {
Self {
catalog_type_: CatalogType::Postgres,
dsn: Some(dsn),
max_catalog_connections: 10,
max_catalog_connections: PostgresConnectionOptions::DEFAULT_MAX_CONNS,
postgres_schema_name,
connect_timeout: PostgresConnectionOptions::DEFAULT_CONNECT_TIMETOUT,
idle_timeout: PostgresConnectionOptions::DEFAULT_IDLE_TIMETOUT,
hotswap_poll_interval: PostgresConnectionOptions::DEFAULT_HOTSWAP_POLL_INTERVAL,
}
}
@ -87,17 +149,26 @@ impl CatalogDsnConfig {
metrics: Arc<metric::Registry>,
) -> Result<Arc<dyn Catalog>, Error> {
let catalog = match self.catalog_type_ {
CatalogType::Postgres => Arc::new(
PostgresCatalog::connect(
app_name,
&self.postgres_schema_name,
self.dsn.as_ref().context(ConnectionStringRequiredSnafu)?,
self.max_catalog_connections,
metrics,
)
.await
.context(CatalogSnafu)?,
) as Arc<dyn Catalog>,
CatalogType::Postgres => {
let options = PostgresConnectionOptions {
app_name: app_name.to_string(),
schema_name: self.postgres_schema_name.clone(),
dsn: self
.dsn
.as_ref()
.context(ConnectionStringRequiredSnafu)?
.clone(),
max_conns: self.max_catalog_connections,
connect_timeout: self.connect_timeout,
idle_timeout: self.idle_timeout,
hotswap_poll_interval: self.hotswap_poll_interval,
};
Arc::new(
PostgresCatalog::connect(options, metrics)
.await
.context(CatalogSnafu)?,
) as Arc<dyn Catalog>
}
CatalogType::Memory => {
let mem = MemCatalog::new(metrics);

View File

@ -169,7 +169,7 @@ pub struct Config {
#[clap(
long = "--catalog-postgres-schema-name",
env = "INFLUXDB_IOX_CATALOG_POSTGRES_SCHEMA_NAME",
default_value = iox_catalog::postgres::SCHEMA_NAME,
default_value = iox_catalog::postgres::PostgresConnectionOptions::DEFAULT_SCHEMA_NAME,
)]
pub postgres_schema_name: String,

View File

@ -23,16 +23,69 @@ use sqlx::{migrate::Migrator, postgres::PgPoolOptions, Acquire, Executor, Postgr
use sqlx_hotswap_pool::HotSwapPool;
use std::{sync::Arc, time::Duration};
const CONNECT_TIMEOUT: Duration = Duration::from_secs(2);
const IDLE_TIMEOUT: Duration = Duration::from_secs(60);
/// the default schema name to use in Postgres
pub const SCHEMA_NAME: &str = "iox_catalog";
/// The file pointed to by a `dsn-file://` DSN is polled for change every `HOTSWAP_POLL_INTERVAL`.
const HOTSWAP_POLL_INTERVAL: std::time::Duration = Duration::from_secs(5);
static MIGRATOR: Migrator = sqlx::migrate!();
/// Postgres connection options.
#[derive(Debug, Clone)]
pub struct PostgresConnectionOptions {
/// Application name.
///
/// This will be reported to postgres.
pub app_name: String,
/// Schema name.
pub schema_name: String,
/// DSN.
pub dsn: String,
/// Maximum number of concurrent connections.
pub max_conns: u32,
/// Set the amount of time to attempt connecting to the database.
pub connect_timeout: Duration,
/// Set a maximum idle duration for individual connections.
pub idle_timeout: Duration,
/// If the DSN points to a file (i.e. starts with `dsn-file://`), this sets the interval how often the the file
/// should be polled for updates.
///
/// If an update is encountered, the underlying connection pool will be hot-swapped.
pub hotswap_poll_interval: Duration,
}
impl PostgresConnectionOptions {
/// Default value for [`schema_name`](Self::schema_name).
pub const DEFAULT_SCHEMA_NAME: &'static str = "iox_catalog";
/// Default value for [`max_conns`](Self::max_conns).
pub const DEFAULT_MAX_CONNS: u32 = 10;
/// Default value for [`connect_timeout`](Self::connect_timeout).
pub const DEFAULT_CONNECT_TIMETOUT: Duration = Duration::from_secs(2);
/// Default value for [`idle_timeout`](Self::idle_timeout).
pub const DEFAULT_IDLE_TIMETOUT: Duration = Duration::from_secs(10);
/// Default value for [`hotswap_poll_interval`](Self::hotswap_poll_interval).
pub const DEFAULT_HOTSWAP_POLL_INTERVAL: Duration = Duration::from_secs(5);
}
impl Default for PostgresConnectionOptions {
fn default() -> Self {
Self {
app_name: String::from("iox"),
schema_name: String::from(Self::DEFAULT_SCHEMA_NAME),
dsn: String::new(),
max_conns: Self::DEFAULT_MAX_CONNS,
connect_timeout: Self::DEFAULT_CONNECT_TIMETOUT,
idle_timeout: Self::DEFAULT_IDLE_TIMETOUT,
hotswap_poll_interval: Self::DEFAULT_HOTSWAP_POLL_INTERVAL,
}
}
}
/// PostgreSQL catalog.
#[derive(Debug)]
pub struct PostgresCatalog {
@ -51,23 +104,14 @@ struct Count {
impl PostgresCatalog {
/// Connect to the catalog store.
pub async fn connect(
app_name: &str,
schema_name: &str,
dsn: &str,
max_conns: u32,
options: PostgresConnectionOptions,
metrics: Arc<metric::Registry>,
) -> Result<Self> {
let schema_name = schema_name.to_string();
let pool = new_pool(
app_name,
&schema_name,
dsn,
max_conns,
HOTSWAP_POLL_INTERVAL,
)
.await
.map_err(|e| Error::SqlxError { source: e })?;
let pool = new_pool(&options)
.await
.map_err(|e| Error::SqlxError { source: e })?;
let schema_name = options.schema_name;
Ok(Self {
pool,
metrics,
@ -280,19 +324,17 @@ impl Catalog for PostgresCatalog {
///
/// This function doesn't support the IDPE specific `dsn-file://` uri scheme.
async fn new_raw_pool(
app_name: &str,
schema_name: &str,
dsn: &str,
max_conns: u32,
options: &PostgresConnectionOptions,
parsed_dsn: &str,
) -> Result<sqlx::Pool<Postgres>, sqlx::Error> {
let app_name = app_name.to_owned();
let app_name2 = app_name.clone(); // just to log below
let schema_name = schema_name.to_owned();
let app_name = options.app_name.clone();
let app_name2 = options.app_name.clone(); // just to log below
let schema_name = options.schema_name.clone();
let pool = PgPoolOptions::new()
.min_connections(1)
.max_connections(max_conns)
.connect_timeout(CONNECT_TIMEOUT)
.idle_timeout(IDLE_TIMEOUT)
.max_connections(options.max_conns)
.connect_timeout(options.connect_timeout)
.idle_timeout(options.idle_timeout)
.test_before_acquire(true)
.after_connect(move |c| {
let app_name = app_name.to_owned();
@ -318,7 +360,7 @@ async fn new_raw_pool(
Ok(())
})
})
.connect(dsn)
.connect(parsed_dsn)
.await?;
// Log a connection was successfully established and include the application
@ -341,23 +383,18 @@ async fn new_raw_pool(
/// The [`new_raw_pool`] function will return a new pool only if the connection
/// is successfull (see [`sqlx::pool::PoolOptions::test_before_acquire`]).
async fn new_pool(
app_name: &str,
schema_name: &str,
dsn: &str,
max_conns: u32,
polling_interval: Duration,
options: &PostgresConnectionOptions,
) -> Result<HotSwapPool<Postgres>, sqlx::Error> {
let app_name: Arc<str> = Arc::from(app_name);
let schema_name: Arc<str> = Arc::from(schema_name);
let parsed_dsn = match get_dsn_file_path(dsn) {
let parsed_dsn = match get_dsn_file_path(&options.dsn) {
Some(filename) => std::fs::read_to_string(&filename)?,
None => dsn.to_owned(),
None => options.dsn.clone(),
};
let pool =
HotSwapPool::new(new_raw_pool(&app_name, &schema_name, &parsed_dsn, max_conns).await?);
let pool = HotSwapPool::new(new_raw_pool(options, &parsed_dsn).await?);
let polling_interval = options.hotswap_poll_interval;
if let Some(dsn_file) = get_dsn_file_path(dsn) {
if let Some(dsn_file) = get_dsn_file_path(&options.dsn) {
let pool = pool.clone();
let options = options.clone();
// TODO(mkm): return a guard that stops this background worker.
// We create only one pool per process, but it would be cleaner to be
@ -372,34 +409,22 @@ async fn new_pool(
tokio::time::sleep(polling_interval).await;
async fn try_update(
app_name: &str,
schema_name: &str,
options: &PostgresConnectionOptions,
current_dsn: &str,
dsn_file: &str,
max_conns: u32,
pool: &HotSwapPool<Postgres>,
) -> Result<Option<String>, sqlx::Error> {
let new_dsn = std::fs::read_to_string(&dsn_file)?;
if new_dsn == current_dsn {
Ok(None)
} else {
let new_pool =
new_raw_pool(app_name, schema_name, &new_dsn, max_conns).await?;
let new_pool = new_raw_pool(options, &new_dsn).await?;
pool.replace(new_pool);
Ok(Some(new_dsn))
}
}
match try_update(
&app_name,
&schema_name,
&current_dsn,
&dsn_file,
max_conns,
&pool,
)
.await
{
match try_update(&options, &current_dsn, &dsn_file, &pool).await {
Ok(None) => {}
Ok(Some(new_dsn)) => {
info!("replaced hotswap pool");
@ -1995,7 +2020,14 @@ mod tests {
create_db(&dsn).await;
let pg = PostgresCatalog::connect("test", &schema_name, &dsn, 3, metrics)
let options = PostgresConnectionOptions {
app_name: String::from("test"),
schema_name: schema_name.clone(),
dsn,
max_conns: 3,
..Default::default()
};
let pg = PostgresCatalog::connect(options, metrics)
.await
.expect("failed to connect catalog");
@ -2348,15 +2380,15 @@ mod tests {
// create a hot swap pool with test application name and dsn file pointing to tmp file.
// we will later update this file and the pool should be replaced.
let pool = new_pool(
TEST_APPLICATION_NAME,
"test",
dsn_good.as_str(),
3,
POLLING_INTERVAL,
)
.await
.expect("connect");
let options = PostgresConnectionOptions {
app_name: TEST_APPLICATION_NAME.to_owned(),
schema_name: String::from("test"),
dsn: dsn_good,
max_conns: 3,
hotswap_poll_interval: POLLING_INTERVAL,
..Default::default()
};
let pool = new_pool(&options).await.expect("connect");
eprintln!("got a pool");
// ensure the application name is set as expected