diff --git a/influxdb_iox/src/commands/run/database.rs b/influxdb_iox/src/commands/run/database.rs index a874bdef78..676fbf1a25 100644 --- a/influxdb_iox/src/commands/run/database.rs +++ b/influxdb_iox/src/commands/run/database.rs @@ -1,7 +1,18 @@ //! Implementation of command line option for running server +use std::sync::Arc; + use crate::{ - influxdb_ioxd, + influxdb_ioxd::{ + self, + server_type::{ + common_state::{CommonServerState, CommonServerStateError}, + database::{ + setup::{make_application, make_server}, + DatabaseServerType, + }, + }, + }, structopt_blocks::{boolean_flag::BooleanFlag, run_config::RunConfig}, }; use structopt::StructOpt; @@ -10,7 +21,13 @@ use thiserror::Error; #[derive(Debug, Error)] pub enum Error { #[error("Run: {0}")] - ServerError(#[from] influxdb_ioxd::Error), + Run(#[from] influxdb_ioxd::Error), + + #[error("Cannot setup server: {0}")] + Setup(#[from] crate::influxdb_ioxd::server_type::database::setup::Error), + + #[error("Invalid config: {0}")] + InvalidConfig(#[from] CommonServerStateError), } pub type Result = std::result::Result; @@ -75,5 +92,15 @@ pub struct Config { } pub async fn command(config: Config) -> Result<()> { - Ok(influxdb_ioxd::main(config).await?) + let common_state = CommonServerState::from_config(config.run_config.clone())?; + + let application = make_application(&config, common_state.trace_collector()).await?; + let app_server = make_server(Arc::clone(&application), &config); + let server_type = Arc::new(DatabaseServerType::new( + Arc::clone(&application), + Arc::clone(&app_server), + &common_state, + )); + + Ok(influxdb_ioxd::main(common_state, server_type).await?) } diff --git a/influxdb_iox/src/influxdb_ioxd.rs b/influxdb_iox/src/influxdb_ioxd.rs index f199cb81cc..a46479e956 100644 --- a/influxdb_iox/src/influxdb_ioxd.rs +++ b/influxdb_iox/src/influxdb_ioxd.rs @@ -1,30 +1,17 @@ -use crate::{ - commands::run::database::Config, - influxdb_ioxd::server_type::database::DatabaseServerType, - structopt_blocks::{ - object_store::{check_object_store, warn_about_inmem_store}, - run_config::RunConfig, - }, -}; +use crate::influxdb_ioxd::server_type::{common_state::CommonServerState, ServerType}; use futures::{future::FusedFuture, pin_mut, FutureExt}; use hyper::server::conn::AddrIncoming; -use object_store::{self, ObjectStore}; -use observability_deps::tracing::{error, info, warn}; +use observability_deps::tracing::{error, info}; use panic_logging::SendPanicsToTracing; -use server::{ - connection::ConnectionManagerImpl as ConnectionManager, ApplicationState, RemoteTemplate, - Server as AppServer, ServerConfig, -}; use snafu::{ResultExt, Snafu}; -use std::{convert::TryFrom, net::SocketAddr, sync::Arc}; -use trace::TraceCollector; +use std::{net::SocketAddr, sync::Arc}; use trace_http::ctx::TraceHeaderParser; mod http; mod jemalloc; mod planner; pub(crate) mod rpc; -mod server_type; +pub(crate) mod server_type; pub(crate) mod serving_readiness; #[derive(Debug, Snafu)] @@ -46,19 +33,6 @@ pub enum Error { #[snafu(display("Error serving RPC: {}", source))] ServingRpc { source: server_type::RpcError }, - - #[snafu(display("Cannot parse object store config: {}", source))] - ObjectStoreParsing { - source: crate::structopt_blocks::object_store::ParseError, - }, - - #[snafu(display("Cannot check object store config: {}", source))] - ObjectStoreCheck { - source: crate::structopt_blocks::object_store::CheckError, - }, - - #[snafu(display("Cannot create tracing pipeline: {}", source))] - Tracing { source: trace_exporters::Error }, } pub type Result = std::result::Result; @@ -84,64 +58,6 @@ async fn wait_for_signal() { let _ = tokio::signal::ctrl_c().await; } -async fn make_application( - config: &Config, - trace_collector: Option>, -) -> Result> { - warn_about_inmem_store(&config.run_config.object_store_config); - let object_store = ObjectStore::try_from(&config.run_config.object_store_config) - .context(ObjectStoreParsing)?; - check_object_store(&object_store) - .await - .context(ObjectStoreCheck)?; - let object_storage = Arc::new(object_store); - - Ok(Arc::new(ApplicationState::new( - object_storage, - config.num_worker_threads, - trace_collector, - ))) -} - -fn make_server( - application: Arc, - config: &Config, -) -> Arc> { - let server_config = ServerConfig { - remote_template: config.remote_template.clone().map(RemoteTemplate::new), - wipe_catalog_on_error: config.wipe_catalog_on_error.into(), - skip_replay_and_seek_instead: config.skip_replay_and_seek_instead.into(), - }; - - if (config.run_config.grpc_bind_address == config.run_config.http_bind_address) - && (config.run_config.grpc_bind_address.port() != 0) - { - error!( - %config.run_config.grpc_bind_address, - %config.run_config.http_bind_address, - "grpc and http bind addresses must differ", - ); - std::process::exit(1); - } - - let connection_manager = ConnectionManager::new(); - let app_server = Arc::new(AppServer::new( - connection_manager, - application, - server_config, - )); - - // if this ID isn't set the server won't be usable until this is set via an API - // call - if let Some(id) = config.run_config.server_id_config.server_id { - app_server.set_id(id).expect("server id already set"); - } else { - warn!("server ID not set. ID must be set via the INFLUXDB_IOX_ID config or API before writing or querying data."); - } - - app_server -} - #[cfg(all(not(feature = "heappy"), not(feature = "jemalloc_replacing_malloc")))] fn build_malloc_conf() -> String { "system".to_string() @@ -166,9 +82,16 @@ fn build_malloc_conf() -> String { compile_error!("must use exactly one memory allocator") } -/// This is the entry point for the IOx server. `config` represents -/// command line arguments, if any. -pub async fn main(config: Config) -> Result<()> { +/// This is the entry point for the IOx server. +/// +/// The precise server type depends on `T`. This entry point ensures that the given `server_type` is started using best +/// practice, e.g. that we print the GIT-hash and malloc-configs, that a panic handler is installed, etc. +/// +/// Due to the invasive nature of the setup routine, this should not be used during unit tests. +pub async fn main(common_state: CommonServerState, server_type: Arc) -> Result<()> +where + T: ServerType, +{ let git_hash = option_env!("GIT_HASH").unwrap_or("UNKNOWN"); let num_cpus = num_cpus::get(); let build_malloc_conf = build_malloc_conf(); @@ -179,6 +102,17 @@ pub async fn main(config: Config) -> Result<()> { "InfluxDB IOx server starting", ); + if (common_state.run_config().grpc_bind_address == common_state.run_config().http_bind_address) + && (common_state.run_config().grpc_bind_address.port() != 0) + { + error!( + grpc_bind_address=%common_state.run_config().grpc_bind_address, + http_bind_address=%common_state.run_config().http_bind_address, + "grpc and http bind addresses must differ", + ); + std::process::exit(1); + } + // Install custom panic handler and forget about it. // // This leaks the handler and prevents it from ever being dropped during the @@ -188,40 +122,26 @@ pub async fn main(config: Config) -> Result<()> { let f = SendPanicsToTracing::new(); std::mem::forget(f); - let async_exporter = config.run_config.tracing_config.build().context(Tracing)?; - let trace_collector = async_exporter - .clone() - .map(|x| -> Arc { x }); - let application = make_application(&config, trace_collector).await?; - // Register jemalloc metrics - application + server_type .metric_registry() .register_instrument("jemalloc_metrics", jemalloc::JemallocMetrics::new); - let app_server = make_server(Arc::clone(&application), &config); + let grpc_listener = grpc_listener(common_state.run_config().grpc_bind_address.into()).await?; + let http_listener = http_listener(common_state.run_config().http_bind_address.into()).await?; - let grpc_listener = grpc_listener(config.run_config.grpc_bind_address.into()).await?; - let http_listener = http_listener(config.run_config.http_bind_address.into()).await?; + let trace_exporter = common_state.trace_exporter(); + let r = serve(common_state, grpc_listener, http_listener, server_type).await; - let r = serve( - config.run_config, - application, - grpc_listener, - http_listener, - app_server, - ) - .await; - - if let Some(async_exporter) = async_exporter { - if let Err(e) = async_exporter.drain().await { + if let Some(trace_exporter) = trace_exporter { + if let Err(e) = trace_exporter.drain().await { error!(%e, "error draining trace exporter"); } } r } -async fn grpc_listener(addr: SocketAddr) -> Result { +pub async fn grpc_listener(addr: SocketAddr) -> Result { let listener = tokio::net::TcpListener::bind(addr) .await .context(StartListeningGrpc { addr })?; @@ -234,7 +154,7 @@ async fn grpc_listener(addr: SocketAddr) -> Result { Ok(listener) } -async fn http_listener(addr: SocketAddr) -> Result { +pub async fn http_listener(addr: SocketAddr) -> Result { let listener = AddrIncoming::bind(&addr).context(StartListeningHttp { addr })?; info!(bind_addr=%listener.local_addr(), "bound HTTP listener"); @@ -245,30 +165,31 @@ async fn http_listener(addr: SocketAddr) -> Result { /// these listeners, the Server, Databases, etc... have all exited. /// /// This is effectively the "main loop" for influxdb_iox -async fn serve( - config: RunConfig, - application: Arc, +async fn serve( + common_state: CommonServerState, grpc_listener: tokio::net::TcpListener, http_listener: AddrIncoming, - app_server: Arc>, -) -> Result<()> { + server_type: Arc, +) -> Result<()> +where + T: ServerType, +{ // Construct a token to trigger shutdown of API services let frontend_shutdown = tokio_util::sync::CancellationToken::new(); let trace_header_parser = TraceHeaderParser::new() .with_jaeger_trace_context_header_name( - config + &common_state + .run_config() .tracing_config .traces_jaeger_trace_context_header_name, ) - .with_jaeger_debug_name(config.tracing_config.traces_jaeger_debug_name); - - let server_type = Arc::new(DatabaseServerType::new( - Arc::clone(&application), - Arc::clone(&app_server), - config.max_http_request_size, - config.initial_serving_state.into(), - )); + .with_jaeger_debug_name( + &common_state + .run_config() + .tracing_config + .traces_jaeger_debug_name, + ); // Construct and start up gRPC server @@ -277,6 +198,7 @@ async fn serve( Arc::clone(&server_type), trace_header_parser.clone(), frontend_shutdown.clone(), + common_state.serving_readiness().clone(), ) .fuse(); @@ -284,7 +206,7 @@ async fn serve( let http_server = http::serve( http_listener, - server_type, + Arc::clone(&server_type), frontend_shutdown.clone(), trace_header_parser, ) @@ -295,7 +217,7 @@ async fn serve( log::info!("InfluxDB IOx server ready"); // Get IOx background worker task - let server_worker = app_server.join().fuse(); + let server_worker = Arc::clone(&server_type).background_worker().fuse(); // Shutdown signal let signal = wait_for_signal().fuse(); @@ -364,492 +286,13 @@ async fn serve( frontend_shutdown.cancel() } - info!("frontend shutdown completed"); - app_server.shutdown(); + server_type.shutdown_background_worker(); if !server_worker.is_terminated() { - match server_worker.await { - Ok(_) => info!("server worker shutdown"), - Err(error) => error!(%error, "server worker error"), - } + server_worker.await; } - - info!("server completed shutting down"); - - application.join(); - info!("shared application state completed shutting down"); + info!("backend shutdown completed"); res } - -#[cfg(test)] -mod tests { - use super::*; - use ::http::{header::HeaderName, HeaderValue}; - use data_types::{database_rules::DatabaseRules, DatabaseName}; - use influxdb_iox_client::{connection::Connection, flight::PerformQuery}; - use server::rules::ProvidedDatabaseRules; - use std::{convert::TryInto, num::NonZeroU64}; - use structopt::StructOpt; - use tokio::task::JoinHandle; - use trace::{ - span::{Span, SpanStatus}, - RingBufferTraceCollector, - }; - use trace_exporters::export::{AsyncExporter, TestAsyncExporter}; - - fn test_config(server_id: Option) -> Config { - let mut config = Config::from_iter(&[ - "run", - "--api-bind", - "127.0.0.1:0", - "--grpc-bind", - "127.0.0.1:0", - ]); - config.run_config.server_id_config.server_id = server_id.map(|x| x.try_into().unwrap()); - config - } - - async fn test_serve( - config: RunConfig, - application: Arc, - server: Arc>, - ) { - let grpc_listener = grpc_listener(config.grpc_bind_address.into()) - .await - .unwrap(); - let http_listener = http_listener(config.grpc_bind_address.into()) - .await - .unwrap(); - - serve(config, application, grpc_listener, http_listener, server) - .await - .unwrap() - } - - #[tokio::test] - async fn test_server_shutdown() { - test_helpers::maybe_start_logging(); - - // Create a server and wait for it to initialize - let config = test_config(Some(23)); - let application = make_application(&config, None).await.unwrap(); - let server = make_server(Arc::clone(&application), &config); - server.wait_for_init().await.unwrap(); - - // Start serving - let serve_fut = test_serve(config.run_config, application, Arc::clone(&server)).fuse(); - pin_mut!(serve_fut); - - // Nothing to trigger termination, so serve future should continue running - futures::select! { - _ = serve_fut => panic!("serve shouldn't finish"), - _ = tokio::time::sleep(tokio::time::Duration::from_millis(10)).fuse() => {} - } - - // Trigger shutdown - server.shutdown(); - - // The serve future should now complete - futures::select! { - _ = serve_fut => {}, - _ = tokio::time::sleep(tokio::time::Duration::from_secs(5)).fuse() => panic!("timeout shouldn't expire") - } - } - - #[tokio::test] - async fn test_server_shutdown_uninit() { - // Create a server but don't set a server id - let config = test_config(None); - let application = make_application(&config, None).await.unwrap(); - let server = make_server(Arc::clone(&application), &config); - - let serve_fut = test_serve(config.run_config, application, Arc::clone(&server)).fuse(); - pin_mut!(serve_fut); - - // Nothing should have triggered shutdown so serve shouldn't finish - futures::select! { - _ = serve_fut => panic!("serve shouldn't finish"), - _ = tokio::time::sleep(tokio::time::Duration::from_millis(10)).fuse() => {} - } - - // We never set the server ID and so the server should not initialize - assert!( - server.wait_for_init().now_or_never().is_none(), - "shouldn't have initialized" - ); - - // But it should still be possible to shut it down - server.shutdown(); - - futures::select! { - _ = serve_fut => {}, - _ = tokio::time::sleep(tokio::time::Duration::from_secs(5)).fuse() => panic!("timeout shouldn't expire") - } - } - - #[tokio::test] - async fn test_server_panic() { - // Create a server and wait for it to initialize - let config = test_config(Some(999999999)); - let application = make_application(&config, None).await.unwrap(); - let server = make_server(Arc::clone(&application), &config); - server.wait_for_init().await.unwrap(); - - let serve_fut = test_serve(config.run_config, application, Arc::clone(&server)).fuse(); - pin_mut!(serve_fut); - - // Nothing should have triggered shutdown so serve shouldn't finish - futures::select! { - _ = serve_fut => panic!("serve shouldn't finish"), - _ = tokio::time::sleep(tokio::time::Duration::from_millis(10)).fuse() => {} - } - - // Trigger a panic in the Server background worker - server::utils::register_panic_key("server background worker: 999999999"); - - // This should trigger shutdown - futures::select! { - _ = serve_fut => {}, - _ = tokio::time::sleep(tokio::time::Duration::from_secs(5)).fuse() => panic!("timeout shouldn't expire") - } - } - - #[tokio::test] - async fn test_database_panic() { - // Create a server and wait for it to initialize - let config = test_config(Some(23)); - let application = make_application(&config, None).await.unwrap(); - let server = make_server(Arc::clone(&application), &config); - server.wait_for_init().await.unwrap(); - - // Create a database that won't panic - let other_db_name = DatabaseName::new("other").unwrap(); - server - .create_database(make_rules(&other_db_name)) - .await - .unwrap(); - - let other_db = server.database(&other_db_name).unwrap(); - - let serve_fut = test_serve( - config.run_config, - Arc::clone(&application), - Arc::clone(&server), - ) - .fuse(); - pin_mut!(serve_fut); - - // Nothing should have triggered shutdown so serve shouldn't finish - futures::select! { - _ = serve_fut => panic!("serve shouldn't finish"), - _ = tokio::time::sleep(tokio::time::Duration::from_millis(10)).fuse() => {} - } - - // Configure a panic in the worker of the database we're about to create - server::utils::register_panic_key("database background worker: panic_test"); - - // Spawn a dummy job that will delay shutdown as it runs to completion - let task = application - .job_registry() - .spawn_dummy_job(vec![1_000_000_000], None); - - // Create database that will panic in its worker loop - server - .create_database(make_rules("panic_test")) - .await - .unwrap(); - - // The serve future shouldn't resolve until the dummy job finishes - futures::select! { - _ = serve_fut => panic!("should wait for jobs to finish"), - _ = tokio::time::sleep(tokio::time::Duration::from_millis(10)).fuse() => {} - } - - assert!(!task.is_complete(), "task should still be running"); - - // But the databases should have been shutdown - assert!( - other_db.join().now_or_never().is_some(), - "database should have been terminated and have finished" - ); - - // Once the dummy job completes - the serve future should resolve - futures::select! { - _ = serve_fut => {}, - _ = tokio::time::sleep(tokio::time::Duration::from_secs(5)).fuse() => panic!("timeout shouldn't expire") - } - - assert_eq!( - task.get_status().result().unwrap(), - tracker::TaskResult::Success - ) - } - - async fn jaeger_client(addr: SocketAddr, trace: &'static str) -> Connection { - influxdb_iox_client::connection::Builder::default() - .header( - HeaderName::from_static("uber-trace-id"), - HeaderValue::from_static(trace), - ) - .build(format!("http://{}", addr)) - .await - .unwrap() - } - - async fn tracing_server( - collector: &Arc, - ) -> ( - SocketAddr, - Arc>, - JoinHandle>, - ) { - let config = test_config(Some(23)); - let application = make_application(&config, Some(Arc::::clone(collector))) - .await - .unwrap(); - let server = make_server(Arc::clone(&application), &config); - server.wait_for_init().await.unwrap(); - - let grpc_listener = grpc_listener(config.run_config.grpc_bind_address.into()) - .await - .unwrap(); - let http_listener = http_listener(config.run_config.grpc_bind_address.into()) - .await - .unwrap(); - - let addr = grpc_listener.local_addr().unwrap(); - - let fut = serve( - config.run_config, - application, - grpc_listener, - http_listener, - Arc::clone(&server), - ); - - let join = tokio::spawn(fut); - (addr, server, join) - } - - #[tokio::test] - async fn test_tracing() { - let trace_collector = Arc::new(RingBufferTraceCollector::new(20)); - let (addr, server, join) = tracing_server(&trace_collector).await; - - let client = influxdb_iox_client::connection::Builder::default() - .build(format!("http://{}", addr)) - .await - .unwrap(); - - let mut client = influxdb_iox_client::management::Client::new(client); - - client.list_database_names().await.unwrap(); - - assert_eq!(trace_collector.spans().len(), 0); - - let b3_tracing_client = influxdb_iox_client::connection::Builder::default() - .header( - HeaderName::from_static("x-b3-sampled"), - HeaderValue::from_static("1"), - ) - .header( - HeaderName::from_static("x-b3-traceid"), - HeaderValue::from_static("fea24902"), - ) - .header( - HeaderName::from_static("x-b3-spanid"), - HeaderValue::from_static("ab3409"), - ) - .build(format!("http://{}", addr)) - .await - .unwrap(); - - let mut b3_tracing_client = influxdb_iox_client::management::Client::new(b3_tracing_client); - - b3_tracing_client.list_database_names().await.unwrap(); - b3_tracing_client.get_server_status().await.unwrap(); - - let conn = jaeger_client(addr, "34f9495:30e34:0:1").await; - influxdb_iox_client::management::Client::new(conn) - .list_database_names() - .await - .unwrap(); - - let spans = trace_collector.spans(); - assert_eq!(spans.len(), 3); - - assert_eq!(spans[0].name, "IOx"); - assert_eq!(spans[0].ctx.parent_span_id.unwrap().0.get(), 0xab3409); - assert_eq!(spans[0].ctx.trace_id.0.get(), 0xfea24902); - assert!(spans[0].start.is_some()); - assert!(spans[0].end.is_some()); - assert_eq!(spans[0].status, SpanStatus::Ok); - - assert_eq!(spans[1].name, "IOx"); - assert_eq!(spans[1].ctx.parent_span_id.unwrap().0.get(), 0xab3409); - assert_eq!(spans[1].ctx.trace_id.0.get(), 0xfea24902); - assert!(spans[1].start.is_some()); - assert!(spans[1].end.is_some()); - - assert_eq!(spans[2].name, "IOx"); - assert_eq!(spans[2].ctx.parent_span_id.unwrap().0.get(), 0x30e34); - assert_eq!(spans[2].ctx.trace_id.0.get(), 0x34f9495); - assert!(spans[2].start.is_some()); - assert!(spans[2].end.is_some()); - - assert_ne!(spans[0].ctx.span_id, spans[1].ctx.span_id); - server.shutdown(); - join.await.unwrap().unwrap(); - } - - /// Ensure that query is fully executed. - async fn consume_query(mut query: PerformQuery) { - while query.next().await.unwrap().is_some() {} - } - - #[tokio::test] - async fn test_query_tracing() { - let collector = Arc::new(RingBufferTraceCollector::new(100)); - let (addr, server, join) = tracing_server(&collector).await; - let conn = jaeger_client(addr, "34f8495:35e32:0:1").await; - - let db_info = influxdb_storage_client::OrgAndBucket::new( - NonZeroU64::new(12).unwrap(), - NonZeroU64::new(344).unwrap(), - ); - - // Perform a number of different requests to generate traces - - let mut management = influxdb_iox_client::management::Client::new(conn.clone()); - management - .create_database( - influxdb_iox_client::management::generated_types::DatabaseRules { - name: db_info.db_name().to_string(), - ..Default::default() - }, - ) - .await - .unwrap(); - - let mut write = influxdb_iox_client::write::Client::new(conn.clone()); - write - .write(db_info.db_name(), "cpu,tag0=foo val=1 100\n") - .await - .unwrap(); - - let mut flight = influxdb_iox_client::flight::Client::new(conn.clone()); - consume_query( - flight - .perform_query(db_info.db_name(), "select * from cpu;") - .await - .unwrap(), - ) - .await; - - flight - .perform_query("nonexistent", "select * from cpu;") - .await - .unwrap_err(); - - flight - .perform_query(db_info.db_name(), "select * from nonexistent;") - .await - .unwrap_err(); - - let mut storage = influxdb_storage_client::Client::new(conn); - storage - .tag_values(influxdb_storage_client::generated_types::TagValuesRequest { - tags_source: Some(influxdb_storage_client::Client::read_source(&db_info, 1)), - range: None, - predicate: None, - tag_key: "tag0".into(), - }) - .await - .unwrap(); - - server.shutdown(); - join.await.unwrap().unwrap(); - - // Check generated traces - - let spans = collector.spans(); - - let root_spans: Vec<_> = spans.iter().filter(|span| &span.name == "IOx").collect(); - // Made 6 requests - assert_eq!(root_spans.len(), 6); - - let child = |parent: &Span, name: &'static str| -> Option<&Span> { - spans.iter().find(|span| { - span.ctx.parent_span_id == Some(parent.ctx.span_id) && span.name == name - }) - }; - - // Test SQL - let sql_query_span = root_spans[2]; - assert_eq!(sql_query_span.status, SpanStatus::Ok); - - let ctx_span = child(sql_query_span, "Query Execution").unwrap(); - let planner_span = child(ctx_span, "Planner").unwrap(); - let sql_span = child(planner_span, "sql").unwrap(); - let prepare_sql_span = child(sql_span, "prepare_sql").unwrap(); - child(prepare_sql_span, "prepare_plan").unwrap(); - - let execute_span = child(ctx_span, "execute_stream_partitioned").unwrap(); - let coalesce_span = child(execute_span, "CoalescePartitionsEx").unwrap(); - - // validate spans from DataFusion ExecutionPlan are present - child(coalesce_span, "ProjectionExec: expr").unwrap(); - - let database_not_found = root_spans[3]; - assert_eq!(database_not_found.status, SpanStatus::Err); - assert!(database_not_found - .events - .iter() - .any(|event| event.msg.as_ref() == "not found")); - - let table_not_found = root_spans[4]; - assert_eq!(table_not_found.status, SpanStatus::Err); - assert!(table_not_found - .events - .iter() - .any(|event| event.msg.as_ref() == "invalid argument")); - - // Test tag_values - let storage_span = root_spans[5]; - let ctx_span = child(storage_span, "Query Execution").unwrap(); - child(ctx_span, "Planner").unwrap(); - - let to_string_set = child(ctx_span, "to_string_set").unwrap(); - child(to_string_set, "run_logical_plans").unwrap(); - } - - #[tokio::test] - async fn test_async_exporter() { - let (sender, mut receiver) = tokio::sync::mpsc::channel(20); - let collector = Arc::new(AsyncExporter::new(TestAsyncExporter::new(sender))); - - let (addr, server, join) = tracing_server(&collector).await; - let conn = jaeger_client(addr, "34f8495:30e34:0:1").await; - influxdb_iox_client::management::Client::new(conn) - .list_database_names() - .await - .unwrap(); - - collector.drain().await.unwrap(); - - server.shutdown(); - join.await.unwrap().unwrap(); - - let span = receiver.recv().await.unwrap(); - assert_eq!(span.ctx.trace_id.get(), 0x34f8495); - assert_eq!(span.ctx.parent_span_id.unwrap().get(), 0x30e34); - } - - fn make_rules(db_name: impl Into) -> ProvidedDatabaseRules { - let db_name = DatabaseName::new(db_name.into()).unwrap(); - ProvidedDatabaseRules::new_rules(DatabaseRules::new(db_name).into()) - .expect("Tests should create valid DatabaseRules") - } -} diff --git a/influxdb_iox/src/influxdb_ioxd/rpc.rs b/influxdb_iox/src/influxdb_ioxd/rpc.rs index 34655d7216..263dfd4726 100644 --- a/influxdb_iox/src/influxdb_ioxd/rpc.rs +++ b/influxdb_iox/src/influxdb_ioxd/rpc.rs @@ -6,7 +6,10 @@ use tonic::transport::NamedService; use tonic_health::server::HealthReporter; use trace_http::ctx::TraceHeaderParser; -use crate::influxdb_ioxd::server_type::{RpcError, ServerType}; +use crate::influxdb_ioxd::{ + server_type::{RpcError, ServerType}, + serving_readiness::ServingReadiness, +}; pub mod error; pub(crate) mod testing; @@ -16,17 +19,21 @@ pub fn service_name(_: &S) -> &'static str { S::NAME } +#[derive(Debug)] pub struct RpcBuilderInput { pub socket: TcpListener, pub trace_header_parser: TraceHeaderParser, pub shutdown: CancellationToken, + pub serving_readiness: ServingReadiness, } +#[derive(Debug)] pub struct RpcBuilder { pub inner: T, pub health_reporter: HealthReporter, pub shutdown: CancellationToken, pub socket: TcpListener, + pub serving_readiness: ServingReadiness, } /// Adds a gRPC service to the builder, and registers it with the @@ -46,6 +53,7 @@ macro_rules! add_service { mut health_reporter, shutdown, socket, + serving_readiness, } = $builder; let service = $svc; @@ -61,6 +69,7 @@ macro_rules! add_service { health_reporter, shutdown, socket, + serving_readiness, } } }; @@ -72,11 +81,11 @@ pub(crate) use add_service; /// Adds a gRPC service to the builder gated behind the serving /// readiness check, and registers it with the health reporter macro_rules! add_gated_service { - ($builder:ident, $serving_readiness:expr, $svc:expr) => { + ($builder:ident, $svc:expr) => { let $builder = { let service = $svc; - let interceptor = $serving_readiness.clone().into_interceptor(); + let interceptor = $builder.serving_readiness.clone().into_interceptor(); let service = tonic::codegen::InterceptedService::new(service, interceptor); add_service!($builder, service); @@ -103,6 +112,7 @@ macro_rules! setup_builder { socket, trace_header_parser, shutdown, + serving_readiness, } = $input; let (health_reporter, health_service) = tonic_health::server::health_reporter(); @@ -124,6 +134,7 @@ macro_rules! setup_builder { health_reporter, shutdown, socket, + serving_readiness, }; // important that this one is NOT gated so that it can answer health requests @@ -168,6 +179,7 @@ pub async fn serve( server_type: Arc, trace_header_parser: TraceHeaderParser, shutdown: CancellationToken, + serving_readiness: ServingReadiness, ) -> Result<(), RpcError> where T: ServerType, @@ -176,6 +188,7 @@ where socket, trace_header_parser, shutdown, + serving_readiness, }; server_type.server_grpc(builder_input).await diff --git a/influxdb_iox/src/influxdb_ioxd/server_type/common_state.rs b/influxdb_iox/src/influxdb_ioxd/server_type/common_state.rs new file mode 100644 index 0000000000..f024f46b27 --- /dev/null +++ b/influxdb_iox/src/influxdb_ioxd/server_type/common_state.rs @@ -0,0 +1,64 @@ +use std::sync::Arc; + +use snafu::{ResultExt, Snafu}; +use trace::TraceCollector; + +use crate::{ + influxdb_ioxd::serving_readiness::ServingReadiness, structopt_blocks::run_config::RunConfig, +}; + +#[derive(Debug, Snafu)] +pub enum CommonServerStateError { + #[snafu(display("Cannot create tracing pipeline: {}", source))] + Tracing { source: trace_exporters::Error }, +} + +/// Common state used by all server types (e.g. `Database` and `Router`) +#[derive(Debug)] +pub struct CommonServerState { + run_config: RunConfig, + serving_readiness: ServingReadiness, + trace_exporter: Option>, +} + +impl CommonServerState { + pub fn from_config(run_config: RunConfig) -> Result { + let serving_readiness = run_config.initial_serving_state.clone().into(); + let trace_exporter = run_config.tracing_config.build().context(Tracing)?; + + Ok(Self { + run_config, + serving_readiness, + trace_exporter, + }) + } + + #[cfg(test)] + pub fn for_testing() -> Self { + use structopt::StructOpt; + + Self::from_config( + RunConfig::from_iter_safe(["not_used".to_string()].into_iter()) + .expect("default parsing should work"), + ) + .expect("default configs should work") + } + + pub fn run_config(&self) -> &RunConfig { + &self.run_config + } + + pub fn serving_readiness(&self) -> &ServingReadiness { + &self.serving_readiness + } + + pub fn trace_exporter(&self) -> Option> { + self.trace_exporter.clone() + } + + pub fn trace_collector(&self) -> Option> { + self.trace_exporter + .clone() + .map(|x| -> Arc { x }) + } +} diff --git a/influxdb_iox/src/influxdb_ioxd/server_type/database/http.rs b/influxdb_iox/src/influxdb_ioxd/server_type/database/http.rs index 2cc91ae7f2..a3338d32e5 100644 --- a/influxdb_iox/src/influxdb_ioxd/server_type/database/http.rs +++ b/influxdb_iox/src/influxdb_ioxd/server_type/database/http.rs @@ -566,8 +566,7 @@ async fn query( mod tests { use crate::influxdb_ioxd::{ http::test_utils::{check_response, get_content_type, TestServer}, - server_type::ServerType, - serving_readiness::ServingReadiness, + server_type::{common_state::CommonServerState, ServerType}, }; use super::*; @@ -1301,13 +1300,10 @@ mod tests { application: Arc, server: Arc>, ) -> TestServer> { - let server_type = Arc::new(DatabaseServerType::new( - application, - server, - TEST_MAX_REQUEST_SIZE, - ServingReadiness::new(Default::default()), - )); - TestServer::new(server_type) + let mut server_type = + DatabaseServerType::new(application, server, &CommonServerState::for_testing()); + server_type.max_request_size = TEST_MAX_REQUEST_SIZE; + TestServer::new(Arc::new(server_type)) } /// Run the specified SQL query and return formatted results as a string diff --git a/influxdb_iox/src/influxdb_ioxd/server_type/database/mod.rs b/influxdb_iox/src/influxdb_ioxd/server_type/database/mod.rs index 3a3da969a3..bec792acd5 100644 --- a/influxdb_iox/src/influxdb_ioxd/server_type/database/mod.rs +++ b/influxdb_iox/src/influxdb_ioxd/server_type/database/mod.rs @@ -1,9 +1,12 @@ use std::sync::Arc; use async_trait::async_trait; +use futures::{future::FusedFuture, FutureExt}; use hyper::{Body, Request, Response}; use metric::Registry; +use observability_deps::tracing::{error, info}; use server::{connection::ConnectionManager, ApplicationState, Server}; +use tokio_util::sync::CancellationToken; use trace::TraceCollector; use crate::influxdb_ioxd::{ @@ -15,9 +18,12 @@ use crate::influxdb_ioxd::{ mod http; mod rpc; +pub mod setup; pub use self::http::ApplicationError; +use super::common_state::CommonServerState; + #[derive(Debug)] pub struct DatabaseServerType where @@ -28,6 +34,7 @@ where pub lp_metrics: Arc, pub max_request_size: usize, pub serving_readiness: ServingReadiness, + shutdown: CancellationToken, } impl DatabaseServerType @@ -37,8 +44,7 @@ where pub fn new( application: Arc, server: Arc>, - max_request_size: usize, - serving_readiness: ServingReadiness, + common_state: &CommonServerState, ) -> Self { let lp_metrics = Arc::new(LineProtocolMetrics::new( application.metric_registry().as_ref(), @@ -48,8 +54,9 @@ where application, server, lp_metrics, - max_request_size, - serving_readiness, + max_request_size: common_state.run_config().max_http_request_size, + serving_readiness: common_state.serving_readiness().clone(), + shutdown: CancellationToken::new(), } } } @@ -79,4 +86,518 @@ where async fn server_grpc(self: Arc, builder_input: RpcBuilderInput) -> Result<(), RpcError> { self::rpc::server_grpc(self, builder_input).await } + + async fn background_worker(self: Arc) { + let server_worker = self.server.join().fuse(); + futures::pin_mut!(server_worker); + + futures::select! { + _ = server_worker => {}, + _ = self.shutdown.cancelled().fuse() => {}, + } + + self.server.shutdown(); + + if !server_worker.is_terminated() { + match server_worker.await { + Ok(_) => info!("server worker shutdown"), + Err(error) => error!(%error, "server worker error"), + } + } + + info!("server completed shutting down"); + + self.application.join(); + info!("shared application state completed shutting down"); + } + + fn shutdown_background_worker(&self) { + self.server.shutdown(); + self.application.join(); + } +} + +#[cfg(test)] +mod tests { + use crate::{ + commands::run::database::Config, + influxdb_ioxd::{ + grpc_listener, http_listener, serve, + server_type::database::setup::{make_application, make_server}, + }, + structopt_blocks::run_config::RunConfig, + }; + + use super::*; + use ::http::{header::HeaderName, HeaderValue}; + use data_types::{database_rules::DatabaseRules, DatabaseName}; + use futures::pin_mut; + use influxdb_iox_client::{connection::Connection, flight::PerformQuery}; + use server::{connection::ConnectionManagerImpl, rules::ProvidedDatabaseRules}; + use std::{convert::TryInto, net::SocketAddr, num::NonZeroU64}; + use structopt::StructOpt; + use tokio::task::JoinHandle; + use trace::{ + span::{Span, SpanStatus}, + RingBufferTraceCollector, + }; + use trace_exporters::export::{AsyncExporter, TestAsyncExporter}; + + fn test_config(server_id: Option) -> Config { + let mut config = Config::from_iter(&[ + "run", + "--api-bind", + "127.0.0.1:0", + "--grpc-bind", + "127.0.0.1:0", + ]); + config.run_config.server_id_config.server_id = server_id.map(|x| x.try_into().unwrap()); + config + } + + async fn test_serve( + config: RunConfig, + application: Arc, + server: Arc>, + ) { + let grpc_listener = grpc_listener(config.grpc_bind_address.into()) + .await + .unwrap(); + let http_listener = http_listener(config.grpc_bind_address.into()) + .await + .unwrap(); + + let common_state = CommonServerState::from_config(config).unwrap(); + let server_type = Arc::new(DatabaseServerType::new(application, server, &common_state)); + + serve(common_state, grpc_listener, http_listener, server_type) + .await + .unwrap() + } + + #[tokio::test] + async fn test_server_shutdown() { + test_helpers::maybe_start_logging(); + + // Create a server and wait for it to initialize + let config = test_config(Some(23)); + let application = make_application(&config, None).await.unwrap(); + let server = make_server(Arc::clone(&application), &config); + server.wait_for_init().await.unwrap(); + + // Start serving + let serve_fut = test_serve(config.run_config, application, Arc::clone(&server)).fuse(); + pin_mut!(serve_fut); + + // Nothing to trigger termination, so serve future should continue running + futures::select! { + _ = serve_fut => panic!("serve shouldn't finish"), + _ = tokio::time::sleep(tokio::time::Duration::from_millis(10)).fuse() => {} + } + + // Trigger shutdown + server.shutdown(); + + // The serve future should now complete + futures::select! { + _ = serve_fut => {}, + _ = tokio::time::sleep(tokio::time::Duration::from_secs(5)).fuse() => panic!("timeout shouldn't expire") + } + } + + #[tokio::test] + async fn test_server_shutdown_uninit() { + // Create a server but don't set a server id + let config = test_config(None); + let application = make_application(&config, None).await.unwrap(); + let server = make_server(Arc::clone(&application), &config); + + let serve_fut = test_serve(config.run_config, application, Arc::clone(&server)).fuse(); + pin_mut!(serve_fut); + + // Nothing should have triggered shutdown so serve shouldn't finish + futures::select! { + _ = serve_fut => panic!("serve shouldn't finish"), + _ = tokio::time::sleep(tokio::time::Duration::from_millis(10)).fuse() => {} + } + + // We never set the server ID and so the server should not initialize + assert!( + server.wait_for_init().now_or_never().is_none(), + "shouldn't have initialized" + ); + + // But it should still be possible to shut it down + server.shutdown(); + + futures::select! { + _ = serve_fut => {}, + _ = tokio::time::sleep(tokio::time::Duration::from_secs(5)).fuse() => panic!("timeout shouldn't expire") + } + } + + #[tokio::test] + async fn test_server_panic() { + // Create a server and wait for it to initialize + let config = test_config(Some(999999999)); + let application = make_application(&config, None).await.unwrap(); + let server = make_server(Arc::clone(&application), &config); + server.wait_for_init().await.unwrap(); + + let serve_fut = test_serve(config.run_config, application, Arc::clone(&server)).fuse(); + pin_mut!(serve_fut); + + // Nothing should have triggered shutdown so serve shouldn't finish + futures::select! { + _ = serve_fut => panic!("serve shouldn't finish"), + _ = tokio::time::sleep(tokio::time::Duration::from_millis(10)).fuse() => {} + } + + // Trigger a panic in the Server background worker + server::utils::register_panic_key("server background worker: 999999999"); + + // This should trigger shutdown + futures::select! { + _ = serve_fut => {}, + _ = tokio::time::sleep(tokio::time::Duration::from_secs(5)).fuse() => panic!("timeout shouldn't expire") + } + } + + #[tokio::test] + async fn test_database_panic() { + // Create a server and wait for it to initialize + let config = test_config(Some(23)); + let application = make_application(&config, None).await.unwrap(); + let server = make_server(Arc::clone(&application), &config); + server.wait_for_init().await.unwrap(); + + // Create a database that won't panic + let other_db_name = DatabaseName::new("other").unwrap(); + server + .create_database(make_rules(&other_db_name)) + .await + .unwrap(); + + let other_db = server.database(&other_db_name).unwrap(); + + let serve_fut = test_serve( + config.run_config, + Arc::clone(&application), + Arc::clone(&server), + ) + .fuse(); + pin_mut!(serve_fut); + + // Nothing should have triggered shutdown so serve shouldn't finish + futures::select! { + _ = serve_fut => panic!("serve shouldn't finish"), + _ = tokio::time::sleep(tokio::time::Duration::from_millis(10)).fuse() => {} + } + + // Configure a panic in the worker of the database we're about to create + server::utils::register_panic_key("database background worker: panic_test"); + + // Spawn a dummy job that will delay shutdown as it runs to completion + let task = application + .job_registry() + .spawn_dummy_job(vec![1_000_000_000], None); + + // Create database that will panic in its worker loop + server + .create_database(make_rules("panic_test")) + .await + .unwrap(); + + // The serve future shouldn't resolve until the dummy job finishes + futures::select! { + _ = serve_fut => panic!("should wait for jobs to finish"), + _ = tokio::time::sleep(tokio::time::Duration::from_millis(10)).fuse() => {} + } + + assert!(!task.is_complete(), "task should still be running"); + + // But the databases should have been shutdown + assert!( + other_db.join().now_or_never().is_some(), + "database should have been terminated and have finished" + ); + + // Once the dummy job completes - the serve future should resolve + futures::select! { + _ = serve_fut => {}, + _ = tokio::time::sleep(tokio::time::Duration::from_secs(5)).fuse() => panic!("timeout shouldn't expire") + } + + assert_eq!( + task.get_status().result().unwrap(), + tracker::TaskResult::Success + ) + } + + async fn jaeger_client(addr: SocketAddr, trace: &'static str) -> Connection { + influxdb_iox_client::connection::Builder::default() + .header( + HeaderName::from_static("uber-trace-id"), + HeaderValue::from_static(trace), + ) + .build(format!("http://{}", addr)) + .await + .unwrap() + } + + async fn tracing_server( + collector: &Arc, + ) -> ( + SocketAddr, + Arc>, + JoinHandle>, + ) { + let config = test_config(Some(23)); + let application = make_application(&config, Some(Arc::::clone(collector))) + .await + .unwrap(); + let server = make_server(Arc::clone(&application), &config); + server.wait_for_init().await.unwrap(); + + let grpc_listener = grpc_listener(config.run_config.grpc_bind_address.into()) + .await + .unwrap(); + let http_listener = http_listener(config.run_config.grpc_bind_address.into()) + .await + .unwrap(); + + let addr = grpc_listener.local_addr().unwrap(); + + let common_state = CommonServerState::from_config(config.run_config.clone()).unwrap(); + let server_type = Arc::new(DatabaseServerType::new( + application, + Arc::clone(&server), + &common_state, + )); + + let fut = serve(common_state, grpc_listener, http_listener, server_type); + + let join = tokio::spawn(fut); + (addr, server, join) + } + + #[tokio::test] + async fn test_tracing() { + let trace_collector = Arc::new(RingBufferTraceCollector::new(20)); + let (addr, server, join) = tracing_server(&trace_collector).await; + + let client = influxdb_iox_client::connection::Builder::default() + .build(format!("http://{}", addr)) + .await + .unwrap(); + + let mut client = influxdb_iox_client::management::Client::new(client); + + client.list_database_names().await.unwrap(); + + assert_eq!(trace_collector.spans().len(), 0); + + let b3_tracing_client = influxdb_iox_client::connection::Builder::default() + .header( + HeaderName::from_static("x-b3-sampled"), + HeaderValue::from_static("1"), + ) + .header( + HeaderName::from_static("x-b3-traceid"), + HeaderValue::from_static("fea24902"), + ) + .header( + HeaderName::from_static("x-b3-spanid"), + HeaderValue::from_static("ab3409"), + ) + .build(format!("http://{}", addr)) + .await + .unwrap(); + + let mut b3_tracing_client = influxdb_iox_client::management::Client::new(b3_tracing_client); + + b3_tracing_client.list_database_names().await.unwrap(); + b3_tracing_client.get_server_status().await.unwrap(); + + let conn = jaeger_client(addr, "34f9495:30e34:0:1").await; + influxdb_iox_client::management::Client::new(conn) + .list_database_names() + .await + .unwrap(); + + let spans = trace_collector.spans(); + assert_eq!(spans.len(), 3); + + assert_eq!(spans[0].name, "IOx"); + assert_eq!(spans[0].ctx.parent_span_id.unwrap().0.get(), 0xab3409); + assert_eq!(spans[0].ctx.trace_id.0.get(), 0xfea24902); + assert!(spans[0].start.is_some()); + assert!(spans[0].end.is_some()); + assert_eq!(spans[0].status, SpanStatus::Ok); + + assert_eq!(spans[1].name, "IOx"); + assert_eq!(spans[1].ctx.parent_span_id.unwrap().0.get(), 0xab3409); + assert_eq!(spans[1].ctx.trace_id.0.get(), 0xfea24902); + assert!(spans[1].start.is_some()); + assert!(spans[1].end.is_some()); + + assert_eq!(spans[2].name, "IOx"); + assert_eq!(spans[2].ctx.parent_span_id.unwrap().0.get(), 0x30e34); + assert_eq!(spans[2].ctx.trace_id.0.get(), 0x34f9495); + assert!(spans[2].start.is_some()); + assert!(spans[2].end.is_some()); + + assert_ne!(spans[0].ctx.span_id, spans[1].ctx.span_id); + server.shutdown(); + join.await.unwrap().unwrap(); + } + + /// Ensure that query is fully executed. + async fn consume_query(mut query: PerformQuery) { + while query.next().await.unwrap().is_some() {} + } + + #[tokio::test] + async fn test_query_tracing() { + let collector = Arc::new(RingBufferTraceCollector::new(100)); + let (addr, server, join) = tracing_server(&collector).await; + let conn = jaeger_client(addr, "34f8495:35e32:0:1").await; + + let db_info = influxdb_storage_client::OrgAndBucket::new( + NonZeroU64::new(12).unwrap(), + NonZeroU64::new(344).unwrap(), + ); + + // Perform a number of different requests to generate traces + + let mut management = influxdb_iox_client::management::Client::new(conn.clone()); + management + .create_database( + influxdb_iox_client::management::generated_types::DatabaseRules { + name: db_info.db_name().to_string(), + ..Default::default() + }, + ) + .await + .unwrap(); + + let mut write = influxdb_iox_client::write::Client::new(conn.clone()); + write + .write(db_info.db_name(), "cpu,tag0=foo val=1 100\n") + .await + .unwrap(); + + let mut flight = influxdb_iox_client::flight::Client::new(conn.clone()); + consume_query( + flight + .perform_query(db_info.db_name(), "select * from cpu;") + .await + .unwrap(), + ) + .await; + + flight + .perform_query("nonexistent", "select * from cpu;") + .await + .unwrap_err(); + + flight + .perform_query(db_info.db_name(), "select * from nonexistent;") + .await + .unwrap_err(); + + let mut storage = influxdb_storage_client::Client::new(conn); + storage + .tag_values(influxdb_storage_client::generated_types::TagValuesRequest { + tags_source: Some(influxdb_storage_client::Client::read_source(&db_info, 1)), + range: None, + predicate: None, + tag_key: "tag0".into(), + }) + .await + .unwrap(); + + server.shutdown(); + join.await.unwrap().unwrap(); + + // Check generated traces + + let spans = collector.spans(); + + let root_spans: Vec<_> = spans.iter().filter(|span| &span.name == "IOx").collect(); + // Made 6 requests + assert_eq!(root_spans.len(), 6); + + let child = |parent: &Span, name: &'static str| -> Option<&Span> { + spans.iter().find(|span| { + span.ctx.parent_span_id == Some(parent.ctx.span_id) && span.name == name + }) + }; + + // Test SQL + let sql_query_span = root_spans[2]; + assert_eq!(sql_query_span.status, SpanStatus::Ok); + + let ctx_span = child(sql_query_span, "Query Execution").unwrap(); + let planner_span = child(ctx_span, "Planner").unwrap(); + let sql_span = child(planner_span, "sql").unwrap(); + let prepare_sql_span = child(sql_span, "prepare_sql").unwrap(); + child(prepare_sql_span, "prepare_plan").unwrap(); + + let execute_span = child(ctx_span, "execute_stream_partitioned").unwrap(); + let coalesce_span = child(execute_span, "CoalescePartitionsEx").unwrap(); + + // validate spans from DataFusion ExecutionPlan are present + child(coalesce_span, "ProjectionExec: expr").unwrap(); + + let database_not_found = root_spans[3]; + assert_eq!(database_not_found.status, SpanStatus::Err); + assert!(database_not_found + .events + .iter() + .any(|event| event.msg.as_ref() == "not found")); + + let table_not_found = root_spans[4]; + assert_eq!(table_not_found.status, SpanStatus::Err); + assert!(table_not_found + .events + .iter() + .any(|event| event.msg.as_ref() == "invalid argument")); + + // Test tag_values + let storage_span = root_spans[5]; + let ctx_span = child(storage_span, "Query Execution").unwrap(); + child(ctx_span, "Planner").unwrap(); + + let to_string_set = child(ctx_span, "to_string_set").unwrap(); + child(to_string_set, "run_logical_plans").unwrap(); + } + + #[tokio::test] + async fn test_async_exporter() { + let (sender, mut receiver) = tokio::sync::mpsc::channel(20); + let collector = Arc::new(AsyncExporter::new(TestAsyncExporter::new(sender))); + + let (addr, server, join) = tracing_server(&collector).await; + let conn = jaeger_client(addr, "34f8495:30e34:0:1").await; + influxdb_iox_client::management::Client::new(conn) + .list_database_names() + .await + .unwrap(); + + collector.drain().await.unwrap(); + + server.shutdown(); + join.await.unwrap().unwrap(); + + let span = receiver.recv().await.unwrap(); + assert_eq!(span.ctx.trace_id.get(), 0x34f8495); + assert_eq!(span.ctx.parent_span_id.unwrap().get(), 0x30e34); + } + + fn make_rules(db_name: impl Into) -> ProvidedDatabaseRules { + let db_name = DatabaseName::new(db_name.into()).unwrap(); + ProvidedDatabaseRules::new_rules(DatabaseRules::new(db_name).into()) + .expect("Tests should create valid DatabaseRules") + } } diff --git a/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/mod.rs b/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/mod.rs index 7b0be1e482..73ec546bb5 100644 --- a/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/mod.rs +++ b/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/mod.rs @@ -25,22 +25,15 @@ where add_gated_service!( builder, - server_type.serving_readiness, storage::make_server(Arc::clone(&server_type.server),) ); add_gated_service!( builder, - server_type.serving_readiness, flight::make_server(Arc::clone(&server_type.server)) ); + add_gated_service!(builder, write::make_server(Arc::clone(&server_type.server))); add_gated_service!( builder, - server_type.serving_readiness, - write::make_server(Arc::clone(&server_type.server)) - ); - add_gated_service!( - builder, - server_type.serving_readiness, write_pb::make_server(Arc::clone(&server_type.server)) ); // Also important this is not behind a readiness check (as it is diff --git a/influxdb_iox/src/influxdb_ioxd/server_type/database/setup.rs b/influxdb_iox/src/influxdb_ioxd/server_type/database/setup.rs new file mode 100644 index 0000000000..c38ccef35e --- /dev/null +++ b/influxdb_iox/src/influxdb_ioxd/server_type/database/setup.rs @@ -0,0 +1,72 @@ +use std::sync::Arc; + +use object_store::ObjectStore; +use observability_deps::tracing::warn; +use server::{ + connection::ConnectionManagerImpl, ApplicationState, RemoteTemplate, Server, ServerConfig, +}; +use snafu::{ResultExt, Snafu}; +use trace::TraceCollector; + +use crate::{ + commands::run::database::Config, + structopt_blocks::object_store::{check_object_store, warn_about_inmem_store}, +}; + +#[derive(Debug, Snafu)] +pub enum Error { + #[snafu(display("Cannot parse object store config: {}", source))] + ObjectStoreParsing { + source: crate::structopt_blocks::object_store::ParseError, + }, + + #[snafu(display("Cannot check object store config: {}", source))] + ObjectStoreCheck { + source: crate::structopt_blocks::object_store::CheckError, + }, +} + +pub type Result = std::result::Result; + +pub async fn make_application( + config: &Config, + trace_collector: Option>, +) -> Result> { + warn_about_inmem_store(&config.run_config.object_store_config); + let object_store = ObjectStore::try_from(&config.run_config.object_store_config) + .context(ObjectStoreParsing)?; + check_object_store(&object_store) + .await + .context(ObjectStoreCheck)?; + let object_storage = Arc::new(object_store); + + Ok(Arc::new(ApplicationState::new( + object_storage, + config.num_worker_threads, + trace_collector, + ))) +} + +pub fn make_server( + application: Arc, + config: &Config, +) -> Arc> { + let server_config = ServerConfig { + remote_template: config.remote_template.clone().map(RemoteTemplate::new), + wipe_catalog_on_error: config.wipe_catalog_on_error.into(), + skip_replay_and_seek_instead: config.skip_replay_and_seek_instead.into(), + }; + + let connection_manager = ConnectionManagerImpl::new(); + let app_server = Arc::new(Server::new(connection_manager, application, server_config)); + + // if this ID isn't set the server won't be usable until this is set via an API + // call + if let Some(id) = config.run_config.server_id_config.server_id { + app_server.set_id(id).expect("server id already set"); + } else { + warn!("server ID not set. ID must be set via the INFLUXDB_IOX_ID config or API before writing or querying data."); + } + + app_server +} diff --git a/influxdb_iox/src/influxdb_ioxd/server_type/mod.rs b/influxdb_iox/src/influxdb_ioxd/server_type/mod.rs index 3016586cee..4dd3ed3470 100644 --- a/influxdb_iox/src/influxdb_ioxd/server_type/mod.rs +++ b/influxdb_iox/src/influxdb_ioxd/server_type/mod.rs @@ -8,6 +8,7 @@ use trace::TraceCollector; use super::rpc::RpcBuilderInput; +pub mod common_state; pub mod database; /// Constants used in API error codes. @@ -120,4 +121,8 @@ pub trait ServerType: std::fmt::Debug + Send + Sync + 'static { ) -> Result, Self::RouteError>; async fn server_grpc(self: Arc, builder_input: RpcBuilderInput) -> Result<(), RpcError>; + + async fn background_worker(self: Arc); + + fn shutdown_background_worker(&self); } diff --git a/influxdb_iox/src/structopt_blocks/run_config.rs b/influxdb_iox/src/structopt_blocks/run_config.rs index d68ee7177c..2690c28657 100644 --- a/influxdb_iox/src/structopt_blocks/run_config.rs +++ b/influxdb_iox/src/structopt_blocks/run_config.rs @@ -16,7 +16,7 @@ pub const DEFAULT_API_BIND_ADDR: &str = "127.0.0.1:8080"; pub const DEFAULT_GRPC_BIND_ADDR: &str = "127.0.0.1:8082"; /// Common config for all `run` commands. -#[derive(Debug, StructOpt)] +#[derive(Debug, StructOpt, Clone)] pub struct RunConfig { /// logging options #[structopt(flatten)]