feat: make pprof optional (#2331)
parent
3ee0f9268a
commit
5e1cb244f7
|
@ -214,7 +214,7 @@ jobs:
|
||||||
command: cargo test --workspace --benches --no-run
|
command: cargo test --workspace --benches --no-run
|
||||||
- run:
|
- run:
|
||||||
name: Build with object store + exporter support + HEAP profiling
|
name: Build with object store + exporter support + HEAP profiling
|
||||||
command: cargo build --features="aws,gcp,azure,jaeger,otlp,heappy"
|
command: cargo build --features="aws,gcp,azure,jaeger,otlp,heappy,pprof"
|
||||||
- cache_save
|
- cache_save
|
||||||
|
|
||||||
# Lint protobufs.
|
# Lint protobufs.
|
||||||
|
|
|
@ -103,7 +103,7 @@ itertools = "0.10.1"
|
||||||
parquet = "5.0"
|
parquet = "5.0"
|
||||||
# used by arrow/datafusion anyway
|
# used by arrow/datafusion anyway
|
||||||
prettytable-rs = "0.8"
|
prettytable-rs = "0.8"
|
||||||
pprof = { version = "^0.5", default-features = false, features = ["flamegraph", "protobuf"] }
|
pprof = { version = "^0.5", default-features = false, features = ["flamegraph", "protobuf"], optional = true}
|
||||||
prost = "0.8"
|
prost = "0.8"
|
||||||
# Forked to upgrade hyper and tokio
|
# Forked to upgrade hyper and tokio
|
||||||
routerify = { git = "https://github.com/influxdata/routerify", rev = "274e250" }
|
routerify = { git = "https://github.com/influxdata/routerify", rev = "274e250" }
|
||||||
|
@ -153,4 +153,5 @@ aws = ["object_store/aws"] # Optional AWS / S3 object store support
|
||||||
jaeger = ["trogging/jaeger"] # Enable optional jaeger tracing support
|
jaeger = ["trogging/jaeger"] # Enable optional jaeger tracing support
|
||||||
otlp = ["trogging/otlp"] # Enable optional open telemetry collector
|
otlp = ["trogging/otlp"] # Enable optional open telemetry collector
|
||||||
# heappy is also an optional feature; Not on by default as it
|
# heappy is also an optional feature; Not on by default as it
|
||||||
# runtime overhead on all allocations (calls to malloc)
|
# runtime overhead on all allocations (calls to malloc)
|
||||||
|
# pprof is also an optional feature for pprof support
|
|
@ -13,7 +13,7 @@ RUN \
|
||||||
--mount=type=cache,id=influxdb_iox_git,sharing=locked,target=/usr/local/cargo/git \
|
--mount=type=cache,id=influxdb_iox_git,sharing=locked,target=/usr/local/cargo/git \
|
||||||
--mount=type=cache,id=influxdb_iox_target,sharing=locked,target=/influxdb_iox/target \
|
--mount=type=cache,id=influxdb_iox_target,sharing=locked,target=/influxdb_iox/target \
|
||||||
du -cshx /usr/local/cargo/registry /usr/local/cargo/git /influxdb_iox/target && \
|
du -cshx /usr/local/cargo/registry /usr/local/cargo/git /influxdb_iox/target && \
|
||||||
cargo build --target-dir /influxdb_iox/target --release --features azure,gcp,aws,jaeger,otlp && \
|
cargo build --target-dir /influxdb_iox/target --release --features azure,gcp,aws,jaeger,otlp,pprof && \
|
||||||
cp /influxdb_iox/target/release/influxdb_iox /root/influxdb_iox && \
|
cp /influxdb_iox/target/release/influxdb_iox /root/influxdb_iox && \
|
||||||
du -cshx /usr/local/cargo/registry /usr/local/cargo/git /influxdb_iox/target
|
du -cshx /usr/local/cargo/registry /usr/local/cargo/git /influxdb_iox/target
|
||||||
|
|
||||||
|
|
|
@ -13,6 +13,9 @@
|
||||||
#[cfg(feature = "heappy")]
|
#[cfg(feature = "heappy")]
|
||||||
mod heappy;
|
mod heappy;
|
||||||
|
|
||||||
|
#[cfg(feature = "pprof")]
|
||||||
|
mod pprof;
|
||||||
|
|
||||||
// Influx crates
|
// Influx crates
|
||||||
use super::planner::Planner;
|
use super::planner::Planner;
|
||||||
use data_types::{
|
use data_types::{
|
||||||
|
@ -32,21 +35,19 @@ use http::header::{CONTENT_ENCODING, CONTENT_TYPE};
|
||||||
use hyper::{http::HeaderValue, Body, Method, Request, Response, StatusCode};
|
use hyper::{http::HeaderValue, Body, Method, Request, Response, StatusCode};
|
||||||
use observability_deps::{
|
use observability_deps::{
|
||||||
opentelemetry::KeyValue,
|
opentelemetry::KeyValue,
|
||||||
tracing::{self, debug, error, info},
|
tracing::{self, debug, error},
|
||||||
};
|
};
|
||||||
use routerify::{prelude::*, Middleware, RequestInfo, Router, RouterError, RouterService};
|
use routerify::{prelude::*, Middleware, RequestInfo, Router, RouterError, RouterService};
|
||||||
use serde::Deserialize;
|
use serde::Deserialize;
|
||||||
use snafu::{OptionExt, ResultExt, Snafu};
|
use snafu::{OptionExt, ResultExt, Snafu};
|
||||||
|
|
||||||
use hyper::server::conn::AddrIncoming;
|
use hyper::server::conn::AddrIncoming;
|
||||||
use pprof::protos::Message;
|
|
||||||
use std::num::NonZeroI32;
|
use std::num::NonZeroI32;
|
||||||
use std::{
|
use std::{
|
||||||
fmt::Debug,
|
fmt::Debug,
|
||||||
str::{self, FromStr},
|
str::{self, FromStr},
|
||||||
sync::Arc,
|
sync::Arc,
|
||||||
};
|
};
|
||||||
use tokio::time::Duration;
|
|
||||||
use tokio_util::sync::CancellationToken;
|
use tokio_util::sync::CancellationToken;
|
||||||
|
|
||||||
/// Constants used in API error codes.
|
/// Constants used in API error codes.
|
||||||
|
@ -200,7 +201,9 @@ pub enum ApplicationError {
|
||||||
Planning { source: super::planner::Error },
|
Planning { source: super::planner::Error },
|
||||||
|
|
||||||
#[snafu(display("PProf error: {}", source))]
|
#[snafu(display("PProf error: {}", source))]
|
||||||
PProf { source: pprof::Error },
|
PProf {
|
||||||
|
source: Box<dyn std::error::Error + Send + Sync>,
|
||||||
|
},
|
||||||
|
|
||||||
#[snafu(display("Protobuf error: {}", source))]
|
#[snafu(display("Protobuf error: {}", source))]
|
||||||
Prost { source: prost::EncodeError },
|
Prost { source: prost::EncodeError },
|
||||||
|
@ -223,8 +226,11 @@ pub enum ApplicationError {
|
||||||
#[snafu(display("Internal server error"))]
|
#[snafu(display("Internal server error"))]
|
||||||
InternalServerError,
|
InternalServerError,
|
||||||
|
|
||||||
#[snafu(display("Heappy is not compiled"))]
|
#[snafu(display("heappy support is not compiled"))]
|
||||||
HeappyIsNotCompiled,
|
HeappyIsNotCompiled,
|
||||||
|
|
||||||
|
#[snafu(display("pprof support is not compiled"))]
|
||||||
|
PProfIsNotCompiled,
|
||||||
}
|
}
|
||||||
|
|
||||||
type Result<T, E = ApplicationError> = std::result::Result<T, E>;
|
type Result<T, E = ApplicationError> = std::result::Result<T, E>;
|
||||||
|
@ -266,6 +272,7 @@ impl ApplicationError {
|
||||||
Self::DatabaseNotInitialized { .. } => self.bad_request(),
|
Self::DatabaseNotInitialized { .. } => self.bad_request(),
|
||||||
Self::InternalServerError => self.internal_error(),
|
Self::InternalServerError => self.internal_error(),
|
||||||
Self::HeappyIsNotCompiled => self.internal_error(),
|
Self::HeappyIsNotCompiled => self.internal_error(),
|
||||||
|
Self::PProfIsNotCompiled => self.internal_error(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -786,22 +793,6 @@ async fn pprof_home<M: ConnectionManager + Send + Sync + Debug + 'static>(
|
||||||
))))
|
))))
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn dump_rsprof(seconds: u64, frequency: i32) -> pprof::Result<pprof::Report> {
|
|
||||||
let guard = pprof::ProfilerGuard::new(frequency)?;
|
|
||||||
info!(
|
|
||||||
"start profiling {} seconds with frequency {} /s",
|
|
||||||
seconds, frequency
|
|
||||||
);
|
|
||||||
|
|
||||||
tokio::time::sleep(Duration::from_secs(seconds)).await;
|
|
||||||
|
|
||||||
info!(
|
|
||||||
"done profiling {} seconds with frequency {} /s",
|
|
||||||
seconds, frequency
|
|
||||||
);
|
|
||||||
guard.report().build()
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Deserialize)]
|
#[derive(Debug, Deserialize)]
|
||||||
struct PProfArgs {
|
struct PProfArgs {
|
||||||
#[serde(default = "PProfArgs::default_seconds")]
|
#[serde(default = "PProfArgs::default_seconds")]
|
||||||
|
@ -845,16 +836,19 @@ impl PProfAllocsArgs {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "pprof")]
|
||||||
#[tracing::instrument(level = "debug")]
|
#[tracing::instrument(level = "debug")]
|
||||||
async fn pprof_profile<M: ConnectionManager + Send + Sync + Debug + 'static>(
|
async fn pprof_profile<M: ConnectionManager + Send + Sync + Debug + 'static>(
|
||||||
req: Request<Body>,
|
req: Request<Body>,
|
||||||
) -> Result<Response<Body>, ApplicationError> {
|
) -> Result<Response<Body>, ApplicationError> {
|
||||||
|
use ::pprof::protos::Message;
|
||||||
let query_string = req.uri().query().unwrap_or_default();
|
let query_string = req.uri().query().unwrap_or_default();
|
||||||
let query: PProfArgs =
|
let query: PProfArgs =
|
||||||
serde_urlencoded::from_str(query_string).context(InvalidQueryString { query_string })?;
|
serde_urlencoded::from_str(query_string).context(InvalidQueryString { query_string })?;
|
||||||
|
|
||||||
let report = dump_rsprof(query.seconds, query.frequency.get())
|
let report = self::pprof::dump_rsprof(query.seconds, query.frequency.get())
|
||||||
.await
|
.await
|
||||||
|
.map_err(|e| Box::new(e) as _)
|
||||||
.context(PProf)?;
|
.context(PProf)?;
|
||||||
|
|
||||||
let mut body: Vec<u8> = Vec::new();
|
let mut body: Vec<u8> = Vec::new();
|
||||||
|
@ -868,18 +862,32 @@ async fn pprof_profile<M: ConnectionManager + Send + Sync + Debug + 'static>(
|
||||||
.flat_map(|i| i.to_str().unwrap_or_default().split(','))
|
.flat_map(|i| i.to_str().unwrap_or_default().split(','))
|
||||||
.any(|i| i == "text/html" || i == "image/svg+xml")
|
.any(|i| i == "text/html" || i == "image/svg+xml")
|
||||||
{
|
{
|
||||||
report.flamegraph(&mut body).context(PProf)?;
|
report
|
||||||
|
.flamegraph(&mut body)
|
||||||
|
.map_err(|e| Box::new(e) as _)
|
||||||
|
.context(PProf)?;
|
||||||
if body.is_empty() {
|
if body.is_empty() {
|
||||||
return EmptyFlamegraph.fail();
|
return EmptyFlamegraph.fail();
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
let profile = report.pprof().context(PProf)?;
|
let profile = report
|
||||||
|
.pprof()
|
||||||
|
.map_err(|e| Box::new(e) as _)
|
||||||
|
.context(PProf)?;
|
||||||
profile.encode(&mut body).context(Prost)?;
|
profile.encode(&mut body).context(Prost)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(Response::new(Body::from(body)))
|
Ok(Response::new(Body::from(body)))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(not(feature = "pprof"))]
|
||||||
|
#[tracing::instrument(level = "debug")]
|
||||||
|
async fn pprof_profile<M: ConnectionManager + Send + Sync + Debug + 'static>(
|
||||||
|
req: Request<Body>,
|
||||||
|
) -> Result<Response<Body>, ApplicationError> {
|
||||||
|
PProfIsNotCompiled {}.fail()
|
||||||
|
}
|
||||||
|
|
||||||
// If heappy support is enabled, call it
|
// If heappy support is enabled, call it
|
||||||
#[cfg(feature = "heappy")]
|
#[cfg(feature = "heappy")]
|
||||||
#[tracing::instrument(level = "debug")]
|
#[tracing::instrument(level = "debug")]
|
||||||
|
|
|
@ -0,0 +1,18 @@
|
||||||
|
use observability_deps::tracing::info;
|
||||||
|
use tokio::time::Duration;
|
||||||
|
|
||||||
|
pub async fn dump_rsprof(seconds: u64, frequency: i32) -> pprof::Result<pprof::Report> {
|
||||||
|
let guard = pprof::ProfilerGuard::new(frequency)?;
|
||||||
|
info!(
|
||||||
|
"start profiling {} seconds with frequency {} /s",
|
||||||
|
seconds, frequency
|
||||||
|
);
|
||||||
|
|
||||||
|
tokio::time::sleep(Duration::from_secs(seconds)).await;
|
||||||
|
|
||||||
|
info!(
|
||||||
|
"done profiling {} seconds with frequency {} /s",
|
||||||
|
seconds, frequency
|
||||||
|
);
|
||||||
|
guard.report().build()
|
||||||
|
}
|
Loading…
Reference in New Issue