Merge pull request #2978 from influxdata/crepererum/run_modes_boilerplate3
refactor: make `influxdb_ioxd` server-type-genericpull/24376/head
commit
7f2594c012
|
@ -1,7 +1,18 @@
|
|||
//! Implementation of command line option for running server
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::{
|
||||
influxdb_ioxd,
|
||||
influxdb_ioxd::{
|
||||
self,
|
||||
server_type::{
|
||||
common_state::{CommonServerState, CommonServerStateError},
|
||||
database::{
|
||||
setup::{make_application, make_server},
|
||||
DatabaseServerType,
|
||||
},
|
||||
},
|
||||
},
|
||||
structopt_blocks::{boolean_flag::BooleanFlag, run_config::RunConfig},
|
||||
};
|
||||
use structopt::StructOpt;
|
||||
|
@ -10,7 +21,13 @@ use thiserror::Error;
|
|||
#[derive(Debug, Error)]
|
||||
pub enum Error {
|
||||
#[error("Run: {0}")]
|
||||
ServerError(#[from] influxdb_ioxd::Error),
|
||||
Run(#[from] influxdb_ioxd::Error),
|
||||
|
||||
#[error("Cannot setup server: {0}")]
|
||||
Setup(#[from] crate::influxdb_ioxd::server_type::database::setup::Error),
|
||||
|
||||
#[error("Invalid config: {0}")]
|
||||
InvalidConfig(#[from] CommonServerStateError),
|
||||
}
|
||||
|
||||
pub type Result<T, E = Error> = std::result::Result<T, E>;
|
||||
|
@ -75,5 +92,15 @@ pub struct Config {
|
|||
}
|
||||
|
||||
pub async fn command(config: Config) -> Result<()> {
|
||||
Ok(influxdb_ioxd::main(config).await?)
|
||||
let common_state = CommonServerState::from_config(config.run_config.clone())?;
|
||||
|
||||
let application = make_application(&config, common_state.trace_collector()).await?;
|
||||
let app_server = make_server(Arc::clone(&application), &config);
|
||||
let server_type = Arc::new(DatabaseServerType::new(
|
||||
Arc::clone(&application),
|
||||
Arc::clone(&app_server),
|
||||
&common_state,
|
||||
));
|
||||
|
||||
Ok(influxdb_ioxd::main(common_state, server_type).await?)
|
||||
}
|
||||
|
|
|
@ -1,30 +1,17 @@
|
|||
use crate::{
|
||||
commands::run::database::Config,
|
||||
influxdb_ioxd::server_type::database::DatabaseServerType,
|
||||
structopt_blocks::{
|
||||
object_store::{check_object_store, warn_about_inmem_store},
|
||||
run_config::RunConfig,
|
||||
},
|
||||
};
|
||||
use crate::influxdb_ioxd::server_type::{common_state::CommonServerState, ServerType};
|
||||
use futures::{future::FusedFuture, pin_mut, FutureExt};
|
||||
use hyper::server::conn::AddrIncoming;
|
||||
use object_store::{self, ObjectStore};
|
||||
use observability_deps::tracing::{error, info, warn};
|
||||
use observability_deps::tracing::{error, info};
|
||||
use panic_logging::SendPanicsToTracing;
|
||||
use server::{
|
||||
connection::ConnectionManagerImpl as ConnectionManager, ApplicationState, RemoteTemplate,
|
||||
Server as AppServer, ServerConfig,
|
||||
};
|
||||
use snafu::{ResultExt, Snafu};
|
||||
use std::{convert::TryFrom, net::SocketAddr, sync::Arc};
|
||||
use trace::TraceCollector;
|
||||
use std::{net::SocketAddr, sync::Arc};
|
||||
use trace_http::ctx::TraceHeaderParser;
|
||||
|
||||
mod http;
|
||||
mod jemalloc;
|
||||
mod planner;
|
||||
pub(crate) mod rpc;
|
||||
mod server_type;
|
||||
pub(crate) mod server_type;
|
||||
pub(crate) mod serving_readiness;
|
||||
|
||||
#[derive(Debug, Snafu)]
|
||||
|
@ -46,19 +33,6 @@ pub enum Error {
|
|||
|
||||
#[snafu(display("Error serving RPC: {}", source))]
|
||||
ServingRpc { source: server_type::RpcError },
|
||||
|
||||
#[snafu(display("Cannot parse object store config: {}", source))]
|
||||
ObjectStoreParsing {
|
||||
source: crate::structopt_blocks::object_store::ParseError,
|
||||
},
|
||||
|
||||
#[snafu(display("Cannot check object store config: {}", source))]
|
||||
ObjectStoreCheck {
|
||||
source: crate::structopt_blocks::object_store::CheckError,
|
||||
},
|
||||
|
||||
#[snafu(display("Cannot create tracing pipeline: {}", source))]
|
||||
Tracing { source: trace_exporters::Error },
|
||||
}
|
||||
|
||||
pub type Result<T, E = Error> = std::result::Result<T, E>;
|
||||
|
@ -84,64 +58,6 @@ async fn wait_for_signal() {
|
|||
let _ = tokio::signal::ctrl_c().await;
|
||||
}
|
||||
|
||||
async fn make_application(
|
||||
config: &Config,
|
||||
trace_collector: Option<Arc<dyn TraceCollector>>,
|
||||
) -> Result<Arc<ApplicationState>> {
|
||||
warn_about_inmem_store(&config.run_config.object_store_config);
|
||||
let object_store = ObjectStore::try_from(&config.run_config.object_store_config)
|
||||
.context(ObjectStoreParsing)?;
|
||||
check_object_store(&object_store)
|
||||
.await
|
||||
.context(ObjectStoreCheck)?;
|
||||
let object_storage = Arc::new(object_store);
|
||||
|
||||
Ok(Arc::new(ApplicationState::new(
|
||||
object_storage,
|
||||
config.num_worker_threads,
|
||||
trace_collector,
|
||||
)))
|
||||
}
|
||||
|
||||
fn make_server(
|
||||
application: Arc<ApplicationState>,
|
||||
config: &Config,
|
||||
) -> Arc<AppServer<ConnectionManager>> {
|
||||
let server_config = ServerConfig {
|
||||
remote_template: config.remote_template.clone().map(RemoteTemplate::new),
|
||||
wipe_catalog_on_error: config.wipe_catalog_on_error.into(),
|
||||
skip_replay_and_seek_instead: config.skip_replay_and_seek_instead.into(),
|
||||
};
|
||||
|
||||
if (config.run_config.grpc_bind_address == config.run_config.http_bind_address)
|
||||
&& (config.run_config.grpc_bind_address.port() != 0)
|
||||
{
|
||||
error!(
|
||||
%config.run_config.grpc_bind_address,
|
||||
%config.run_config.http_bind_address,
|
||||
"grpc and http bind addresses must differ",
|
||||
);
|
||||
std::process::exit(1);
|
||||
}
|
||||
|
||||
let connection_manager = ConnectionManager::new();
|
||||
let app_server = Arc::new(AppServer::new(
|
||||
connection_manager,
|
||||
application,
|
||||
server_config,
|
||||
));
|
||||
|
||||
// if this ID isn't set the server won't be usable until this is set via an API
|
||||
// call
|
||||
if let Some(id) = config.run_config.server_id_config.server_id {
|
||||
app_server.set_id(id).expect("server id already set");
|
||||
} else {
|
||||
warn!("server ID not set. ID must be set via the INFLUXDB_IOX_ID config or API before writing or querying data.");
|
||||
}
|
||||
|
||||
app_server
|
||||
}
|
||||
|
||||
#[cfg(all(not(feature = "heappy"), not(feature = "jemalloc_replacing_malloc")))]
|
||||
fn build_malloc_conf() -> String {
|
||||
"system".to_string()
|
||||
|
@ -166,9 +82,16 @@ fn build_malloc_conf() -> String {
|
|||
compile_error!("must use exactly one memory allocator")
|
||||
}
|
||||
|
||||
/// This is the entry point for the IOx server. `config` represents
|
||||
/// command line arguments, if any.
|
||||
pub async fn main(config: Config) -> Result<()> {
|
||||
/// This is the entry point for the IOx server.
|
||||
///
|
||||
/// The precise server type depends on `T`. This entry point ensures that the given `server_type` is started using best
|
||||
/// practice, e.g. that we print the GIT-hash and malloc-configs, that a panic handler is installed, etc.
|
||||
///
|
||||
/// Due to the invasive nature of the setup routine, this should not be used during unit tests.
|
||||
pub async fn main<T>(common_state: CommonServerState, server_type: Arc<T>) -> Result<()>
|
||||
where
|
||||
T: ServerType,
|
||||
{
|
||||
let git_hash = option_env!("GIT_HASH").unwrap_or("UNKNOWN");
|
||||
let num_cpus = num_cpus::get();
|
||||
let build_malloc_conf = build_malloc_conf();
|
||||
|
@ -179,6 +102,17 @@ pub async fn main(config: Config) -> Result<()> {
|
|||
"InfluxDB IOx server starting",
|
||||
);
|
||||
|
||||
if (common_state.run_config().grpc_bind_address == common_state.run_config().http_bind_address)
|
||||
&& (common_state.run_config().grpc_bind_address.port() != 0)
|
||||
{
|
||||
error!(
|
||||
grpc_bind_address=%common_state.run_config().grpc_bind_address,
|
||||
http_bind_address=%common_state.run_config().http_bind_address,
|
||||
"grpc and http bind addresses must differ",
|
||||
);
|
||||
std::process::exit(1);
|
||||
}
|
||||
|
||||
// Install custom panic handler and forget about it.
|
||||
//
|
||||
// This leaks the handler and prevents it from ever being dropped during the
|
||||
|
@ -188,40 +122,26 @@ pub async fn main(config: Config) -> Result<()> {
|
|||
let f = SendPanicsToTracing::new();
|
||||
std::mem::forget(f);
|
||||
|
||||
let async_exporter = config.run_config.tracing_config.build().context(Tracing)?;
|
||||
let trace_collector = async_exporter
|
||||
.clone()
|
||||
.map(|x| -> Arc<dyn TraceCollector> { x });
|
||||
let application = make_application(&config, trace_collector).await?;
|
||||
|
||||
// Register jemalloc metrics
|
||||
application
|
||||
server_type
|
||||
.metric_registry()
|
||||
.register_instrument("jemalloc_metrics", jemalloc::JemallocMetrics::new);
|
||||
|
||||
let app_server = make_server(Arc::clone(&application), &config);
|
||||
let grpc_listener = grpc_listener(common_state.run_config().grpc_bind_address.into()).await?;
|
||||
let http_listener = http_listener(common_state.run_config().http_bind_address.into()).await?;
|
||||
|
||||
let grpc_listener = grpc_listener(config.run_config.grpc_bind_address.into()).await?;
|
||||
let http_listener = http_listener(config.run_config.http_bind_address.into()).await?;
|
||||
let trace_exporter = common_state.trace_exporter();
|
||||
let r = serve(common_state, grpc_listener, http_listener, server_type).await;
|
||||
|
||||
let r = serve(
|
||||
config.run_config,
|
||||
application,
|
||||
grpc_listener,
|
||||
http_listener,
|
||||
app_server,
|
||||
)
|
||||
.await;
|
||||
|
||||
if let Some(async_exporter) = async_exporter {
|
||||
if let Err(e) = async_exporter.drain().await {
|
||||
if let Some(trace_exporter) = trace_exporter {
|
||||
if let Err(e) = trace_exporter.drain().await {
|
||||
error!(%e, "error draining trace exporter");
|
||||
}
|
||||
}
|
||||
r
|
||||
}
|
||||
|
||||
async fn grpc_listener(addr: SocketAddr) -> Result<tokio::net::TcpListener> {
|
||||
pub async fn grpc_listener(addr: SocketAddr) -> Result<tokio::net::TcpListener> {
|
||||
let listener = tokio::net::TcpListener::bind(addr)
|
||||
.await
|
||||
.context(StartListeningGrpc { addr })?;
|
||||
|
@ -234,7 +154,7 @@ async fn grpc_listener(addr: SocketAddr) -> Result<tokio::net::TcpListener> {
|
|||
Ok(listener)
|
||||
}
|
||||
|
||||
async fn http_listener(addr: SocketAddr) -> Result<AddrIncoming> {
|
||||
pub async fn http_listener(addr: SocketAddr) -> Result<AddrIncoming> {
|
||||
let listener = AddrIncoming::bind(&addr).context(StartListeningHttp { addr })?;
|
||||
info!(bind_addr=%listener.local_addr(), "bound HTTP listener");
|
||||
|
||||
|
@ -245,30 +165,31 @@ async fn http_listener(addr: SocketAddr) -> Result<AddrIncoming> {
|
|||
/// these listeners, the Server, Databases, etc... have all exited.
|
||||
///
|
||||
/// This is effectively the "main loop" for influxdb_iox
|
||||
async fn serve(
|
||||
config: RunConfig,
|
||||
application: Arc<ApplicationState>,
|
||||
async fn serve<T>(
|
||||
common_state: CommonServerState,
|
||||
grpc_listener: tokio::net::TcpListener,
|
||||
http_listener: AddrIncoming,
|
||||
app_server: Arc<AppServer<ConnectionManager>>,
|
||||
) -> Result<()> {
|
||||
server_type: Arc<T>,
|
||||
) -> Result<()>
|
||||
where
|
||||
T: ServerType,
|
||||
{
|
||||
// Construct a token to trigger shutdown of API services
|
||||
let frontend_shutdown = tokio_util::sync::CancellationToken::new();
|
||||
|
||||
let trace_header_parser = TraceHeaderParser::new()
|
||||
.with_jaeger_trace_context_header_name(
|
||||
config
|
||||
&common_state
|
||||
.run_config()
|
||||
.tracing_config
|
||||
.traces_jaeger_trace_context_header_name,
|
||||
)
|
||||
.with_jaeger_debug_name(config.tracing_config.traces_jaeger_debug_name);
|
||||
|
||||
let server_type = Arc::new(DatabaseServerType::new(
|
||||
Arc::clone(&application),
|
||||
Arc::clone(&app_server),
|
||||
config.max_http_request_size,
|
||||
config.initial_serving_state.into(),
|
||||
));
|
||||
.with_jaeger_debug_name(
|
||||
&common_state
|
||||
.run_config()
|
||||
.tracing_config
|
||||
.traces_jaeger_debug_name,
|
||||
);
|
||||
|
||||
// Construct and start up gRPC server
|
||||
|
||||
|
@ -277,6 +198,7 @@ async fn serve(
|
|||
Arc::clone(&server_type),
|
||||
trace_header_parser.clone(),
|
||||
frontend_shutdown.clone(),
|
||||
common_state.serving_readiness().clone(),
|
||||
)
|
||||
.fuse();
|
||||
|
||||
|
@ -284,7 +206,7 @@ async fn serve(
|
|||
|
||||
let http_server = http::serve(
|
||||
http_listener,
|
||||
server_type,
|
||||
Arc::clone(&server_type),
|
||||
frontend_shutdown.clone(),
|
||||
trace_header_parser,
|
||||
)
|
||||
|
@ -295,7 +217,7 @@ async fn serve(
|
|||
log::info!("InfluxDB IOx server ready");
|
||||
|
||||
// Get IOx background worker task
|
||||
let server_worker = app_server.join().fuse();
|
||||
let server_worker = Arc::clone(&server_type).background_worker().fuse();
|
||||
|
||||
// Shutdown signal
|
||||
let signal = wait_for_signal().fuse();
|
||||
|
@ -364,492 +286,13 @@ async fn serve(
|
|||
|
||||
frontend_shutdown.cancel()
|
||||
}
|
||||
|
||||
info!("frontend shutdown completed");
|
||||
app_server.shutdown();
|
||||
|
||||
server_type.shutdown_background_worker();
|
||||
if !server_worker.is_terminated() {
|
||||
match server_worker.await {
|
||||
Ok(_) => info!("server worker shutdown"),
|
||||
Err(error) => error!(%error, "server worker error"),
|
||||
}
|
||||
server_worker.await;
|
||||
}
|
||||
|
||||
info!("server completed shutting down");
|
||||
|
||||
application.join();
|
||||
info!("shared application state completed shutting down");
|
||||
info!("backend shutdown completed");
|
||||
|
||||
res
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use ::http::{header::HeaderName, HeaderValue};
|
||||
use data_types::{database_rules::DatabaseRules, DatabaseName};
|
||||
use influxdb_iox_client::{connection::Connection, flight::PerformQuery};
|
||||
use server::rules::ProvidedDatabaseRules;
|
||||
use std::{convert::TryInto, num::NonZeroU64};
|
||||
use structopt::StructOpt;
|
||||
use tokio::task::JoinHandle;
|
||||
use trace::{
|
||||
span::{Span, SpanStatus},
|
||||
RingBufferTraceCollector,
|
||||
};
|
||||
use trace_exporters::export::{AsyncExporter, TestAsyncExporter};
|
||||
|
||||
fn test_config(server_id: Option<u32>) -> Config {
|
||||
let mut config = Config::from_iter(&[
|
||||
"run",
|
||||
"--api-bind",
|
||||
"127.0.0.1:0",
|
||||
"--grpc-bind",
|
||||
"127.0.0.1:0",
|
||||
]);
|
||||
config.run_config.server_id_config.server_id = server_id.map(|x| x.try_into().unwrap());
|
||||
config
|
||||
}
|
||||
|
||||
async fn test_serve(
|
||||
config: RunConfig,
|
||||
application: Arc<ApplicationState>,
|
||||
server: Arc<AppServer<ConnectionManager>>,
|
||||
) {
|
||||
let grpc_listener = grpc_listener(config.grpc_bind_address.into())
|
||||
.await
|
||||
.unwrap();
|
||||
let http_listener = http_listener(config.grpc_bind_address.into())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
serve(config, application, grpc_listener, http_listener, server)
|
||||
.await
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_server_shutdown() {
|
||||
test_helpers::maybe_start_logging();
|
||||
|
||||
// Create a server and wait for it to initialize
|
||||
let config = test_config(Some(23));
|
||||
let application = make_application(&config, None).await.unwrap();
|
||||
let server = make_server(Arc::clone(&application), &config);
|
||||
server.wait_for_init().await.unwrap();
|
||||
|
||||
// Start serving
|
||||
let serve_fut = test_serve(config.run_config, application, Arc::clone(&server)).fuse();
|
||||
pin_mut!(serve_fut);
|
||||
|
||||
// Nothing to trigger termination, so serve future should continue running
|
||||
futures::select! {
|
||||
_ = serve_fut => panic!("serve shouldn't finish"),
|
||||
_ = tokio::time::sleep(tokio::time::Duration::from_millis(10)).fuse() => {}
|
||||
}
|
||||
|
||||
// Trigger shutdown
|
||||
server.shutdown();
|
||||
|
||||
// The serve future should now complete
|
||||
futures::select! {
|
||||
_ = serve_fut => {},
|
||||
_ = tokio::time::sleep(tokio::time::Duration::from_secs(5)).fuse() => panic!("timeout shouldn't expire")
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_server_shutdown_uninit() {
|
||||
// Create a server but don't set a server id
|
||||
let config = test_config(None);
|
||||
let application = make_application(&config, None).await.unwrap();
|
||||
let server = make_server(Arc::clone(&application), &config);
|
||||
|
||||
let serve_fut = test_serve(config.run_config, application, Arc::clone(&server)).fuse();
|
||||
pin_mut!(serve_fut);
|
||||
|
||||
// Nothing should have triggered shutdown so serve shouldn't finish
|
||||
futures::select! {
|
||||
_ = serve_fut => panic!("serve shouldn't finish"),
|
||||
_ = tokio::time::sleep(tokio::time::Duration::from_millis(10)).fuse() => {}
|
||||
}
|
||||
|
||||
// We never set the server ID and so the server should not initialize
|
||||
assert!(
|
||||
server.wait_for_init().now_or_never().is_none(),
|
||||
"shouldn't have initialized"
|
||||
);
|
||||
|
||||
// But it should still be possible to shut it down
|
||||
server.shutdown();
|
||||
|
||||
futures::select! {
|
||||
_ = serve_fut => {},
|
||||
_ = tokio::time::sleep(tokio::time::Duration::from_secs(5)).fuse() => panic!("timeout shouldn't expire")
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_server_panic() {
|
||||
// Create a server and wait for it to initialize
|
||||
let config = test_config(Some(999999999));
|
||||
let application = make_application(&config, None).await.unwrap();
|
||||
let server = make_server(Arc::clone(&application), &config);
|
||||
server.wait_for_init().await.unwrap();
|
||||
|
||||
let serve_fut = test_serve(config.run_config, application, Arc::clone(&server)).fuse();
|
||||
pin_mut!(serve_fut);
|
||||
|
||||
// Nothing should have triggered shutdown so serve shouldn't finish
|
||||
futures::select! {
|
||||
_ = serve_fut => panic!("serve shouldn't finish"),
|
||||
_ = tokio::time::sleep(tokio::time::Duration::from_millis(10)).fuse() => {}
|
||||
}
|
||||
|
||||
// Trigger a panic in the Server background worker
|
||||
server::utils::register_panic_key("server background worker: 999999999");
|
||||
|
||||
// This should trigger shutdown
|
||||
futures::select! {
|
||||
_ = serve_fut => {},
|
||||
_ = tokio::time::sleep(tokio::time::Duration::from_secs(5)).fuse() => panic!("timeout shouldn't expire")
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_database_panic() {
|
||||
// Create a server and wait for it to initialize
|
||||
let config = test_config(Some(23));
|
||||
let application = make_application(&config, None).await.unwrap();
|
||||
let server = make_server(Arc::clone(&application), &config);
|
||||
server.wait_for_init().await.unwrap();
|
||||
|
||||
// Create a database that won't panic
|
||||
let other_db_name = DatabaseName::new("other").unwrap();
|
||||
server
|
||||
.create_database(make_rules(&other_db_name))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let other_db = server.database(&other_db_name).unwrap();
|
||||
|
||||
let serve_fut = test_serve(
|
||||
config.run_config,
|
||||
Arc::clone(&application),
|
||||
Arc::clone(&server),
|
||||
)
|
||||
.fuse();
|
||||
pin_mut!(serve_fut);
|
||||
|
||||
// Nothing should have triggered shutdown so serve shouldn't finish
|
||||
futures::select! {
|
||||
_ = serve_fut => panic!("serve shouldn't finish"),
|
||||
_ = tokio::time::sleep(tokio::time::Duration::from_millis(10)).fuse() => {}
|
||||
}
|
||||
|
||||
// Configure a panic in the worker of the database we're about to create
|
||||
server::utils::register_panic_key("database background worker: panic_test");
|
||||
|
||||
// Spawn a dummy job that will delay shutdown as it runs to completion
|
||||
let task = application
|
||||
.job_registry()
|
||||
.spawn_dummy_job(vec![1_000_000_000], None);
|
||||
|
||||
// Create database that will panic in its worker loop
|
||||
server
|
||||
.create_database(make_rules("panic_test"))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// The serve future shouldn't resolve until the dummy job finishes
|
||||
futures::select! {
|
||||
_ = serve_fut => panic!("should wait for jobs to finish"),
|
||||
_ = tokio::time::sleep(tokio::time::Duration::from_millis(10)).fuse() => {}
|
||||
}
|
||||
|
||||
assert!(!task.is_complete(), "task should still be running");
|
||||
|
||||
// But the databases should have been shutdown
|
||||
assert!(
|
||||
other_db.join().now_or_never().is_some(),
|
||||
"database should have been terminated and have finished"
|
||||
);
|
||||
|
||||
// Once the dummy job completes - the serve future should resolve
|
||||
futures::select! {
|
||||
_ = serve_fut => {},
|
||||
_ = tokio::time::sleep(tokio::time::Duration::from_secs(5)).fuse() => panic!("timeout shouldn't expire")
|
||||
}
|
||||
|
||||
assert_eq!(
|
||||
task.get_status().result().unwrap(),
|
||||
tracker::TaskResult::Success
|
||||
)
|
||||
}
|
||||
|
||||
async fn jaeger_client(addr: SocketAddr, trace: &'static str) -> Connection {
|
||||
influxdb_iox_client::connection::Builder::default()
|
||||
.header(
|
||||
HeaderName::from_static("uber-trace-id"),
|
||||
HeaderValue::from_static(trace),
|
||||
)
|
||||
.build(format!("http://{}", addr))
|
||||
.await
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
async fn tracing_server<T: TraceCollector + 'static>(
|
||||
collector: &Arc<T>,
|
||||
) -> (
|
||||
SocketAddr,
|
||||
Arc<AppServer<ConnectionManager>>,
|
||||
JoinHandle<Result<()>>,
|
||||
) {
|
||||
let config = test_config(Some(23));
|
||||
let application = make_application(&config, Some(Arc::<T>::clone(collector)))
|
||||
.await
|
||||
.unwrap();
|
||||
let server = make_server(Arc::clone(&application), &config);
|
||||
server.wait_for_init().await.unwrap();
|
||||
|
||||
let grpc_listener = grpc_listener(config.run_config.grpc_bind_address.into())
|
||||
.await
|
||||
.unwrap();
|
||||
let http_listener = http_listener(config.run_config.grpc_bind_address.into())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let addr = grpc_listener.local_addr().unwrap();
|
||||
|
||||
let fut = serve(
|
||||
config.run_config,
|
||||
application,
|
||||
grpc_listener,
|
||||
http_listener,
|
||||
Arc::clone(&server),
|
||||
);
|
||||
|
||||
let join = tokio::spawn(fut);
|
||||
(addr, server, join)
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_tracing() {
|
||||
let trace_collector = Arc::new(RingBufferTraceCollector::new(20));
|
||||
let (addr, server, join) = tracing_server(&trace_collector).await;
|
||||
|
||||
let client = influxdb_iox_client::connection::Builder::default()
|
||||
.build(format!("http://{}", addr))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let mut client = influxdb_iox_client::management::Client::new(client);
|
||||
|
||||
client.list_database_names().await.unwrap();
|
||||
|
||||
assert_eq!(trace_collector.spans().len(), 0);
|
||||
|
||||
let b3_tracing_client = influxdb_iox_client::connection::Builder::default()
|
||||
.header(
|
||||
HeaderName::from_static("x-b3-sampled"),
|
||||
HeaderValue::from_static("1"),
|
||||
)
|
||||
.header(
|
||||
HeaderName::from_static("x-b3-traceid"),
|
||||
HeaderValue::from_static("fea24902"),
|
||||
)
|
||||
.header(
|
||||
HeaderName::from_static("x-b3-spanid"),
|
||||
HeaderValue::from_static("ab3409"),
|
||||
)
|
||||
.build(format!("http://{}", addr))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let mut b3_tracing_client = influxdb_iox_client::management::Client::new(b3_tracing_client);
|
||||
|
||||
b3_tracing_client.list_database_names().await.unwrap();
|
||||
b3_tracing_client.get_server_status().await.unwrap();
|
||||
|
||||
let conn = jaeger_client(addr, "34f9495:30e34:0:1").await;
|
||||
influxdb_iox_client::management::Client::new(conn)
|
||||
.list_database_names()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let spans = trace_collector.spans();
|
||||
assert_eq!(spans.len(), 3);
|
||||
|
||||
assert_eq!(spans[0].name, "IOx");
|
||||
assert_eq!(spans[0].ctx.parent_span_id.unwrap().0.get(), 0xab3409);
|
||||
assert_eq!(spans[0].ctx.trace_id.0.get(), 0xfea24902);
|
||||
assert!(spans[0].start.is_some());
|
||||
assert!(spans[0].end.is_some());
|
||||
assert_eq!(spans[0].status, SpanStatus::Ok);
|
||||
|
||||
assert_eq!(spans[1].name, "IOx");
|
||||
assert_eq!(spans[1].ctx.parent_span_id.unwrap().0.get(), 0xab3409);
|
||||
assert_eq!(spans[1].ctx.trace_id.0.get(), 0xfea24902);
|
||||
assert!(spans[1].start.is_some());
|
||||
assert!(spans[1].end.is_some());
|
||||
|
||||
assert_eq!(spans[2].name, "IOx");
|
||||
assert_eq!(spans[2].ctx.parent_span_id.unwrap().0.get(), 0x30e34);
|
||||
assert_eq!(spans[2].ctx.trace_id.0.get(), 0x34f9495);
|
||||
assert!(spans[2].start.is_some());
|
||||
assert!(spans[2].end.is_some());
|
||||
|
||||
assert_ne!(spans[0].ctx.span_id, spans[1].ctx.span_id);
|
||||
server.shutdown();
|
||||
join.await.unwrap().unwrap();
|
||||
}
|
||||
|
||||
/// Ensure that query is fully executed.
|
||||
async fn consume_query(mut query: PerformQuery) {
|
||||
while query.next().await.unwrap().is_some() {}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_query_tracing() {
|
||||
let collector = Arc::new(RingBufferTraceCollector::new(100));
|
||||
let (addr, server, join) = tracing_server(&collector).await;
|
||||
let conn = jaeger_client(addr, "34f8495:35e32:0:1").await;
|
||||
|
||||
let db_info = influxdb_storage_client::OrgAndBucket::new(
|
||||
NonZeroU64::new(12).unwrap(),
|
||||
NonZeroU64::new(344).unwrap(),
|
||||
);
|
||||
|
||||
// Perform a number of different requests to generate traces
|
||||
|
||||
let mut management = influxdb_iox_client::management::Client::new(conn.clone());
|
||||
management
|
||||
.create_database(
|
||||
influxdb_iox_client::management::generated_types::DatabaseRules {
|
||||
name: db_info.db_name().to_string(),
|
||||
..Default::default()
|
||||
},
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let mut write = influxdb_iox_client::write::Client::new(conn.clone());
|
||||
write
|
||||
.write(db_info.db_name(), "cpu,tag0=foo val=1 100\n")
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let mut flight = influxdb_iox_client::flight::Client::new(conn.clone());
|
||||
consume_query(
|
||||
flight
|
||||
.perform_query(db_info.db_name(), "select * from cpu;")
|
||||
.await
|
||||
.unwrap(),
|
||||
)
|
||||
.await;
|
||||
|
||||
flight
|
||||
.perform_query("nonexistent", "select * from cpu;")
|
||||
.await
|
||||
.unwrap_err();
|
||||
|
||||
flight
|
||||
.perform_query(db_info.db_name(), "select * from nonexistent;")
|
||||
.await
|
||||
.unwrap_err();
|
||||
|
||||
let mut storage = influxdb_storage_client::Client::new(conn);
|
||||
storage
|
||||
.tag_values(influxdb_storage_client::generated_types::TagValuesRequest {
|
||||
tags_source: Some(influxdb_storage_client::Client::read_source(&db_info, 1)),
|
||||
range: None,
|
||||
predicate: None,
|
||||
tag_key: "tag0".into(),
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
server.shutdown();
|
||||
join.await.unwrap().unwrap();
|
||||
|
||||
// Check generated traces
|
||||
|
||||
let spans = collector.spans();
|
||||
|
||||
let root_spans: Vec<_> = spans.iter().filter(|span| &span.name == "IOx").collect();
|
||||
// Made 6 requests
|
||||
assert_eq!(root_spans.len(), 6);
|
||||
|
||||
let child = |parent: &Span, name: &'static str| -> Option<&Span> {
|
||||
spans.iter().find(|span| {
|
||||
span.ctx.parent_span_id == Some(parent.ctx.span_id) && span.name == name
|
||||
})
|
||||
};
|
||||
|
||||
// Test SQL
|
||||
let sql_query_span = root_spans[2];
|
||||
assert_eq!(sql_query_span.status, SpanStatus::Ok);
|
||||
|
||||
let ctx_span = child(sql_query_span, "Query Execution").unwrap();
|
||||
let planner_span = child(ctx_span, "Planner").unwrap();
|
||||
let sql_span = child(planner_span, "sql").unwrap();
|
||||
let prepare_sql_span = child(sql_span, "prepare_sql").unwrap();
|
||||
child(prepare_sql_span, "prepare_plan").unwrap();
|
||||
|
||||
let execute_span = child(ctx_span, "execute_stream_partitioned").unwrap();
|
||||
let coalesce_span = child(execute_span, "CoalescePartitionsEx").unwrap();
|
||||
|
||||
// validate spans from DataFusion ExecutionPlan are present
|
||||
child(coalesce_span, "ProjectionExec: expr").unwrap();
|
||||
|
||||
let database_not_found = root_spans[3];
|
||||
assert_eq!(database_not_found.status, SpanStatus::Err);
|
||||
assert!(database_not_found
|
||||
.events
|
||||
.iter()
|
||||
.any(|event| event.msg.as_ref() == "not found"));
|
||||
|
||||
let table_not_found = root_spans[4];
|
||||
assert_eq!(table_not_found.status, SpanStatus::Err);
|
||||
assert!(table_not_found
|
||||
.events
|
||||
.iter()
|
||||
.any(|event| event.msg.as_ref() == "invalid argument"));
|
||||
|
||||
// Test tag_values
|
||||
let storage_span = root_spans[5];
|
||||
let ctx_span = child(storage_span, "Query Execution").unwrap();
|
||||
child(ctx_span, "Planner").unwrap();
|
||||
|
||||
let to_string_set = child(ctx_span, "to_string_set").unwrap();
|
||||
child(to_string_set, "run_logical_plans").unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_async_exporter() {
|
||||
let (sender, mut receiver) = tokio::sync::mpsc::channel(20);
|
||||
let collector = Arc::new(AsyncExporter::new(TestAsyncExporter::new(sender)));
|
||||
|
||||
let (addr, server, join) = tracing_server(&collector).await;
|
||||
let conn = jaeger_client(addr, "34f8495:30e34:0:1").await;
|
||||
influxdb_iox_client::management::Client::new(conn)
|
||||
.list_database_names()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
collector.drain().await.unwrap();
|
||||
|
||||
server.shutdown();
|
||||
join.await.unwrap().unwrap();
|
||||
|
||||
let span = receiver.recv().await.unwrap();
|
||||
assert_eq!(span.ctx.trace_id.get(), 0x34f8495);
|
||||
assert_eq!(span.ctx.parent_span_id.unwrap().get(), 0x30e34);
|
||||
}
|
||||
|
||||
fn make_rules(db_name: impl Into<String>) -> ProvidedDatabaseRules {
|
||||
let db_name = DatabaseName::new(db_name.into()).unwrap();
|
||||
ProvidedDatabaseRules::new_rules(DatabaseRules::new(db_name).into())
|
||||
.expect("Tests should create valid DatabaseRules")
|
||||
}
|
||||
}
|
||||
|
|
|
@ -6,7 +6,10 @@ use tonic::transport::NamedService;
|
|||
use tonic_health::server::HealthReporter;
|
||||
use trace_http::ctx::TraceHeaderParser;
|
||||
|
||||
use crate::influxdb_ioxd::server_type::{RpcError, ServerType};
|
||||
use crate::influxdb_ioxd::{
|
||||
server_type::{RpcError, ServerType},
|
||||
serving_readiness::ServingReadiness,
|
||||
};
|
||||
|
||||
pub mod error;
|
||||
pub(crate) mod testing;
|
||||
|
@ -16,17 +19,21 @@ pub fn service_name<S: NamedService>(_: &S) -> &'static str {
|
|||
S::NAME
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct RpcBuilderInput {
|
||||
pub socket: TcpListener,
|
||||
pub trace_header_parser: TraceHeaderParser,
|
||||
pub shutdown: CancellationToken,
|
||||
pub serving_readiness: ServingReadiness,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct RpcBuilder<T> {
|
||||
pub inner: T,
|
||||
pub health_reporter: HealthReporter,
|
||||
pub shutdown: CancellationToken,
|
||||
pub socket: TcpListener,
|
||||
pub serving_readiness: ServingReadiness,
|
||||
}
|
||||
|
||||
/// Adds a gRPC service to the builder, and registers it with the
|
||||
|
@ -46,6 +53,7 @@ macro_rules! add_service {
|
|||
mut health_reporter,
|
||||
shutdown,
|
||||
socket,
|
||||
serving_readiness,
|
||||
} = $builder;
|
||||
let service = $svc;
|
||||
|
||||
|
@ -61,6 +69,7 @@ macro_rules! add_service {
|
|||
health_reporter,
|
||||
shutdown,
|
||||
socket,
|
||||
serving_readiness,
|
||||
}
|
||||
}
|
||||
};
|
||||
|
@ -72,11 +81,11 @@ pub(crate) use add_service;
|
|||
/// Adds a gRPC service to the builder gated behind the serving
|
||||
/// readiness check, and registers it with the health reporter
|
||||
macro_rules! add_gated_service {
|
||||
($builder:ident, $serving_readiness:expr, $svc:expr) => {
|
||||
($builder:ident, $svc:expr) => {
|
||||
let $builder = {
|
||||
let service = $svc;
|
||||
|
||||
let interceptor = $serving_readiness.clone().into_interceptor();
|
||||
let interceptor = $builder.serving_readiness.clone().into_interceptor();
|
||||
let service = tonic::codegen::InterceptedService::new(service, interceptor);
|
||||
|
||||
add_service!($builder, service);
|
||||
|
@ -103,6 +112,7 @@ macro_rules! setup_builder {
|
|||
socket,
|
||||
trace_header_parser,
|
||||
shutdown,
|
||||
serving_readiness,
|
||||
} = $input;
|
||||
|
||||
let (health_reporter, health_service) = tonic_health::server::health_reporter();
|
||||
|
@ -124,6 +134,7 @@ macro_rules! setup_builder {
|
|||
health_reporter,
|
||||
shutdown,
|
||||
socket,
|
||||
serving_readiness,
|
||||
};
|
||||
|
||||
// important that this one is NOT gated so that it can answer health requests
|
||||
|
@ -168,6 +179,7 @@ pub async fn serve<T>(
|
|||
server_type: Arc<T>,
|
||||
trace_header_parser: TraceHeaderParser,
|
||||
shutdown: CancellationToken,
|
||||
serving_readiness: ServingReadiness,
|
||||
) -> Result<(), RpcError>
|
||||
where
|
||||
T: ServerType,
|
||||
|
@ -176,6 +188,7 @@ where
|
|||
socket,
|
||||
trace_header_parser,
|
||||
shutdown,
|
||||
serving_readiness,
|
||||
};
|
||||
|
||||
server_type.server_grpc(builder_input).await
|
||||
|
|
|
@ -0,0 +1,64 @@
|
|||
use std::sync::Arc;
|
||||
|
||||
use snafu::{ResultExt, Snafu};
|
||||
use trace::TraceCollector;
|
||||
|
||||
use crate::{
|
||||
influxdb_ioxd::serving_readiness::ServingReadiness, structopt_blocks::run_config::RunConfig,
|
||||
};
|
||||
|
||||
#[derive(Debug, Snafu)]
|
||||
pub enum CommonServerStateError {
|
||||
#[snafu(display("Cannot create tracing pipeline: {}", source))]
|
||||
Tracing { source: trace_exporters::Error },
|
||||
}
|
||||
|
||||
/// Common state used by all server types (e.g. `Database` and `Router`)
|
||||
#[derive(Debug)]
|
||||
pub struct CommonServerState {
|
||||
run_config: RunConfig,
|
||||
serving_readiness: ServingReadiness,
|
||||
trace_exporter: Option<Arc<trace_exporters::export::AsyncExporter>>,
|
||||
}
|
||||
|
||||
impl CommonServerState {
|
||||
pub fn from_config(run_config: RunConfig) -> Result<Self, CommonServerStateError> {
|
||||
let serving_readiness = run_config.initial_serving_state.clone().into();
|
||||
let trace_exporter = run_config.tracing_config.build().context(Tracing)?;
|
||||
|
||||
Ok(Self {
|
||||
run_config,
|
||||
serving_readiness,
|
||||
trace_exporter,
|
||||
})
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub fn for_testing() -> Self {
|
||||
use structopt::StructOpt;
|
||||
|
||||
Self::from_config(
|
||||
RunConfig::from_iter_safe(["not_used".to_string()].into_iter())
|
||||
.expect("default parsing should work"),
|
||||
)
|
||||
.expect("default configs should work")
|
||||
}
|
||||
|
||||
pub fn run_config(&self) -> &RunConfig {
|
||||
&self.run_config
|
||||
}
|
||||
|
||||
pub fn serving_readiness(&self) -> &ServingReadiness {
|
||||
&self.serving_readiness
|
||||
}
|
||||
|
||||
pub fn trace_exporter(&self) -> Option<Arc<trace_exporters::export::AsyncExporter>> {
|
||||
self.trace_exporter.clone()
|
||||
}
|
||||
|
||||
pub fn trace_collector(&self) -> Option<Arc<dyn TraceCollector>> {
|
||||
self.trace_exporter
|
||||
.clone()
|
||||
.map(|x| -> Arc<dyn TraceCollector> { x })
|
||||
}
|
||||
}
|
|
@ -566,8 +566,7 @@ async fn query<M: ConnectionManager + Send + Sync + Debug + 'static>(
|
|||
mod tests {
|
||||
use crate::influxdb_ioxd::{
|
||||
http::test_utils::{check_response, get_content_type, TestServer},
|
||||
server_type::ServerType,
|
||||
serving_readiness::ServingReadiness,
|
||||
server_type::{common_state::CommonServerState, ServerType},
|
||||
};
|
||||
|
||||
use super::*;
|
||||
|
@ -1301,13 +1300,10 @@ mod tests {
|
|||
application: Arc<ApplicationState>,
|
||||
server: Arc<Server<ConnectionManagerImpl>>,
|
||||
) -> TestServer<DatabaseServerType<ConnectionManagerImpl>> {
|
||||
let server_type = Arc::new(DatabaseServerType::new(
|
||||
application,
|
||||
server,
|
||||
TEST_MAX_REQUEST_SIZE,
|
||||
ServingReadiness::new(Default::default()),
|
||||
));
|
||||
TestServer::new(server_type)
|
||||
let mut server_type =
|
||||
DatabaseServerType::new(application, server, &CommonServerState::for_testing());
|
||||
server_type.max_request_size = TEST_MAX_REQUEST_SIZE;
|
||||
TestServer::new(Arc::new(server_type))
|
||||
}
|
||||
|
||||
/// Run the specified SQL query and return formatted results as a string
|
||||
|
|
|
@ -1,9 +1,12 @@
|
|||
use std::sync::Arc;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use futures::{future::FusedFuture, FutureExt};
|
||||
use hyper::{Body, Request, Response};
|
||||
use metric::Registry;
|
||||
use observability_deps::tracing::{error, info};
|
||||
use server::{connection::ConnectionManager, ApplicationState, Server};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use trace::TraceCollector;
|
||||
|
||||
use crate::influxdb_ioxd::{
|
||||
|
@ -15,9 +18,12 @@ use crate::influxdb_ioxd::{
|
|||
|
||||
mod http;
|
||||
mod rpc;
|
||||
pub mod setup;
|
||||
|
||||
pub use self::http::ApplicationError;
|
||||
|
||||
use super::common_state::CommonServerState;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct DatabaseServerType<M>
|
||||
where
|
||||
|
@ -28,6 +34,7 @@ where
|
|||
pub lp_metrics: Arc<LineProtocolMetrics>,
|
||||
pub max_request_size: usize,
|
||||
pub serving_readiness: ServingReadiness,
|
||||
shutdown: CancellationToken,
|
||||
}
|
||||
|
||||
impl<M> DatabaseServerType<M>
|
||||
|
@ -37,8 +44,7 @@ where
|
|||
pub fn new(
|
||||
application: Arc<ApplicationState>,
|
||||
server: Arc<Server<M>>,
|
||||
max_request_size: usize,
|
||||
serving_readiness: ServingReadiness,
|
||||
common_state: &CommonServerState,
|
||||
) -> Self {
|
||||
let lp_metrics = Arc::new(LineProtocolMetrics::new(
|
||||
application.metric_registry().as_ref(),
|
||||
|
@ -48,8 +54,9 @@ where
|
|||
application,
|
||||
server,
|
||||
lp_metrics,
|
||||
max_request_size,
|
||||
serving_readiness,
|
||||
max_request_size: common_state.run_config().max_http_request_size,
|
||||
serving_readiness: common_state.serving_readiness().clone(),
|
||||
shutdown: CancellationToken::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -79,4 +86,518 @@ where
|
|||
async fn server_grpc(self: Arc<Self>, builder_input: RpcBuilderInput) -> Result<(), RpcError> {
|
||||
self::rpc::server_grpc(self, builder_input).await
|
||||
}
|
||||
|
||||
async fn background_worker(self: Arc<Self>) {
|
||||
let server_worker = self.server.join().fuse();
|
||||
futures::pin_mut!(server_worker);
|
||||
|
||||
futures::select! {
|
||||
_ = server_worker => {},
|
||||
_ = self.shutdown.cancelled().fuse() => {},
|
||||
}
|
||||
|
||||
self.server.shutdown();
|
||||
|
||||
if !server_worker.is_terminated() {
|
||||
match server_worker.await {
|
||||
Ok(_) => info!("server worker shutdown"),
|
||||
Err(error) => error!(%error, "server worker error"),
|
||||
}
|
||||
}
|
||||
|
||||
info!("server completed shutting down");
|
||||
|
||||
self.application.join();
|
||||
info!("shared application state completed shutting down");
|
||||
}
|
||||
|
||||
fn shutdown_background_worker(&self) {
|
||||
self.server.shutdown();
|
||||
self.application.join();
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::{
|
||||
commands::run::database::Config,
|
||||
influxdb_ioxd::{
|
||||
grpc_listener, http_listener, serve,
|
||||
server_type::database::setup::{make_application, make_server},
|
||||
},
|
||||
structopt_blocks::run_config::RunConfig,
|
||||
};
|
||||
|
||||
use super::*;
|
||||
use ::http::{header::HeaderName, HeaderValue};
|
||||
use data_types::{database_rules::DatabaseRules, DatabaseName};
|
||||
use futures::pin_mut;
|
||||
use influxdb_iox_client::{connection::Connection, flight::PerformQuery};
|
||||
use server::{connection::ConnectionManagerImpl, rules::ProvidedDatabaseRules};
|
||||
use std::{convert::TryInto, net::SocketAddr, num::NonZeroU64};
|
||||
use structopt::StructOpt;
|
||||
use tokio::task::JoinHandle;
|
||||
use trace::{
|
||||
span::{Span, SpanStatus},
|
||||
RingBufferTraceCollector,
|
||||
};
|
||||
use trace_exporters::export::{AsyncExporter, TestAsyncExporter};
|
||||
|
||||
fn test_config(server_id: Option<u32>) -> Config {
|
||||
let mut config = Config::from_iter(&[
|
||||
"run",
|
||||
"--api-bind",
|
||||
"127.0.0.1:0",
|
||||
"--grpc-bind",
|
||||
"127.0.0.1:0",
|
||||
]);
|
||||
config.run_config.server_id_config.server_id = server_id.map(|x| x.try_into().unwrap());
|
||||
config
|
||||
}
|
||||
|
||||
async fn test_serve(
|
||||
config: RunConfig,
|
||||
application: Arc<ApplicationState>,
|
||||
server: Arc<Server<ConnectionManagerImpl>>,
|
||||
) {
|
||||
let grpc_listener = grpc_listener(config.grpc_bind_address.into())
|
||||
.await
|
||||
.unwrap();
|
||||
let http_listener = http_listener(config.grpc_bind_address.into())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let common_state = CommonServerState::from_config(config).unwrap();
|
||||
let server_type = Arc::new(DatabaseServerType::new(application, server, &common_state));
|
||||
|
||||
serve(common_state, grpc_listener, http_listener, server_type)
|
||||
.await
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_server_shutdown() {
|
||||
test_helpers::maybe_start_logging();
|
||||
|
||||
// Create a server and wait for it to initialize
|
||||
let config = test_config(Some(23));
|
||||
let application = make_application(&config, None).await.unwrap();
|
||||
let server = make_server(Arc::clone(&application), &config);
|
||||
server.wait_for_init().await.unwrap();
|
||||
|
||||
// Start serving
|
||||
let serve_fut = test_serve(config.run_config, application, Arc::clone(&server)).fuse();
|
||||
pin_mut!(serve_fut);
|
||||
|
||||
// Nothing to trigger termination, so serve future should continue running
|
||||
futures::select! {
|
||||
_ = serve_fut => panic!("serve shouldn't finish"),
|
||||
_ = tokio::time::sleep(tokio::time::Duration::from_millis(10)).fuse() => {}
|
||||
}
|
||||
|
||||
// Trigger shutdown
|
||||
server.shutdown();
|
||||
|
||||
// The serve future should now complete
|
||||
futures::select! {
|
||||
_ = serve_fut => {},
|
||||
_ = tokio::time::sleep(tokio::time::Duration::from_secs(5)).fuse() => panic!("timeout shouldn't expire")
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_server_shutdown_uninit() {
|
||||
// Create a server but don't set a server id
|
||||
let config = test_config(None);
|
||||
let application = make_application(&config, None).await.unwrap();
|
||||
let server = make_server(Arc::clone(&application), &config);
|
||||
|
||||
let serve_fut = test_serve(config.run_config, application, Arc::clone(&server)).fuse();
|
||||
pin_mut!(serve_fut);
|
||||
|
||||
// Nothing should have triggered shutdown so serve shouldn't finish
|
||||
futures::select! {
|
||||
_ = serve_fut => panic!("serve shouldn't finish"),
|
||||
_ = tokio::time::sleep(tokio::time::Duration::from_millis(10)).fuse() => {}
|
||||
}
|
||||
|
||||
// We never set the server ID and so the server should not initialize
|
||||
assert!(
|
||||
server.wait_for_init().now_or_never().is_none(),
|
||||
"shouldn't have initialized"
|
||||
);
|
||||
|
||||
// But it should still be possible to shut it down
|
||||
server.shutdown();
|
||||
|
||||
futures::select! {
|
||||
_ = serve_fut => {},
|
||||
_ = tokio::time::sleep(tokio::time::Duration::from_secs(5)).fuse() => panic!("timeout shouldn't expire")
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_server_panic() {
|
||||
// Create a server and wait for it to initialize
|
||||
let config = test_config(Some(999999999));
|
||||
let application = make_application(&config, None).await.unwrap();
|
||||
let server = make_server(Arc::clone(&application), &config);
|
||||
server.wait_for_init().await.unwrap();
|
||||
|
||||
let serve_fut = test_serve(config.run_config, application, Arc::clone(&server)).fuse();
|
||||
pin_mut!(serve_fut);
|
||||
|
||||
// Nothing should have triggered shutdown so serve shouldn't finish
|
||||
futures::select! {
|
||||
_ = serve_fut => panic!("serve shouldn't finish"),
|
||||
_ = tokio::time::sleep(tokio::time::Duration::from_millis(10)).fuse() => {}
|
||||
}
|
||||
|
||||
// Trigger a panic in the Server background worker
|
||||
server::utils::register_panic_key("server background worker: 999999999");
|
||||
|
||||
// This should trigger shutdown
|
||||
futures::select! {
|
||||
_ = serve_fut => {},
|
||||
_ = tokio::time::sleep(tokio::time::Duration::from_secs(5)).fuse() => panic!("timeout shouldn't expire")
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_database_panic() {
|
||||
// Create a server and wait for it to initialize
|
||||
let config = test_config(Some(23));
|
||||
let application = make_application(&config, None).await.unwrap();
|
||||
let server = make_server(Arc::clone(&application), &config);
|
||||
server.wait_for_init().await.unwrap();
|
||||
|
||||
// Create a database that won't panic
|
||||
let other_db_name = DatabaseName::new("other").unwrap();
|
||||
server
|
||||
.create_database(make_rules(&other_db_name))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let other_db = server.database(&other_db_name).unwrap();
|
||||
|
||||
let serve_fut = test_serve(
|
||||
config.run_config,
|
||||
Arc::clone(&application),
|
||||
Arc::clone(&server),
|
||||
)
|
||||
.fuse();
|
||||
pin_mut!(serve_fut);
|
||||
|
||||
// Nothing should have triggered shutdown so serve shouldn't finish
|
||||
futures::select! {
|
||||
_ = serve_fut => panic!("serve shouldn't finish"),
|
||||
_ = tokio::time::sleep(tokio::time::Duration::from_millis(10)).fuse() => {}
|
||||
}
|
||||
|
||||
// Configure a panic in the worker of the database we're about to create
|
||||
server::utils::register_panic_key("database background worker: panic_test");
|
||||
|
||||
// Spawn a dummy job that will delay shutdown as it runs to completion
|
||||
let task = application
|
||||
.job_registry()
|
||||
.spawn_dummy_job(vec![1_000_000_000], None);
|
||||
|
||||
// Create database that will panic in its worker loop
|
||||
server
|
||||
.create_database(make_rules("panic_test"))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// The serve future shouldn't resolve until the dummy job finishes
|
||||
futures::select! {
|
||||
_ = serve_fut => panic!("should wait for jobs to finish"),
|
||||
_ = tokio::time::sleep(tokio::time::Duration::from_millis(10)).fuse() => {}
|
||||
}
|
||||
|
||||
assert!(!task.is_complete(), "task should still be running");
|
||||
|
||||
// But the databases should have been shutdown
|
||||
assert!(
|
||||
other_db.join().now_or_never().is_some(),
|
||||
"database should have been terminated and have finished"
|
||||
);
|
||||
|
||||
// Once the dummy job completes - the serve future should resolve
|
||||
futures::select! {
|
||||
_ = serve_fut => {},
|
||||
_ = tokio::time::sleep(tokio::time::Duration::from_secs(5)).fuse() => panic!("timeout shouldn't expire")
|
||||
}
|
||||
|
||||
assert_eq!(
|
||||
task.get_status().result().unwrap(),
|
||||
tracker::TaskResult::Success
|
||||
)
|
||||
}
|
||||
|
||||
async fn jaeger_client(addr: SocketAddr, trace: &'static str) -> Connection {
|
||||
influxdb_iox_client::connection::Builder::default()
|
||||
.header(
|
||||
HeaderName::from_static("uber-trace-id"),
|
||||
HeaderValue::from_static(trace),
|
||||
)
|
||||
.build(format!("http://{}", addr))
|
||||
.await
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
async fn tracing_server<T: TraceCollector + 'static>(
|
||||
collector: &Arc<T>,
|
||||
) -> (
|
||||
SocketAddr,
|
||||
Arc<Server<ConnectionManagerImpl>>,
|
||||
JoinHandle<crate::influxdb_ioxd::Result<()>>,
|
||||
) {
|
||||
let config = test_config(Some(23));
|
||||
let application = make_application(&config, Some(Arc::<T>::clone(collector)))
|
||||
.await
|
||||
.unwrap();
|
||||
let server = make_server(Arc::clone(&application), &config);
|
||||
server.wait_for_init().await.unwrap();
|
||||
|
||||
let grpc_listener = grpc_listener(config.run_config.grpc_bind_address.into())
|
||||
.await
|
||||
.unwrap();
|
||||
let http_listener = http_listener(config.run_config.grpc_bind_address.into())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let addr = grpc_listener.local_addr().unwrap();
|
||||
|
||||
let common_state = CommonServerState::from_config(config.run_config.clone()).unwrap();
|
||||
let server_type = Arc::new(DatabaseServerType::new(
|
||||
application,
|
||||
Arc::clone(&server),
|
||||
&common_state,
|
||||
));
|
||||
|
||||
let fut = serve(common_state, grpc_listener, http_listener, server_type);
|
||||
|
||||
let join = tokio::spawn(fut);
|
||||
(addr, server, join)
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_tracing() {
|
||||
let trace_collector = Arc::new(RingBufferTraceCollector::new(20));
|
||||
let (addr, server, join) = tracing_server(&trace_collector).await;
|
||||
|
||||
let client = influxdb_iox_client::connection::Builder::default()
|
||||
.build(format!("http://{}", addr))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let mut client = influxdb_iox_client::management::Client::new(client);
|
||||
|
||||
client.list_database_names().await.unwrap();
|
||||
|
||||
assert_eq!(trace_collector.spans().len(), 0);
|
||||
|
||||
let b3_tracing_client = influxdb_iox_client::connection::Builder::default()
|
||||
.header(
|
||||
HeaderName::from_static("x-b3-sampled"),
|
||||
HeaderValue::from_static("1"),
|
||||
)
|
||||
.header(
|
||||
HeaderName::from_static("x-b3-traceid"),
|
||||
HeaderValue::from_static("fea24902"),
|
||||
)
|
||||
.header(
|
||||
HeaderName::from_static("x-b3-spanid"),
|
||||
HeaderValue::from_static("ab3409"),
|
||||
)
|
||||
.build(format!("http://{}", addr))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let mut b3_tracing_client = influxdb_iox_client::management::Client::new(b3_tracing_client);
|
||||
|
||||
b3_tracing_client.list_database_names().await.unwrap();
|
||||
b3_tracing_client.get_server_status().await.unwrap();
|
||||
|
||||
let conn = jaeger_client(addr, "34f9495:30e34:0:1").await;
|
||||
influxdb_iox_client::management::Client::new(conn)
|
||||
.list_database_names()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let spans = trace_collector.spans();
|
||||
assert_eq!(spans.len(), 3);
|
||||
|
||||
assert_eq!(spans[0].name, "IOx");
|
||||
assert_eq!(spans[0].ctx.parent_span_id.unwrap().0.get(), 0xab3409);
|
||||
assert_eq!(spans[0].ctx.trace_id.0.get(), 0xfea24902);
|
||||
assert!(spans[0].start.is_some());
|
||||
assert!(spans[0].end.is_some());
|
||||
assert_eq!(spans[0].status, SpanStatus::Ok);
|
||||
|
||||
assert_eq!(spans[1].name, "IOx");
|
||||
assert_eq!(spans[1].ctx.parent_span_id.unwrap().0.get(), 0xab3409);
|
||||
assert_eq!(spans[1].ctx.trace_id.0.get(), 0xfea24902);
|
||||
assert!(spans[1].start.is_some());
|
||||
assert!(spans[1].end.is_some());
|
||||
|
||||
assert_eq!(spans[2].name, "IOx");
|
||||
assert_eq!(spans[2].ctx.parent_span_id.unwrap().0.get(), 0x30e34);
|
||||
assert_eq!(spans[2].ctx.trace_id.0.get(), 0x34f9495);
|
||||
assert!(spans[2].start.is_some());
|
||||
assert!(spans[2].end.is_some());
|
||||
|
||||
assert_ne!(spans[0].ctx.span_id, spans[1].ctx.span_id);
|
||||
server.shutdown();
|
||||
join.await.unwrap().unwrap();
|
||||
}
|
||||
|
||||
/// Ensure that query is fully executed.
|
||||
async fn consume_query(mut query: PerformQuery) {
|
||||
while query.next().await.unwrap().is_some() {}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_query_tracing() {
|
||||
let collector = Arc::new(RingBufferTraceCollector::new(100));
|
||||
let (addr, server, join) = tracing_server(&collector).await;
|
||||
let conn = jaeger_client(addr, "34f8495:35e32:0:1").await;
|
||||
|
||||
let db_info = influxdb_storage_client::OrgAndBucket::new(
|
||||
NonZeroU64::new(12).unwrap(),
|
||||
NonZeroU64::new(344).unwrap(),
|
||||
);
|
||||
|
||||
// Perform a number of different requests to generate traces
|
||||
|
||||
let mut management = influxdb_iox_client::management::Client::new(conn.clone());
|
||||
management
|
||||
.create_database(
|
||||
influxdb_iox_client::management::generated_types::DatabaseRules {
|
||||
name: db_info.db_name().to_string(),
|
||||
..Default::default()
|
||||
},
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let mut write = influxdb_iox_client::write::Client::new(conn.clone());
|
||||
write
|
||||
.write(db_info.db_name(), "cpu,tag0=foo val=1 100\n")
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let mut flight = influxdb_iox_client::flight::Client::new(conn.clone());
|
||||
consume_query(
|
||||
flight
|
||||
.perform_query(db_info.db_name(), "select * from cpu;")
|
||||
.await
|
||||
.unwrap(),
|
||||
)
|
||||
.await;
|
||||
|
||||
flight
|
||||
.perform_query("nonexistent", "select * from cpu;")
|
||||
.await
|
||||
.unwrap_err();
|
||||
|
||||
flight
|
||||
.perform_query(db_info.db_name(), "select * from nonexistent;")
|
||||
.await
|
||||
.unwrap_err();
|
||||
|
||||
let mut storage = influxdb_storage_client::Client::new(conn);
|
||||
storage
|
||||
.tag_values(influxdb_storage_client::generated_types::TagValuesRequest {
|
||||
tags_source: Some(influxdb_storage_client::Client::read_source(&db_info, 1)),
|
||||
range: None,
|
||||
predicate: None,
|
||||
tag_key: "tag0".into(),
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
server.shutdown();
|
||||
join.await.unwrap().unwrap();
|
||||
|
||||
// Check generated traces
|
||||
|
||||
let spans = collector.spans();
|
||||
|
||||
let root_spans: Vec<_> = spans.iter().filter(|span| &span.name == "IOx").collect();
|
||||
// Made 6 requests
|
||||
assert_eq!(root_spans.len(), 6);
|
||||
|
||||
let child = |parent: &Span, name: &'static str| -> Option<&Span> {
|
||||
spans.iter().find(|span| {
|
||||
span.ctx.parent_span_id == Some(parent.ctx.span_id) && span.name == name
|
||||
})
|
||||
};
|
||||
|
||||
// Test SQL
|
||||
let sql_query_span = root_spans[2];
|
||||
assert_eq!(sql_query_span.status, SpanStatus::Ok);
|
||||
|
||||
let ctx_span = child(sql_query_span, "Query Execution").unwrap();
|
||||
let planner_span = child(ctx_span, "Planner").unwrap();
|
||||
let sql_span = child(planner_span, "sql").unwrap();
|
||||
let prepare_sql_span = child(sql_span, "prepare_sql").unwrap();
|
||||
child(prepare_sql_span, "prepare_plan").unwrap();
|
||||
|
||||
let execute_span = child(ctx_span, "execute_stream_partitioned").unwrap();
|
||||
let coalesce_span = child(execute_span, "CoalescePartitionsEx").unwrap();
|
||||
|
||||
// validate spans from DataFusion ExecutionPlan are present
|
||||
child(coalesce_span, "ProjectionExec: expr").unwrap();
|
||||
|
||||
let database_not_found = root_spans[3];
|
||||
assert_eq!(database_not_found.status, SpanStatus::Err);
|
||||
assert!(database_not_found
|
||||
.events
|
||||
.iter()
|
||||
.any(|event| event.msg.as_ref() == "not found"));
|
||||
|
||||
let table_not_found = root_spans[4];
|
||||
assert_eq!(table_not_found.status, SpanStatus::Err);
|
||||
assert!(table_not_found
|
||||
.events
|
||||
.iter()
|
||||
.any(|event| event.msg.as_ref() == "invalid argument"));
|
||||
|
||||
// Test tag_values
|
||||
let storage_span = root_spans[5];
|
||||
let ctx_span = child(storage_span, "Query Execution").unwrap();
|
||||
child(ctx_span, "Planner").unwrap();
|
||||
|
||||
let to_string_set = child(ctx_span, "to_string_set").unwrap();
|
||||
child(to_string_set, "run_logical_plans").unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_async_exporter() {
|
||||
let (sender, mut receiver) = tokio::sync::mpsc::channel(20);
|
||||
let collector = Arc::new(AsyncExporter::new(TestAsyncExporter::new(sender)));
|
||||
|
||||
let (addr, server, join) = tracing_server(&collector).await;
|
||||
let conn = jaeger_client(addr, "34f8495:30e34:0:1").await;
|
||||
influxdb_iox_client::management::Client::new(conn)
|
||||
.list_database_names()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
collector.drain().await.unwrap();
|
||||
|
||||
server.shutdown();
|
||||
join.await.unwrap().unwrap();
|
||||
|
||||
let span = receiver.recv().await.unwrap();
|
||||
assert_eq!(span.ctx.trace_id.get(), 0x34f8495);
|
||||
assert_eq!(span.ctx.parent_span_id.unwrap().get(), 0x30e34);
|
||||
}
|
||||
|
||||
fn make_rules(db_name: impl Into<String>) -> ProvidedDatabaseRules {
|
||||
let db_name = DatabaseName::new(db_name.into()).unwrap();
|
||||
ProvidedDatabaseRules::new_rules(DatabaseRules::new(db_name).into())
|
||||
.expect("Tests should create valid DatabaseRules")
|
||||
}
|
||||
}
|
||||
|
|
|
@ -25,22 +25,15 @@ where
|
|||
|
||||
add_gated_service!(
|
||||
builder,
|
||||
server_type.serving_readiness,
|
||||
storage::make_server(Arc::clone(&server_type.server),)
|
||||
);
|
||||
add_gated_service!(
|
||||
builder,
|
||||
server_type.serving_readiness,
|
||||
flight::make_server(Arc::clone(&server_type.server))
|
||||
);
|
||||
add_gated_service!(builder, write::make_server(Arc::clone(&server_type.server)));
|
||||
add_gated_service!(
|
||||
builder,
|
||||
server_type.serving_readiness,
|
||||
write::make_server(Arc::clone(&server_type.server))
|
||||
);
|
||||
add_gated_service!(
|
||||
builder,
|
||||
server_type.serving_readiness,
|
||||
write_pb::make_server(Arc::clone(&server_type.server))
|
||||
);
|
||||
// Also important this is not behind a readiness check (as it is
|
||||
|
|
|
@ -0,0 +1,72 @@
|
|||
use std::sync::Arc;
|
||||
|
||||
use object_store::ObjectStore;
|
||||
use observability_deps::tracing::warn;
|
||||
use server::{
|
||||
connection::ConnectionManagerImpl, ApplicationState, RemoteTemplate, Server, ServerConfig,
|
||||
};
|
||||
use snafu::{ResultExt, Snafu};
|
||||
use trace::TraceCollector;
|
||||
|
||||
use crate::{
|
||||
commands::run::database::Config,
|
||||
structopt_blocks::object_store::{check_object_store, warn_about_inmem_store},
|
||||
};
|
||||
|
||||
#[derive(Debug, Snafu)]
|
||||
pub enum Error {
|
||||
#[snafu(display("Cannot parse object store config: {}", source))]
|
||||
ObjectStoreParsing {
|
||||
source: crate::structopt_blocks::object_store::ParseError,
|
||||
},
|
||||
|
||||
#[snafu(display("Cannot check object store config: {}", source))]
|
||||
ObjectStoreCheck {
|
||||
source: crate::structopt_blocks::object_store::CheckError,
|
||||
},
|
||||
}
|
||||
|
||||
pub type Result<T, E = Error> = std::result::Result<T, E>;
|
||||
|
||||
pub async fn make_application(
|
||||
config: &Config,
|
||||
trace_collector: Option<Arc<dyn TraceCollector>>,
|
||||
) -> Result<Arc<ApplicationState>> {
|
||||
warn_about_inmem_store(&config.run_config.object_store_config);
|
||||
let object_store = ObjectStore::try_from(&config.run_config.object_store_config)
|
||||
.context(ObjectStoreParsing)?;
|
||||
check_object_store(&object_store)
|
||||
.await
|
||||
.context(ObjectStoreCheck)?;
|
||||
let object_storage = Arc::new(object_store);
|
||||
|
||||
Ok(Arc::new(ApplicationState::new(
|
||||
object_storage,
|
||||
config.num_worker_threads,
|
||||
trace_collector,
|
||||
)))
|
||||
}
|
||||
|
||||
pub fn make_server(
|
||||
application: Arc<ApplicationState>,
|
||||
config: &Config,
|
||||
) -> Arc<Server<ConnectionManagerImpl>> {
|
||||
let server_config = ServerConfig {
|
||||
remote_template: config.remote_template.clone().map(RemoteTemplate::new),
|
||||
wipe_catalog_on_error: config.wipe_catalog_on_error.into(),
|
||||
skip_replay_and_seek_instead: config.skip_replay_and_seek_instead.into(),
|
||||
};
|
||||
|
||||
let connection_manager = ConnectionManagerImpl::new();
|
||||
let app_server = Arc::new(Server::new(connection_manager, application, server_config));
|
||||
|
||||
// if this ID isn't set the server won't be usable until this is set via an API
|
||||
// call
|
||||
if let Some(id) = config.run_config.server_id_config.server_id {
|
||||
app_server.set_id(id).expect("server id already set");
|
||||
} else {
|
||||
warn!("server ID not set. ID must be set via the INFLUXDB_IOX_ID config or API before writing or querying data.");
|
||||
}
|
||||
|
||||
app_server
|
||||
}
|
|
@ -8,6 +8,7 @@ use trace::TraceCollector;
|
|||
|
||||
use super::rpc::RpcBuilderInput;
|
||||
|
||||
pub mod common_state;
|
||||
pub mod database;
|
||||
|
||||
/// Constants used in API error codes.
|
||||
|
@ -120,4 +121,8 @@ pub trait ServerType: std::fmt::Debug + Send + Sync + 'static {
|
|||
) -> Result<Response<Body>, Self::RouteError>;
|
||||
|
||||
async fn server_grpc(self: Arc<Self>, builder_input: RpcBuilderInput) -> Result<(), RpcError>;
|
||||
|
||||
async fn background_worker(self: Arc<Self>);
|
||||
|
||||
fn shutdown_background_worker(&self);
|
||||
}
|
||||
|
|
|
@ -16,7 +16,7 @@ pub const DEFAULT_API_BIND_ADDR: &str = "127.0.0.1:8080";
|
|||
pub const DEFAULT_GRPC_BIND_ADDR: &str = "127.0.0.1:8082";
|
||||
|
||||
/// Common config for all `run` commands.
|
||||
#[derive(Debug, StructOpt)]
|
||||
#[derive(Debug, StructOpt, Clone)]
|
||||
pub struct RunConfig {
|
||||
/// logging options
|
||||
#[structopt(flatten)]
|
||||
|
|
Loading…
Reference in New Issue