feat: report system stats in load generator (#24871)

* feat: report system stats in load generator

Added a mechanism to report system stats during load generation. The
following stats are saved to a CSV file:

- cpu_usage
- disk_written_bytes
- disk_read_bytes
- memory
- virtual_memory

This only works when running the load generator against a local instance
of influxdb3, i.e., one running on the same machine as the load generator.

System stats reporting is enabled by passing the --system-stats flag to
the load generator.
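
For reference, here is a minimal, self-contained sketch of the kind of
per-process sampling the reporter performs, using the sysinfo crate as in
the diff below. The hard-coded "influxdb3" process name mirrors
INFLUXDB3_PROCESS_NAME from the diff; the standalone main function and the
printed format are illustrative only, not part of this change.

use std::time::Duration;
use sysinfo::{Pid, ProcessRefreshKind, System};

fn main() {
    // Take an initial snapshot of all processes and locate the local
    // influxdb3 process (assumes exactly one is running on this machine).
    let mut system = System::new_all();
    let pid: Pid = system
        .processes_by_exact_name("influxdb3")
        .next()
        .expect("no local influxdb3 process found")
        .pid();

    // CPU usage is computed as a delta between two refreshes, so wait at
    // least the minimum update interval before sampling again.
    std::thread::sleep(sysinfo::MINIMUM_CPU_UPDATE_INTERVAL.max(Duration::from_millis(500)));
    system.refresh_pids_specifics(
        &[pid],
        ProcessRefreshKind::new()
            .with_cpu()
            .with_memory()
            .with_disk_usage(),
    );

    // Read the same fields that the load generator writes to its stats CSV.
    let process = system.process(pid).expect("influxdb3 process exited");
    let disk = process.disk_usage();
    println!(
        "cpu_usage={:.1}% written_bytes={} read_bytes={} memory={} virtual_memory={}",
        process.cpu_usage(),
        disk.written_bytes,
        disk.read_bytes,
        process.memory(),
        process.virtual_memory(),
    );
}
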
Trevor Hilton 2024-04-02 17:16:17 -04:00 committed by GitHub
parent 1b3d279d70
commit 2dde602995
7 changed files with 159 additions and 18 deletions

Cargo.lock

@@ -2555,12 +2555,14 @@ dependencies = [
"dotenvy",
"humantime",
"influxdb3_client",
"influxdb3_process",
"observability_deps",
"parking_lot",
"rand",
"secrecy",
"serde",
"serde_json",
"sysinfo",
"thiserror",
"tokio",
"trogging",
@@ -4435,7 +4437,7 @@ checksum = "c55e02e35260070b6f716a2423c2ff1c3bb1642ddca6f99e1f26d06268a0e2d2"
dependencies = [
"bytes",
"heck 0.4.1",
"itertools 0.10.5",
"itertools 0.11.0",
"log",
"multimap",
"once_cell",
@@ -4469,7 +4471,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "efb6c9a1dd1def8e2124d17e83a20af56f1570d6c2d2bd9e266ccb768df3840e"
dependencies = [
"anyhow",
"itertools 0.10.5",
"itertools 0.11.0",
"proc-macro2",
"quote",
"syn 2.0.53",
@@ -5739,9 +5741,9 @@ checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160"
[[package]]
name = "sysinfo"
version = "0.30.7"
version = "0.30.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c385888ef380a852a16209afc8cfad22795dd8873d69c9a14d2e2088f118d18"
checksum = "4b1a378e48fb3ce3a5cf04359c456c9c98ff689bcf1c1bc6e6a31f247686f275"
dependencies = [
"cfg-if",
"core-foundation-sys",


@@ -86,6 +86,7 @@ serde_urlencoded = "0.7.0"
sha2 = "0.10.8"
snap = "1.0.0"
sqlparser = "0.41.0"
sysinfo = "0.30.8"
thiserror = "1.0"
tokio = { version = "1.35", features = ["full"] }
tokio-util = "0.7.9"


@@ -5,8 +5,6 @@ authors.workspace = true
edition.workspace = true
license.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
# Core Crates
observability_deps.workspace = true
@@ -14,6 +12,7 @@ trogging.workspace = true
# Local Deps
influxdb3_client = { path = "../influxdb3_client" }
influxdb3_process = { path = "../influxdb3_process", default-features = false }
# crates.io Dependencies
anyhow.workspace = true
@@ -28,6 +27,7 @@ rand.workspace = true
secrecy.workspace = true
serde.workspace = true
serde_json.workspace = true
sysinfo.workspace = true
tokio.workspace = true
thiserror.workspace = true
url.workspace = true


@@ -10,7 +10,7 @@ use tokio::time::Instant;
use crate::{
commands::common::LoadType,
query_generator::{create_queriers, Format, Querier},
report::QueryReporter,
report::{QueryReporter, SystemStatsReporter},
};
use super::common::InfluxDb3Config;
@@ -62,12 +62,28 @@ pub(crate) async fn command(config: Config) -> Result<(), anyhow::Error> {
// set up a results reporter and spawn a thread to flush results
println!("generating results in: {results_file_path}");
let query_reporter = Arc::new(QueryReporter::new(results_file)?);
let query_reporter = Arc::new(QueryReporter::new(results_file));
let reporter = Arc::clone(&query_reporter);
tokio::task::spawn_blocking(move || {
reporter.flush_reports();
});
// spawn system stats collection
let stats_reporter = if let (Some(stats_file), Some(stats_file_path)) = (
load_config.system_stats_file,
load_config.system_stats_file_path,
) {
println!("generating system stats in: {stats_file_path}");
let stats_reporter = Arc::new(SystemStatsReporter::new(stats_file)?);
let s = Arc::clone(&stats_reporter);
tokio::task::spawn_blocking(move || {
s.report_stats();
});
Some((stats_file_path, stats_reporter))
} else {
None
};
// create a InfluxDB Client and spawn tasks for each querier
let mut tasks = Vec::new();
for querier in queriers {
@@ -92,6 +108,11 @@ pub(crate) async fn command(config: Config) -> Result<(), anyhow::Error> {
query_reporter.shutdown();
println!("results saved in: {results_file_path}");
if let Some((stats_file_path, stats_reporter)) = stats_reporter {
println!("system stats saved in: {stats_file_path}");
stats_reporter.shutdown();
}
Ok(())
}


@@ -1,6 +1,6 @@
use crate::commands::common::LoadType;
use crate::line_protocol_generator::{create_generators, Generator};
use crate::report::WriteReporter;
use crate::report::{SystemStatsReporter, WriteReporter};
use anyhow::Context;
use chrono::{DateTime, Local};
use clap::Parser;
@@ -112,7 +112,21 @@ pub(crate) async fn command(config: Config) -> Result<(), anyhow::Error> {
reporter.flush_reports();
});
// TODO - spawn system stats collection
// spawn system stats collection
let stats_reporter = if let (Some(stats_file), Some(stats_file_path)) = (
load_config.system_stats_file,
load_config.system_stats_file_path,
) {
println!("generating system stats in: {stats_file_path}");
let stats_reporter = Arc::new(SystemStatsReporter::new(stats_file)?);
let s = Arc::clone(&stats_reporter);
tokio::task::spawn_blocking(move || {
s.report_stats();
});
Some((stats_file_path, stats_reporter))
} else {
None
};
// spawn tokio tasks for each writer
let mut tasks = Vec::new();
@@ -140,6 +154,11 @@ pub(crate) async fn command(config: Config) -> Result<(), anyhow::Error> {
write_reporter.shutdown();
println!("results saved in: {results_file_path}");
if let Some((stats_file_path, stats_reporter)) = stats_reporter {
println!("system stats saved in: {stats_file_path}");
stats_reporter.shutdown();
}
Ok(())
}


@@ -2,13 +2,15 @@
use crate::line_protocol_generator::{WriteSummary, WriterId};
use crate::query_generator::QuerierId;
use anyhow::Context;
use anyhow::{bail, Context};
use chrono::{DateTime, Local};
use influxdb3_process::INFLUXDB3_PROCESS_NAME;
use parking_lot::Mutex;
use serde::Serialize;
use std::collections::HashMap;
use std::fs::File;
use std::time::{Duration, Instant};
use sysinfo::{Pid, Process, ProcessRefreshKind, System};
// Logged reports will be flushed to the csv file on this interval
const REPORT_FLUSH_INTERVAL: Duration = Duration::from_millis(100);
@@ -216,13 +218,13 @@ pub struct QueryReporter {
}
impl QueryReporter {
pub fn new(csv_file: File) -> Result<Self, anyhow::Error> {
pub fn new(csv_file: File) -> Self {
let csv_writer = Mutex::new(csv::Writer::from_writer(csv_file));
Ok(Self {
Self {
state: Mutex::new(vec![]),
csv_writer,
shutdown: Mutex::new(false),
})
}
}
pub fn report(
@@ -278,8 +280,8 @@ impl QueryReporter {
}
csv_writer.flush().expect("failed to flush csv reports");
if console_stats.last_console_outptu_time.elapsed() > CONSOLE_REPORT_INTERVAL {
let elapsed_millis = console_stats.last_console_outptu_time.elapsed().as_millis();
if console_stats.last_console_output_time.elapsed() > CONSOLE_REPORT_INTERVAL {
let elapsed_millis = console_stats.last_console_output_time.elapsed().as_millis();
println!(
"success: {:.0}/s, error: {:.0}/s, rows: {:.0}/s",
@@ -315,7 +317,7 @@ struct QueryRecord {
}
struct QueryConsoleStats {
last_console_outptu_time: Instant,
last_console_output_time: Instant,
success: usize,
error: usize,
rows: u64,
@@ -324,10 +326,103 @@ struct QueryConsoleStats {
impl QueryConsoleStats {
fn new() -> Self {
Self {
last_console_outptu_time: Instant::now(),
last_console_output_time: Instant::now(),
success: 0,
error: 0,
rows: 0,
}
}
}
const SYSTEM_STATS_REPORT_INTERVAL: Duration = Duration::from_millis(500);
#[derive(Debug, Copy, Clone, Serialize)]
pub struct SystemStatsRecord {
wall_time: DateTime<Local>,
test_time_ms: u128,
cpu_usage: f32,
written_bytes: u64,
read_bytes: u64,
memory_bytes: u64,
virtual_memory_bytes: u64,
}
#[derive(Debug)]
pub struct SystemStatsReporter {
pid: Pid,
system: Mutex<System>,
csv_writer: Mutex<csv::Writer<File>>,
shutdown: Mutex<bool>,
}
impl SystemStatsReporter {
pub fn new(csv_file: File) -> Result<Self, anyhow::Error> {
let csv_writer = Mutex::new(csv::Writer::from_writer(csv_file));
let mut system = System::new_all();
let mut processes = system
.processes_by_exact_name(INFLUXDB3_PROCESS_NAME)
.collect::<Vec<&Process>>();
if processes.is_empty() {
bail!("there is no '{}' process", INFLUXDB3_PROCESS_NAME);
}
if processes.len() > 1 {
bail!(
"ensure there is only one '{}' process running on your operating system",
INFLUXDB3_PROCESS_NAME
);
}
let pid = processes.pop().unwrap().pid();
// refresh the system stats for the process to initialize the baseline:
system.refresh_pids(&[pid]);
Ok(Self {
pid,
system: Mutex::new(system),
csv_writer,
shutdown: Mutex::new(false),
})
}
pub fn report_stats(&self) {
let start_time = Instant::now();
loop {
let mut system = self.system.lock();
system.refresh_pids_specifics(
&[self.pid],
ProcessRefreshKind::new()
.with_cpu()
.with_memory()
.with_disk_usage(),
);
let process = system
.process(self.pid)
.unwrap_or_else(|| panic!("process with pid: {}", self.pid));
let mut csv_writer = self.csv_writer.lock();
let test_time_ms = Instant::now().duration_since(start_time).as_millis();
csv_writer
.serialize(SystemStatsRecord {
wall_time: Local::now(),
test_time_ms,
cpu_usage: process.cpu_usage(),
written_bytes: process.disk_usage().written_bytes,
read_bytes: process.disk_usage().read_bytes,
memory_bytes: process.memory(),
virtual_memory_bytes: process.virtual_memory(),
})
.expect("failed to write csv record for system stats");
csv_writer.flush().expect("flush system stats csv reports");
if *self.shutdown.lock() {
return;
}
std::thread::sleep(
sysinfo::MINIMUM_CPU_UPDATE_INTERVAL.max(SYSTEM_STATS_REPORT_INTERVAL),
);
}
}
pub fn shutdown(&self) {
*self.shutdown.lock() = true;
}
}


@@ -4,6 +4,9 @@ use iox_time::{SystemProvider, Time, TimeProvider};
use metric::U64Gauge;
use once_cell::sync::Lazy;
/// The process name on the local OS running `influxdb3`
pub const INFLUXDB3_PROCESS_NAME: &str = "influxdb3";
#[cfg(all(not(feature = "heappy"), feature = "jemalloc_replacing_malloc"))]
pub mod jemalloc;