Merge pull request #2412 from influxdata/pd/duration-sampling-interval
feat: Support sampling interval strings in data generatorpull/24376/head
commit
ee0e8be70c
|
@ -141,7 +141,7 @@ dependencies = [
|
||||||
"lazy_static",
|
"lazy_static",
|
||||||
"lexical-core",
|
"lexical-core",
|
||||||
"multiversion",
|
"multiversion",
|
||||||
"num",
|
"num 0.4.0",
|
||||||
"prettytable-rs",
|
"prettytable-rs",
|
||||||
"rand 0.8.4",
|
"rand 0.8.4",
|
||||||
"regex",
|
"regex",
|
||||||
|
@ -1903,6 +1903,7 @@ dependencies = [
|
||||||
"influxdb_iox_client",
|
"influxdb_iox_client",
|
||||||
"itertools 0.9.0",
|
"itertools 0.9.0",
|
||||||
"packers",
|
"packers",
|
||||||
|
"parse_duration",
|
||||||
"rand 0.8.4",
|
"rand 0.8.4",
|
||||||
"rand_core 0.6.3",
|
"rand_core 0.6.3",
|
||||||
"rand_seeder",
|
"rand_seeder",
|
||||||
|
@ -2370,6 +2371,20 @@ dependencies = [
|
||||||
"winapi",
|
"winapi",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "num"
|
||||||
|
version = "0.2.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "b8536030f9fea7127f841b45bb6243b27255787fb4eb83958aa1ef9d2fdc0c36"
|
||||||
|
dependencies = [
|
||||||
|
"num-bigint 0.2.6",
|
||||||
|
"num-complex 0.2.4",
|
||||||
|
"num-integer",
|
||||||
|
"num-iter",
|
||||||
|
"num-rational 0.2.4",
|
||||||
|
"num-traits",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "num"
|
name = "num"
|
||||||
version = "0.4.0"
|
version = "0.4.0"
|
||||||
|
@ -2377,10 +2392,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "43db66d1170d347f9a065114077f7dccb00c1b9478c89384490a3425279a4606"
|
checksum = "43db66d1170d347f9a065114077f7dccb00c1b9478c89384490a3425279a4606"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"num-bigint 0.4.0",
|
"num-bigint 0.4.0",
|
||||||
"num-complex",
|
"num-complex 0.4.0",
|
||||||
"num-integer",
|
"num-integer",
|
||||||
"num-iter",
|
"num-iter",
|
||||||
"num-rational",
|
"num-rational 0.4.0",
|
||||||
"num-traits",
|
"num-traits",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
@ -2406,6 +2421,16 @@ dependencies = [
|
||||||
"num-traits",
|
"num-traits",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "num-complex"
|
||||||
|
version = "0.2.4"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "b6b19411a9719e753aff12e5187b74d60d3dc449ec3f4dc21e3989c3f554bc95"
|
||||||
|
dependencies = [
|
||||||
|
"autocfg",
|
||||||
|
"num-traits",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "num-complex"
|
name = "num-complex"
|
||||||
version = "0.4.0"
|
version = "0.4.0"
|
||||||
|
@ -2446,6 +2471,18 @@ dependencies = [
|
||||||
"num-traits",
|
"num-traits",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "num-rational"
|
||||||
|
version = "0.2.4"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "5c000134b5dbf44adc5cb772486d335293351644b801551abe8f75c84cfa4aef"
|
||||||
|
dependencies = [
|
||||||
|
"autocfg",
|
||||||
|
"num-bigint 0.2.6",
|
||||||
|
"num-integer",
|
||||||
|
"num-traits",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "num-rational"
|
name = "num-rational"
|
||||||
version = "0.4.0"
|
version = "0.4.0"
|
||||||
|
@ -2818,6 +2855,17 @@ dependencies = [
|
||||||
"uuid",
|
"uuid",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "parse_duration"
|
||||||
|
version = "2.1.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "7037e5e93e0172a5a96874380bf73bc6ecef022e26fa25f2be26864d6b3ba95d"
|
||||||
|
dependencies = [
|
||||||
|
"lazy_static",
|
||||||
|
"num 0.2.1",
|
||||||
|
"regex",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "paste"
|
name = "paste"
|
||||||
version = "1.0.5"
|
version = "1.0.5"
|
||||||
|
|
|
@ -16,6 +16,7 @@ generated_types = { path = "../generated_types" }
|
||||||
influxdb2_client = { path = "../influxdb2_client" }
|
influxdb2_client = { path = "../influxdb2_client" }
|
||||||
influxdb_iox_client = { path = "../influxdb_iox_client" }
|
influxdb_iox_client = { path = "../influxdb_iox_client" }
|
||||||
packers = { path = "../packers" }
|
packers = { path = "../packers" }
|
||||||
|
parse_duration = "2.1.1"
|
||||||
itertools = "0.9.0"
|
itertools = "0.9.0"
|
||||||
rand = { version = "0.8.3", features = ["small_rng"] }
|
rand = { version = "0.8.3", features = ["small_rng"] }
|
||||||
rand_core = "0.6.2"
|
rand_core = "0.6.2"
|
||||||
|
|
|
@ -11,7 +11,7 @@ pub fn single_agent(c: &mut Criterion) {
|
||||||
agents: vec![AgentSpec {
|
agents: vec![AgentSpec {
|
||||||
name: "agent-1".into(),
|
name: "agent-1".into(),
|
||||||
count: None,
|
count: None,
|
||||||
sampling_interval: Some(1),
|
sampling_interval: Some("1s".to_string()),
|
||||||
name_tag_key: None,
|
name_tag_key: None,
|
||||||
tags: vec![],
|
tags: vec![],
|
||||||
measurements: vec![MeasurementSpec {
|
measurements: vec![MeasurementSpec {
|
||||||
|
|
|
@ -6,7 +6,7 @@ base_seed = "correct horse battery staple"
|
||||||
[[agents]]
|
[[agents]]
|
||||||
name = "cap_write_{{agent_id}}"
|
name = "cap_write_{{agent_id}}"
|
||||||
count = 3
|
count = 3
|
||||||
sampling_interval = 10
|
sampling_interval = "10s"
|
||||||
|
|
||||||
[[agents.measurements]]
|
[[agents.measurements]]
|
||||||
name = "system"
|
name = "system"
|
||||||
|
|
|
@ -11,7 +11,7 @@ base_seed = "this is a demo"
|
||||||
|
|
||||||
[[agents]]
|
[[agents]]
|
||||||
name = "basic"
|
name = "basic"
|
||||||
sampling_interval = 10 # in seconds. TODO: parse nice durations like "12m" and "30s"
|
sampling_interval = "10s"
|
||||||
|
|
||||||
[[agents.measurements]]
|
[[agents.measurements]]
|
||||||
name = "cpu"
|
name = "cpu"
|
||||||
|
|
|
@ -4,7 +4,7 @@ base_seed = "correct horse battery staple"
|
||||||
# the most basic spec with no auto generating of agents, measurements, tags or fields
|
# the most basic spec with no auto generating of agents, measurements, tags or fields
|
||||||
[[agents]]
|
[[agents]]
|
||||||
name = "demo"
|
name = "demo"
|
||||||
sampling_interval = 10
|
sampling_interval = "10s"
|
||||||
|
|
||||||
[[agents.measurements]]
|
[[agents.measurements]]
|
||||||
name = "some_measurement"
|
name = "some_measurement"
|
||||||
|
@ -65,7 +65,7 @@ sampling_interval = 10
|
||||||
[[agents]]
|
[[agents]]
|
||||||
name = "some-server-{{agent_id}}"
|
name = "some-server-{{agent_id}}"
|
||||||
count = 10
|
count = 10
|
||||||
sampling_interval = 22
|
sampling_interval = "22s"
|
||||||
|
|
||||||
# Optional: every measurement (row) this agent produces will include a tag with the agent_id filled
|
# Optional: every measurement (row) this agent produces will include a tag with the agent_id filled
|
||||||
# in:
|
# in:
|
||||||
|
|
|
@ -3,7 +3,7 @@ base_seed = "this is a demo"
|
||||||
|
|
||||||
[[agents]]
|
[[agents]]
|
||||||
name = "trace-sender"
|
name = "trace-sender"
|
||||||
sampling_interval = 10 # in seconds. TODO: parse nice durations like "12m" and "30s"
|
sampling_interval = "10s"
|
||||||
|
|
||||||
[[agents.measurements]]
|
[[agents.measurements]]
|
||||||
name = "traces"
|
name = "traces"
|
||||||
|
|
|
@ -6,6 +6,7 @@ use crate::{
|
||||||
};
|
};
|
||||||
|
|
||||||
use influxdb2_client::models::DataPoint;
|
use influxdb2_client::models::DataPoint;
|
||||||
|
use parse_duration;
|
||||||
use snafu::{ResultExt, Snafu};
|
use snafu::{ResultExt, Snafu};
|
||||||
use std::{fmt, time::Duration};
|
use std::{fmt, time::Duration};
|
||||||
use tracing::{debug, info};
|
use tracing::{debug, info};
|
||||||
|
@ -36,6 +37,13 @@ pub enum Error {
|
||||||
/// Underlying `write` module error that caused this problem
|
/// Underlying `write` module error that caused this problem
|
||||||
source: crate::write::Error,
|
source: crate::write::Error,
|
||||||
},
|
},
|
||||||
|
|
||||||
|
/// Error that may happen if the provided sampling interval string can't be parsed into a duration
|
||||||
|
#[snafu(display("Sampling interval must be valid string: {}", source))]
|
||||||
|
InvalidSamplingInterval {
|
||||||
|
/// Underlying `parse` error
|
||||||
|
source: parse_duration::parse::Error,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Each `AgentSpec` informs the instantiation of an `Agent`, which coordinates
|
/// Each `AgentSpec` informs the instantiation of an `Agent`, which coordinates
|
||||||
|
@ -48,7 +56,7 @@ pub struct Agent<T: DataGenRng> {
|
||||||
rng: RandomNumberGenerator<T>,
|
rng: RandomNumberGenerator<T>,
|
||||||
agent_tags: Vec<Tag>,
|
agent_tags: Vec<Tag>,
|
||||||
measurement_generator_sets: Vec<MeasurementGeneratorSet<T>>,
|
measurement_generator_sets: Vec<MeasurementGeneratorSet<T>>,
|
||||||
sampling_interval: Option<i64>,
|
sampling_interval: Option<Duration>,
|
||||||
/// nanoseconds since the epoch, used as the timestamp for the next
|
/// nanoseconds since the epoch, used as the timestamp for the next
|
||||||
/// generated point
|
/// generated point
|
||||||
current_datetime: i64,
|
current_datetime: i64,
|
||||||
|
@ -105,9 +113,10 @@ impl<T: DataGenRng> Agent<T> {
|
||||||
let end_datetime = end_datetime.unwrap_or_else(now_ns);
|
let end_datetime = end_datetime.unwrap_or_else(now_ns);
|
||||||
|
|
||||||
// Convert to nanoseconds
|
// Convert to nanoseconds
|
||||||
let sampling_interval = agent_spec
|
let sampling_interval = match &agent_spec.sampling_interval {
|
||||||
.sampling_interval
|
None => None,
|
||||||
.map(|s| s as i64 * 1_000_000_000);
|
Some(s) => Some(parse_duration::parse(s).context(InvalidSamplingInterval)?),
|
||||||
|
};
|
||||||
|
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
agent_id,
|
agent_id,
|
||||||
|
@ -169,12 +178,12 @@ impl<T: DataGenRng> Agent<T> {
|
||||||
if let Some(i) = &mut self.interval {
|
if let Some(i) = &mut self.interval {
|
||||||
i.tick().await;
|
i.tick().await;
|
||||||
self.current_datetime = now_ns();
|
self.current_datetime = now_ns();
|
||||||
} else if let Some(ns) = self.sampling_interval {
|
} else if let Some(sampling_interval) = self.sampling_interval {
|
||||||
self.current_datetime += ns;
|
self.current_datetime += sampling_interval.as_nanos() as i64;
|
||||||
|
|
||||||
if self.current_datetime > self.end_datetime {
|
if self.current_datetime > self.end_datetime {
|
||||||
if self.continue_on {
|
if self.continue_on {
|
||||||
let mut i = tokio::time::interval(Duration::from_nanos(ns as u64));
|
let mut i = tokio::time::interval(sampling_interval);
|
||||||
i.tick().await; // first tick completes immediately
|
i.tick().await; // first tick completes immediately
|
||||||
self.current_datetime = now_ns();
|
self.current_datetime = now_ns();
|
||||||
self.interval = Some(i);
|
self.interval = Some(i);
|
||||||
|
@ -214,7 +223,7 @@ mod test {
|
||||||
/// testing, keeping everything else constant across different
|
/// testing, keeping everything else constant across different
|
||||||
/// tests.
|
/// tests.
|
||||||
fn test_instance(
|
fn test_instance(
|
||||||
sampling_interval: Option<i64>,
|
sampling_interval: Option<Duration>,
|
||||||
continue_on: bool,
|
continue_on: bool,
|
||||||
current_datetime: i64,
|
current_datetime: i64,
|
||||||
end_datetime: i64,
|
end_datetime: i64,
|
||||||
|
@ -380,7 +389,7 @@ mod test {
|
||||||
// The tests take about 5 ms to run on my computer, so set the sampling interval
|
// The tests take about 5 ms to run on my computer, so set the sampling interval
|
||||||
// to 10 ms to be able to test that the delay is happening when
|
// to 10 ms to be able to test that the delay is happening when
|
||||||
// `continue` is true without making the tests too artificially slow.
|
// `continue` is true without making the tests too artificially slow.
|
||||||
const TEST_SAMPLING_INTERVAL: i64 = 10_000_000;
|
const TEST_SAMPLING_INTERVAL: Duration = Duration::from_millis(10);
|
||||||
|
|
||||||
#[rustfmt::skip]
|
#[rustfmt::skip]
|
||||||
// # Summary: Not continuing
|
// # Summary: Not continuing
|
||||||
|
@ -401,7 +410,7 @@ mod test {
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn current_time_less_than_end_time() -> Result<()> {
|
async fn current_time_less_than_end_time() -> Result<()> {
|
||||||
let current = 0;
|
let current = 0;
|
||||||
let end = TEST_SAMPLING_INTERVAL;
|
let end = TEST_SAMPLING_INTERVAL.as_nanos() as i64;
|
||||||
|
|
||||||
let mut agent =
|
let mut agent =
|
||||||
Agent::<ZeroRng>::test_instance(Some(TEST_SAMPLING_INTERVAL), false, current, end);
|
Agent::<ZeroRng>::test_instance(Some(TEST_SAMPLING_INTERVAL), false, current, end);
|
||||||
|
@ -420,7 +429,7 @@ mod test {
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn current_time_equal_end_time() -> Result<()> {
|
async fn current_time_equal_end_time() -> Result<()> {
|
||||||
let current = TEST_SAMPLING_INTERVAL;
|
let current = TEST_SAMPLING_INTERVAL.as_nanos() as i64;
|
||||||
let end = current;
|
let end = current;
|
||||||
|
|
||||||
let mut agent =
|
let mut agent =
|
||||||
|
@ -437,8 +446,8 @@ mod test {
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn current_time_greater_than_end_time() -> Result<()> {
|
async fn current_time_greater_than_end_time() -> Result<()> {
|
||||||
let current = 2 * TEST_SAMPLING_INTERVAL;
|
let current = 2 * TEST_SAMPLING_INTERVAL.as_nanos() as i64;
|
||||||
let end = TEST_SAMPLING_INTERVAL;
|
let end = TEST_SAMPLING_INTERVAL.as_nanos() as i64;
|
||||||
|
|
||||||
let mut agent =
|
let mut agent =
|
||||||
Agent::<ZeroRng>::test_instance(Some(TEST_SAMPLING_INTERVAL), false, current, end);
|
Agent::<ZeroRng>::test_instance(Some(TEST_SAMPLING_INTERVAL), false, current, end);
|
||||||
|
@ -473,7 +482,7 @@ mod test {
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn current_time_less_than_end_time() -> Result<()> {
|
async fn current_time_less_than_end_time() -> Result<()> {
|
||||||
let end = now_ns();
|
let end = now_ns();
|
||||||
let current = end - TEST_SAMPLING_INTERVAL;
|
let current = end - TEST_SAMPLING_INTERVAL.as_nanos() as i64;
|
||||||
|
|
||||||
let mut agent =
|
let mut agent =
|
||||||
Agent::<ZeroRng>::test_instance(Some(TEST_SAMPLING_INTERVAL), true, current, end);
|
Agent::<ZeroRng>::test_instance(Some(TEST_SAMPLING_INTERVAL), true, current, end);
|
||||||
|
@ -530,7 +539,7 @@ mod test {
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn current_time_greater_than_end_time() -> Result<()> {
|
async fn current_time_greater_than_end_time() -> Result<()> {
|
||||||
let end = now_ns();
|
let end = now_ns();
|
||||||
let current = end + TEST_SAMPLING_INTERVAL;
|
let current = end + TEST_SAMPLING_INTERVAL.as_nanos() as i64;
|
||||||
|
|
||||||
let mut agent =
|
let mut agent =
|
||||||
Agent::<ZeroRng>::test_instance(Some(TEST_SAMPLING_INTERVAL), true, current, end);
|
Agent::<ZeroRng>::test_instance(Some(TEST_SAMPLING_INTERVAL), true, current, end);
|
||||||
|
|
|
@ -289,7 +289,7 @@ name = "demo_schema"
|
||||||
|
|
||||||
[[agents]]
|
[[agents]]
|
||||||
name = "basic"
|
name = "basic"
|
||||||
sampling_interval = 10 # seconds
|
sampling_interval = "10s" # seconds
|
||||||
|
|
||||||
[[agents.measurements]]
|
[[agents.measurements]]
|
||||||
name = "cpu"
|
name = "cpu"
|
||||||
|
|
|
@ -89,9 +89,9 @@ pub struct AgentSpec {
|
||||||
/// Specifies the number of agents that should be created with this spec.
|
/// Specifies the number of agents that should be created with this spec.
|
||||||
/// Default value is 1.
|
/// Default value is 1.
|
||||||
pub count: Option<usize>,
|
pub count: Option<usize>,
|
||||||
/// How often this agent should generate samples, in number of seconds. If
|
/// How often this agent should generate samples, in a duration string. If
|
||||||
/// not specified, this agent will only generate one sample.
|
/// not specified, this agent will only generate one sample.
|
||||||
pub sampling_interval: Option<usize>,
|
pub sampling_interval: Option<String>,
|
||||||
/// If specified, every measurement generated by this agent will include a
|
/// If specified, every measurement generated by this agent will include a
|
||||||
/// tag with this `String` as its key, and with the `AgentSpec`'s `name`
|
/// tag with this `String` as its key, and with the `AgentSpec`'s `name`
|
||||||
/// as the value (with any substitutions in the `name` performed)
|
/// as the value (with any substitutions in the `name` performed)
|
||||||
|
|
|
@ -368,7 +368,7 @@ base_seed = "this is a demo"
|
||||||
|
|
||||||
[[agents]]
|
[[agents]]
|
||||||
name = "basic"
|
name = "basic"
|
||||||
sampling_interval = 1
|
sampling_interval = "1s" # seconds
|
||||||
|
|
||||||
[[agents.measurements]]
|
[[agents.measurements]]
|
||||||
name = "cpu"
|
name = "cpu"
|
||||||
|
|
Loading…
Reference in New Issue