feat: Refactor Data Generator

This is a huge commit that refactors the data generator. It removes many of the previous features that didn't quite make sense. The goal of this refactor was to make the data generator capable of representing complex tagsets that have values dependent on each other. It also significantly optimizes things to use far less memory and generate data much faster. Follow on work will update the generation of line protocol to support spaces in tags and their keys, double quotes in strings, and add more examples and documentation.
pull/24376/head
Paul Dix 2021-10-26 15:34:35 -04:00
parent 348b91edc4
commit 7044b89453
20 changed files with 1385 additions and 3009 deletions

32
Cargo.lock generated
View File

@ -333,9 +333,9 @@ dependencies = [
[[package]]
name = "bitflags"
version = "1.2.1"
version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693"
checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]]
name = "bitvec"
@ -1774,8 +1774,6 @@ dependencies = [
"influxdb_iox_client",
"itertools",
"rand",
"rand_core",
"rand_seeder",
"serde",
"serde_json",
"snafu",
@ -2318,22 +2316,21 @@ dependencies = [
[[package]]
name = "nix"
version = "0.20.2"
version = "0.20.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f5e06129fb611568ef4e868c14b326274959aa70ff7776e9d55323531c374945"
checksum = "fa9b4819da1bc61c0ea48b63b7bc8604064dd43013e7cc325df098d49cd7c18a"
dependencies = [
"bitflags",
"cc",
"cfg-if",
"libc",
"memoffset",
]
[[package]]
name = "nix"
version = "0.22.2"
version = "0.22.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d3bb9a13fa32bc5aeb64150cd3f32d6cf4c748f8f8a417cce5d2eb976a8370ba"
checksum = "cf1e25ee6b412c2a1e3fcb6a4499a5c1bfe7f43e014bdce9a6b6666e5aa2d187"
dependencies = [
"bitflags",
"cc",
@ -3044,7 +3041,7 @@ dependencies = [
"lazy_static",
"libc",
"log",
"nix 0.20.2",
"nix 0.20.0",
"parking_lot",
"prost",
"prost-build",
@ -3373,15 +3370,6 @@ dependencies = [
"rand_core",
]
[[package]]
name = "rand_seeder"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "612dd698949d531335b4c29d1c64fb11942798decfc08abc218578942e66d7d0"
dependencies = [
"rand_core",
]
[[package]]
name = "rayon"
version = "1.5.1"
@ -3718,7 +3706,7 @@ dependencies = [
"libc",
"log",
"memchr",
"nix 0.22.2",
"nix 0.22.0",
"radix_trie",
"scopeguard",
"smallvec",
@ -3788,9 +3776,9 @@ dependencies = [
[[package]]
name = "security-framework"
version = "2.3.1"
version = "2.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "23a2ac85147a3a11d77ecf1bc7166ec0b92febfa4461c37944e180f319ece467"
checksum = "525bc1abfda2e1998d152c45cf13e696f76d0a4972310b22fac1658b05df7c87"
dependencies = [
"bitflags",
"core-foundation",

View File

@ -18,8 +18,6 @@ influxdb2_client = { path = "../influxdb2_client" }
influxdb_iox_client = { path = "../influxdb_iox_client" }
itertools = "0.10.0"
rand = { version = "0.8.3", features = ["small_rng"] }
rand_core = "0.6.2"
rand_seeder = "0.2.1"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0.67"
snafu = "0.6.8"

View File

@ -1,29 +1,22 @@
use criterion::{criterion_group, criterion_main, Criterion, Throughput};
use iox_data_generator::agent::Agent;
use iox_data_generator::{
specification::{AgentSpec, DataSpec, FieldSpec, FieldValueSpec, MeasurementSpec},
write::PointsWriterBuilder,
tag_set::GeneratedTagSets,
write::PointsWriterBuilder,
};
use iox_data_generator::agent::Agent;
use rand::Rng;
pub fn single_agent(c: &mut Criterion) {
let spec = DataSpec {
base_seed: Some("faster faster faster".into()),
include_spec_tag: Some(true),
name: "benchmark".into(),
values: vec![],
tag_sets: vec![],
agents: vec![AgentSpec {
name: "agent-1".into(),
count: None,
sampling_interval: Some("1s".to_string()),
name_tag_key: None,
tags: vec![],
measurements: vec![MeasurementSpec {
name: "measurement-1".into(),
count: None,
tags: vec![],
fields: vec![FieldSpec {
name: "field-1".into(),
field_value_spec: FieldValueSpec::Bool(true),
@ -32,6 +25,8 @@ pub fn single_agent(c: &mut Criterion) {
tag_set: None,
tag_pairs: vec![],
}],
has_one: vec![],
tag_pairs: vec![],
}],
};
@ -49,18 +44,16 @@ pub fn single_agent(c: &mut Criterion) {
group.bench_function("single agent with basic configuration", |b| {
b.iter(|| {
let r = block_on({
iox_data_generator::generate::<rand::rngs::SmallRng>(
&spec,
&mut points_writer,
start_datetime,
end_datetime,
0,
false,
1,
false,
)
});
let r = block_on(iox_data_generator::generate(
&spec,
&mut points_writer,
start_datetime,
end_datetime,
0,
false,
1,
false,
));
let n_points = r.expect("Could not generate data");
assert_eq!(n_points, expected_points as usize);
})
@ -69,54 +62,87 @@ pub fn single_agent(c: &mut Criterion) {
pub fn agent_pre_generated(c: &mut Criterion) {
let spec: DataSpec = toml::from_str(r#"
name = "benchmark"
name = "storage_cardinality_example"
base_seed = "this is a demo"
[[values]]
name = "parent"
template = "parent{{id}}"
cardinality = 10
has_one = ["has_one"]
# Values are automatically generated before the agents are intialized. They generate tag key/value pairs
# with the name of the value as the tag key and the evaluated template as the value. These pairs
# are Arc wrapped so they can be shared across tagsets and used in the agents as pre-generated data.
[[values]]
# the name must not have a . in it, which is used to access children later. Otherwise it's open.
name = "role"
# the template can use a number of helpers to get an id, a random string and the name, see below for examples
template = "storage"
# this number of tag pairs will be generated. If this is > 1, the id or a random character string should be
# used in the template to ensure that the tag key/value pairs are unique.
cardinality = 1
[[values]]
name = "child"
template = "child{{id}}"
belongs_to = "parent"
cardinality = 5
[[values]]
name = "url"
template = "http://127.0.0.1:6060/metrics/usage"
cardinality = 1
[[values]]
name = "has_one"
template = "has_one{{id}}"
cardinality = 3
[[values]]
name = "org_id"
# Fill in the value with the cardinality counter and 15 random alphanumeric characters
template = "{{id}}_{{random 15}}"
cardinality = 1000
has_one = ["env"]
[[tag_sets]]
name = "the-set"
for_each = [
"parent",
"parent.has_one",
"parent.child",
]
[[values]]
name = "env"
template = "whatever-environment-{{id}}"
cardinality = 10
[[agents]]
name = "agent-1"
sampling_interval = "1s"
[[values]]
name = "bucket_id"
# a bucket belongs to an org. With this, you would be able to access the org.id or org.value in the template
belongs_to = "org_id"
# each bucket will have a unique id, which is used here to guarantee uniqueness even across orgs. We also
# have a random 15 character alphanumeric sequence to pad out the value length.
template = "{{id}}_{{random 15}}"
# For each org, 3 buckets will be generated
cardinality = 3
[[agents.measurements]]
name = "measurement-1"
tag_set = "the-set"
tag_pairs = [
{key = "foo", template = "bar{{measurement.id}}"},
{key = "hello", template = "world"},
]
[[values]]
name = "partition_id"
template = "{{id}}"
cardinality = 10
[[agents.measurements.fields]]
name = "field-1"
bool = true
"#).unwrap();
# makes a tagset so every bucket appears in every partition. The other tags are descriptive and don't
# increase the cardiality beyond count(bucket) * count(partition). Later this example will use the
# agent and measurement generation to take this base tagset and increase cardinality on a per-agent basis.
[[tag_sets]]
name = "bucket_set"
for_each = [
"role",
"url",
"org_id",
"org_id.env",
"org_id.bucket_id",
"partition_id",
]
let seed = spec.base_seed.to_owned().unwrap_or_else(|| {
let mut rng = rand::thread_rng();
format!("{:04}", rng.gen_range(0..10000))
});
[[agents]]
name = "metric-scraper"
# create this many agents
count = 3
[[agents.measurements]]
name = "storage_usage_bucket_cardinality"
# each sampling will have all the tag sets from this collection in addition to the tags and tag_pairs specified
tag_set = "bucket_set"
# for each agent, this specific measurement will be decorated with these additional tags.
tag_pairs = [
{key = "node_id", template = "{{agent.id}}"},
{key = "hostname", template = "{{agent.id}}"},
{key = "host", template = "storage-{{agent.id}}"},
]
[[agents.measurements.fields]]
name = "gauge"
i64_range = [1, 8147240]
"#).unwrap();
let generated_tag_sets = GeneratedTagSets::from_spec(&spec).unwrap();
@ -127,19 +153,17 @@ pub fn agent_pre_generated(c: &mut Criterion) {
let ns_per_second = 1_000_000_000;
let end_datetime = Some(one_hour_s * ns_per_second);
let mut agent = Agent::<rand::rngs::SmallRng>::new(
let mut agents = Agent::from_spec(
&spec.agents[0],
"foo",
1,
seed,
vec![],
start_datetime,
end_datetime,
0,
false,
&generated_tag_sets,
).unwrap();
let expected_points = 180050;
)
.unwrap();
let agent = agents.first_mut().unwrap();
let expected_points = 30000;
let mut group = c.benchmark_group("agent_pre_generated");
group.measurement_time(std::time::Duration::from_secs(50));
@ -148,10 +172,8 @@ pub fn agent_pre_generated(c: &mut Criterion) {
group.bench_function("single agent with basic configuration", |b| {
b.iter(|| {
agent.reset_current_date_time(0);
let points_writer = points_writer.build_for_agent("foo").unwrap();
let r = block_on({
agent.generate_all(points_writer, 1)
});
let points_writer = points_writer.build_for_agent(1).unwrap();
let r = block_on(agent.generate_all(points_writer, 1));
let n_points = r.expect("Could not generate data");
assert_eq!(n_points, expected_points as usize);
})

View File

@ -1,18 +1,16 @@
# This config file aims to replicate the data produced by the capwrite tool:
# https://github.com/influxdata/idpe/tree/e493a8e9b6b773e9374a8542ddcab7d8174d320d/performance/capacity/write
name = "cap_write"
base_seed = "correct horse battery staple"
[[agents]]
name = "cap_write_{{agent_id}}"
count = 3
sampling_interval = "10s"
tag_pairs = [
{key = "host", template = "host-{{agent.id}}"}
]
[[agents.measurements]]
name = "system"
[[agents.measurements.tags]]
name = "host"
value = "host-{{agent_id}}"
[[agents.measurements.fields]]
name = "n_cpus"
@ -45,9 +43,6 @@ name = "system"
[[agents.measurements]]
name = "mem"
[[agents.measurements.tags]]
name = "host"
value = "host-{{agent_id}}"
[[agents.measurements.fields]]
name = "active"
@ -183,9 +178,6 @@ name = "mem"
[[agents.measurements]]
name = "disk"
[[agents.measurements.tags]]
name = "host"
value = "host-{{agent_id}}"
[[agents.measurements.fields]]
name = "free"
@ -217,9 +209,6 @@ name = "disk"
[[agents.measurements]]
name = "swap"
[[agents.measurements.tags]]
name = "host"
value = "host-{{agent_id}}"
[[agents.measurements.fields]]
name = "free"
@ -249,13 +238,7 @@ name = "swap"
[[agents.measurements]]
name = "cpu"
[[agents.measurements.tags]]
name = "host"
value = "host-{{agent_id}}"
[[agents.measurements.tags]]
name = "cpu"
value = "cpu-total"
tag_pairs = [{key = "cpu", template = "cpu-total"}]
[[agents.measurements.fields]]
name = "usage_user"
@ -295,9 +278,6 @@ name = "cpu"
[[agents.measurements]]
name = "processes"
[[agents.measurements.tags]]
name = "host"
value = "host-{{agent_id}}"
[[agents.measurements.fields]]
name = "blocked"
@ -349,9 +329,6 @@ name = "processes"
[[agents.measurements]]
name = "net"
[[agents.measurements.tags]]
name = "host"
value = "host-{{agent_id}}"
[[agents.measurements.fields]]
name = "bytes_recv"
@ -387,9 +364,6 @@ name = "net"
[[agents.measurements]]
name = "diskio"
[[agents.measurements.tags]]
name = "host"
value = "host-{{agent_id}}"
[[agents.measurements.fields]]
name = "reads"

View File

@ -0,0 +1,79 @@
name = "full_example"
# Values are automatically generated before the agents are intialized. They generate tag key/value pairs
# with the name of the value as the tag key and the evaluated template as the value. These pairs
# are Arc wrapped so they can be shared across tagsets and used in the agents as pre-generated data.
[[values]]
# the name must not have a . in it, which is used to access children later. Otherwise it's open.
name = "role"
# the template can use a number of helpers to get an id, a random string and the name, see below for examples
template = "storage"
# this number of tag pairs will be generated. If this is > 1, the id or a random character string should be
# used in the template to ensure that the tag key/value pairs are unique.
cardinality = 1
[[values]]
name = "url"
template = "http://127.0.0.1:6060/metrics/usage"
cardinality = 1
[[values]]
name = "org_id"
# Fill in the value with the cardinality counter and 15 random alphanumeric characters
template = "{{id}}_{{random 15}}"
cardinality = 20
has_one = ["env"]
[[values]]
name = "env"
template = "whatever-environment-{{id}}"
cardinality = 2
[[values]]
name = "bucket_id"
# a bucket belongs to an org. With this, you would be able to access the org.id or org.value in the template
belongs_to = "org_id"
# each bucket will have a unique id, which is used here to guarantee uniqueness even across orgs. We also
# have a random 15 character alphanumeric sequence to pad out the value length.
template = "{{id}}_{{random 15}}"
# For each org, 3 buckets will be generated
cardinality = 3
[[values]]
name = "partition_id"
template = "{{id}}"
cardinality = 10
# makes a tagset so every bucket appears in every partition. The other tags are descriptive and don't
# increase the cardiality beyond count(bucket) * count(partition). Later this example will use the
# agent and measurement generation to take this base tagset and increase cardinality on a per-agent basis.
[[tag_sets]]
name = "bucket_set"
for_each = [
"role",
"url",
"org_id",
"org_id.env",
"org_id.bucket_id",
"partition_id",
]
[[agents]]
# create this many agents
count = 1
sampling_interval = "10ms"
[[agents.measurements]]
name = "storage_usage_bucket_cardinality"
# each sampling will have all the tag sets from this collection in addition to the tags and tag_pairs specified
tag_set = "bucket_set"
# for each agent, this specific measurement will be decorated with these additional tags.
tag_pairs = [
{key = "node_id", template = "{{agent.id}}"},
{key = "hostname", template = "{{agent.id}}"},
{key = "host", template = "storage-{{agent.id}}"},
]
[[agents.measurements.fields]]
name = "gauge"
i64_range = [1, 8147240]

View File

@ -1,39 +0,0 @@
# Every feature demonstrated in this schema is fully supported in the current implementation.
# Other schemas may demonstrate future features.
# Every point generated by this schema will contain a tag `data_spec=[this_value]`.
name = "demo_schema"
# This seed can be any string and will be used to seed all random number generators. To change
# the randomness in the points generated by this schema, change this value to something else.
# To generate the same data in the same order as previous runs with this schema (except for any
# elements in this schema you have changed), keep this value the same.
base_seed = "this is a demo"
[[agents]]
name = "basic"
sampling_interval = "10s"
[[agents.measurements]]
name = "cpu"
[[agents.measurements.fields]]
name = "temp"
f64_range = [0.0, 100.0]
[[agents.measurements.fields]]
name = "location"
pattern = "{{city}}, {{country}}"
replacements = [
{replace = "city", with = ["San Jose", "San Antonio", "Santa Maria"]},
{replace = "country", with = ["United States", "Costa Rica", ["Argentina", 10]]},
]
[[agents.measurements.fields]]
name = "wave_height"
i64_range = [0, 10]
increment = true
reset_after = 20
[[agents.measurements.fields]]
name = "uptime"
uptime = "i64"

View File

@ -1,5 +1,4 @@
name = "storage_cardinality_example"
base_seed = "this is a demo"
# Values are automatically generated before the agents are intialized. They generate tag key/value pairs
# with the name of the value as the tag key and the evaluated template as the value. These pairs
@ -22,13 +21,13 @@ cardinality = 1
name = "org_id"
# Fill in the value with the cardinality counter and 15 random alphanumeric characters
template = "{{id}}_{{random 15}}"
cardinality = 2
cardinality = 100
has_one = ["env"]
[[values]]
name = "env"
template = "whatever-environment-{{id}}"
cardinality = 10
cardinality = 2
[[values]]
name = "bucket_id"
@ -60,9 +59,9 @@ for_each = [
]
[[agents]]
name = "metric-scraper"
# create this many agents
count = 3
count = 1
sampling_interval = "10s"
[[agents.measurements]]
name = "storage_usage_bucket_cardinality"

View File

@ -1,141 +0,0 @@
name = "demo_schema"
base_seed = "correct horse battery staple"
# the most basic spec with no auto generating of agents, measurements, tags or fields
[[agents]]
name = "demo"
sampling_interval = "10s"
[[agents.measurements]]
name = "some_measurement"
[[agents.measurements.tags]]
name = "foo"
value = "bar"
[[agents.measurements.fields]]
name = "field1"
# it's a boolean field, the true means to generate the boolean randomly with equal probability
bool = true
[[agents.measurements.fields]]
name = "field2"
# it's an i64 field, values will be generated using a pseudo random number generator
# with a set seed and values in the range [3, 200). Setting it to [3, 3] or [3, 4] will
# make the value always be 3
i64_range = [3, 200]
[[agents.measurements.fields]]
name = "field3"
# it's an i64 field, values will be generated using a pseudo random number generator
# with a set seed and values in the range in the range [1000, 5000)
i64_range = [1000, 5000]
# The value after each same will be incremented by the next random amount. This is
# useful when simulating a counter.
increment = true
[[agents.measurements.fields]]
name = "field4"
# it's an f64 field, values will be generated using a pseudo random number generator
# with a set seed with values in the range [0.0, 100.0). Setting both values to the same
# number will make every value that number.
f64_range = [0.0, 100.0]
[[agents.measurements.fields]]
name = "field5"
# this is a string field. Parts of the string will be replaced. {{agent_name}} will be replaced
# with the name of the agent, {{random 200}} will be replaced with a random alphanumeric string
# of the length specified. {{format-time "%Y-%m-%d %H:%M"}} will be replaced with the time for
# this line in the simulation (that is, the same value that this line will have for its
# timestamp) formatted using a strftime specifier. Other patterns will be looked for based on
# the keys in replacements.
pattern = "{{agent_name}} foo {{level}} {{format-time \"%Y-%m-%d %H:%M\"}} {{random 200}}"
# each key in string replacements will be replaced in the pattern with a value randomly
# selected from the array of strings. Specify a weight as an integer greater than 1 to change
# the probability that a given string will be selected.
replacements = [
{replace = "color", with = ["red", "blue", "green"]},
{replace = "level", with = [
["info", 800],
["warn", 195],
["error", 5]
]}
]
[[agents]]
name = "some-server-{{agent_id}}"
count = 10
sampling_interval = "22s"
# Optional: every measurement (row) this agent produces will include a tag with the agent_id filled
# in:
# agent_name=some-server-{{agent_id}}
name_tag_key = "agent_name"
# Optional: these values will be rotated through so that each agent that gets created will have one.
# e.g: the first agent will always inject region=us-west and secnod will be region=us-east, etc.
tags = [
{key = "region", values = ["us-west", "us-east", "dublin", "frankfurt"]},
{key = "foo", values = ["bar", "asdf"]},
]
[[agents.measurements]]
name = "few-tags-measurement-{{measurement_id}}"
count = 20
[[agents.measurements.tags]]
# {{measurement_id}} will be replaced with the id of the measurement this tag is for
name = "tag-1-{{measurement_id}}"
value = "value-1"
[[agents.measurements.tags]]
name = "tag-2"
# {{cardinality}} will be replaced with the cardinality counter
value = "value-{{cardinality}}"
# Optional: This means each collection on this agent will have 4 rows of this measurement with
# unique values for this tag. This could be for things like org_id as a tag or for
# something like cpu measurements in Telegraf where you have a separate line for each cpu:
# cpu,cpu=cpu-total,host=foo usage_user=23.2,usage_system=33.3
# cpu,cpu=cpu-0,host=foo usage_user=22.2,usage_system=34.5
# cpu,cpu=cpu-1,host=foo usage_user=11.2,usage_system=56.5
cardinality = 4
[[agents.measurements.tags]]
name = "tag-3"
# {{counter}} will be replaced with the increment counter
value = "value-{{counter}}"
# Optional: This means that {{counter}} will increase by 1 after every 10 samples that are
# pulled.
# This option simulates temporal tag values like process IDs or container IDs in tags
increment_every = 10
[[agents.measurements.tags]]
name = "tag-4"
# {{counter}} will be replaced with the increment counter and {{cardinality}} will be replaced
# with the cardinality counter
value = "value-{{counter}}-{{cardinality}}"
# Optional: This means that {{counter}} will increment by 1 after every 100 samples that are
# pulled.
# This option simulates temporal tag values like process IDs or container IDs in tags
increment_every = 100
# when paired with cardinality, this can simulate having many containers running on a single
# host
cardinality = 10
[[agents.measurements.fields]]
name = "field-2"
bool = true
# This example shows generating 10 different measurements that each have their own set of
# tags (10 of them) and each have their own set of fields (4 of them)
[[agents.measurements]]
name = "mid-tags-measurement-{{measurement_id}}"
count = 10
[[agents.measurements.tags]]
name = "tag-{{tag_id}}-{{measurement_id}}"
count = 10
value = "value-{{cardinality}}"
cardinality = 3
[[agents.measurements.fields]]
name = "field-1"
bool = true

View File

@ -1,52 +1,32 @@
name = "tracing_schema"
base_seed = "this is a demo"
[[values]]
name = "host"
template = "server-{{id}}"
cardinality = 3000
has_one = ["service"]
[[values]]
name = "service"
template = "service-{{id}}"
cardinality = 10
[[tag_sets]]
name = "host_services"
for_each = ["host", "host.service"]
[[agents]]
name = "trace-sender"
sampling_interval = "10s"
count = 1
sampling_interval = "1s"
[[agents.measurements]]
name = "traces"
[[agents.measurements.tags]]
name = "trace_id"
value = "{{guid}}"
[[agents.measurements.tags]]
name = "span_id"
value = "{{guid}}"
cardinality = 10
[[agents.measurements.tags]]
name = "host"
value = "{{host}}"
replacements = [
{replace = "host", with = ["serverA", "serverB", "serverC", "serverD"]},
]
resample_every_line = true
[[agents.measurements.tags]]
name = "region"
value = "{{region}}"
replacements = [
{replace = "region", with = ["us-west", "us-east"]},
]
resample_every_line = false
[[agents.measurements.tags]]
name = "service"
value = "{{service}}"
replacements = [
{replace = "service", with = ["nginx", "istio", "storage", "gateway", "redis", "mysql", "s3"]},
]
resample_every_line = true
tag_set = "host_services"
tag_pairs = [
{key = "trace_id", template = "{{guid}}", regenerate_after_lines = 10},
{key = "span_id", template = "{{guid}}", regenerate_after_lines = 1},
]
[[agents.measurements.fields]]
name = "timing"
f64_range = [0.0, 500.0]
[[agents.measurements.fields]]
name = "depth"
i64_range = [0, 3]
increment = true
reset_after = 10

View File

@ -1,14 +1,17 @@
//! Agents responsible for generating points
use crate::{
measurement::MeasurementGeneratorSet, now_ns, specification, tag::Tag, write::PointsWriter,
DataGenRng, RandomNumberGenerator,
measurement::{MeasurementGenerator, MeasurementLineIterator},
now_ns, specification,
tag_pair::TagPair,
write::PointsWriter,
};
use crate::tag_set::GeneratedTagSets;
use influxdb2_client::models::DataPoint;
use humantime::parse_duration;
use serde_json::json;
use snafu::{ResultExt, Snafu};
use std::{fmt, time::Duration};
use std::time::{Duration, Instant};
use tracing::{debug, info};
/// Agent-specific Results
@ -16,46 +19,44 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Errors that may happen while creating points
#[derive(Snafu, Debug)]
#[allow(missing_docs)]
pub enum Error {
/// Error that may happen when generating points from measurements
#[snafu(display("{}", source))]
CouldNotGeneratePoint {
/// Underlying `measurement` module error that caused this problem
source: crate::measurement::Error,
},
/// Error that may happen when creating measurement generator sets
#[snafu(display("Could not create measurement generator sets, caused by:\n{}", source))]
CouldNotCreateMeasurementGeneratorSets {
#[snafu(display("Could not create measurement generators, caused by:\n{}", source))]
CouldNotCreateMeasurementGenerators {
/// Underlying `measurement` module error that caused this problem
source: crate::measurement::Error,
},
/// Error that may happen when writing points
#[snafu(display("Could not write points, caused by:\n{}", source))]
CouldNotWritePoints {
/// Underlying `write` module error that caused this problem
source: crate::write::Error,
},
/// Error that may happen if the provided sampling interval string can't be parsed into a duration
#[snafu(display("Sampling interval must be valid string: {}", source))]
InvalidSamplingInterval {
/// Underlying `parse` error
source: humantime::DurationError,
},
#[snafu(display("Error creating agent tag pairs: {}", source))]
CouldNotCreateAgentTagPairs { source: crate::tag_pair::Error },
}
/// Each `AgentSpec` informs the instantiation of an `Agent`, which coordinates
/// the generation of the measurements in their specification.
#[derive(Debug)]
pub struct Agent<T: DataGenRng> {
agent_id: usize,
name: String,
pub struct Agent {
/// identifier for the agent. This can be used in generated tags and fields
pub id: usize,
#[allow(dead_code)]
rng: RandomNumberGenerator<T>,
agent_tags: Vec<Tag>,
measurement_generator_sets: Vec<MeasurementGeneratorSet<T>>,
measurement_generators: Vec<MeasurementGenerator>,
sampling_interval: Option<Duration>,
/// nanoseconds since the epoch, used as the timestamp for the next
/// generated point
@ -72,67 +73,67 @@ pub struct Agent<T: DataGenRng> {
interval: Option<tokio::time::Interval>,
}
impl<T: DataGenRng> Agent<T> {
/// Create a new agent that will generate data points according to these
/// specs. Substitutions in `name` and `agent_tags` should be made
/// before using them to instantiate an agent.
impl Agent {
/// Create a angents that will generate data points according to these
/// specs.
#[allow(clippy::too_many_arguments)]
pub fn new(
pub fn from_spec(
agent_spec: &specification::AgentSpec,
agent_name: impl Into<String>,
agent_id: usize,
parent_seed: impl fmt::Display,
agent_tags: Vec<Tag>,
start_datetime: Option<i64>, // in nanoseconds since the epoch, defaults to now
end_datetime: Option<i64>, // also in nanoseconds since the epoch, defaults to now
execution_start_time: i64,
continue_on: bool, // If true, run in "continue" mode after historical data is generated
generated_tag_sets: &GeneratedTagSets,
) -> Result<Self> {
let name = agent_name.into();
// Will agents actually need rngs? Might just need seeds...
let seed = format!("{}-{}", parent_seed, name);
let rng = RandomNumberGenerator::<T>::new(&seed);
) -> Result<Vec<Self>> {
let agent_count = agent_spec.count.unwrap_or(1);
let measurement_generator_sets = agent_spec
.measurements
.iter()
.map(|spec| {
MeasurementGeneratorSet::new(
&name,
agent_id,
spec,
&seed,
&agent_tags,
execution_start_time,
generated_tag_sets,
)
let agents: Vec<_> = (0..agent_count)
.into_iter()
.map(|agent_id| {
let data = json!({"agent": {"id": agent_id}});
let agent_tag_pairs = TagPair::pairs_from_specs(&agent_spec.tag_pairs, data)
.context(CouldNotCreateAgentTagPairs)?;
let measurement_generators = agent_spec
.measurements
.iter()
.map(|spec| {
MeasurementGenerator::from_spec(
agent_id,
spec,
execution_start_time,
generated_tag_sets,
&agent_tag_pairs,
)
.context(CouldNotCreateMeasurementGenerators)
})
.collect::<Result<Vec<_>>>()?;
let measurement_generators = measurement_generators.into_iter().flatten().collect();
let current_datetime = start_datetime.unwrap_or_else(now_ns);
let end_datetime = end_datetime.unwrap_or_else(now_ns);
// Convert to nanoseconds
let sampling_interval = match &agent_spec.sampling_interval {
None => None,
Some(s) => Some(parse_duration(s).context(InvalidSamplingInterval)?),
};
Ok(Self {
id: agent_id,
measurement_generators,
sampling_interval,
current_datetime,
end_datetime,
continue_on,
finished: false,
interval: None,
})
})
.collect::<crate::measurement::Result<_>>()
.context(CouldNotCreateMeasurementGeneratorSets)?;
.collect::<Result<Vec<_>>>()?;
let current_datetime = start_datetime.unwrap_or_else(now_ns);
let end_datetime = end_datetime.unwrap_or_else(now_ns);
// Convert to nanoseconds
let sampling_interval = match &agent_spec.sampling_interval {
None => None,
Some(s) => Some(humantime::parse_duration(s).context(InvalidSamplingInterval)?),
};
Ok(Self {
agent_id,
name,
rng,
agent_tags,
measurement_generator_sets,
sampling_interval,
current_datetime,
end_datetime,
continue_on,
finished: false,
interval: None,
})
Ok(agents)
}
/// Generate and write points in batches until `generate` doesn't return any
@ -143,32 +144,55 @@ impl<T: DataGenRng> Agent<T> {
mut points_writer: PointsWriter,
batch_size: usize,
) -> Result<usize> {
let mut points_this_batch = 1;
let mut total_points = 0;
let start = Instant::now();
let mut points = self.generate().await?;
while !points.is_empty() {
while points_this_batch != 0 {
let batch_start = Instant::now();
points_this_batch = 0;
let mut streams: Vec<_> = Vec::with_capacity(batch_size);
for _ in 0..batch_size {
points.append(&mut self.generate().await?);
let mut s = self.generate().await?;
if s.is_empty() {
break;
}
streams.append(&mut s);
}
info!("[agent {}] sending {} points", self.name, points.len());
total_points += points.len();
for s in &streams {
points_this_batch += s.line_count();
total_points += s.line_count();
}
points_writer
.write_points(points)
.write_points(streams.into_iter().flatten())
.await
.context(CouldNotWritePoints)?;
points = self.generate().await?;
info!("wrote {} in {:?}", points_this_batch, batch_start.elapsed());
let secs = start.elapsed().as_secs();
if secs != 0 {
info!(
"written {} in {:?} for {}/sec",
total_points,
start.elapsed(),
total_points / secs as usize
)
}
}
Ok(total_points)
}
/// Generate data points from the configuration in this agent, one point per
/// measurement contained in this agent's configuration.
pub async fn generate(&mut self) -> Result<Vec<DataPoint>> {
let mut points = Vec::new();
/// Generate data points from the configuration in this agent.
pub async fn generate(&mut self) -> Result<Vec<MeasurementLineIterator>> {
let mut measurement_streams = Vec::new();
debug!(
"[agent {}] generate more? {} current: {}, end: {}",
self.name, self.finished, self.current_datetime, self.end_datetime
self.id, self.finished, self.current_datetime, self.end_datetime
);
if !self.finished {
@ -197,17 +221,15 @@ impl<T: DataGenRng> Agent<T> {
self.finished = true;
}
for mgs in &mut self.measurement_generator_sets {
for point in mgs
.generate(point_timestamp)
.context(CouldNotGeneratePoint)?
{
points.push(point);
}
for mgs in &mut self.measurement_generators {
measurement_streams.push(
mgs.generate(point_timestamp)
.context(CouldNotGeneratePoint)?,
);
}
}
Ok(points)
Ok(measurement_streams)
}
/// Sets the current date and time for the agent and resets its finished state to false. Enables
@ -221,13 +243,14 @@ impl<T: DataGenRng> Agent<T> {
#[cfg(test)]
mod test {
use super::*;
use crate::{now_ns, specification::*, ZeroRng};
use crate::measurement::LineToGenerate;
use crate::{now_ns, specification::*};
use influxdb2_client::models::WriteDataPoint;
type Error = Box<dyn std::error::Error>;
type Result<T = (), E = Error> = std::result::Result<T, E>;
impl<T: DataGenRng> Agent<T> {
impl Agent {
/// Instantiate an agent only with the parameters we're interested in
/// testing, keeping everything else constant across different
/// tests.
@ -238,11 +261,10 @@ mod test {
end_datetime: i64,
) -> Self {
let measurement_spec = MeasurementSpec {
name: "measurement-{{agent_id}}-{{measurement_id}}".into(),
name: "measurement-{{agent.id}}-{{measurement.id}}".into(),
count: Some(2),
tags: vec![],
fields: vec![FieldSpec {
name: "field-{{agent_id}}-{{measurement_id}}-{{field_id}}".into(),
name: "field-{{agent.id}}-{{measurement.id}}-{{field.id}}".into(),
field_value_spec: FieldValueSpec::I64 {
range: 0..60,
increment: false,
@ -256,23 +278,17 @@ mod test {
let generated_tag_sets = GeneratedTagSets::default();
let measurement_generator_set = MeasurementGeneratorSet::new(
"test",
42,
let measurement_generators = MeasurementGenerator::from_spec(
1,
&measurement_spec,
"spec-test",
&[],
0,
current_datetime,
&generated_tag_sets,
&[],
)
.unwrap();
Self {
agent_id: 0,
name: String::from("test"),
rng: RandomNumberGenerator::<T>::new("spec-test"),
agent_tags: vec![],
measurement_generator_sets: vec![measurement_generator_set],
id: 0,
finished: false,
interval: None,
@ -280,11 +296,12 @@ mod test {
current_datetime,
end_datetime,
continue_on,
measurement_generators,
}
}
}
fn timestamps(points: &[influxdb2_client::models::DataPoint]) -> Result<Vec<i64>> {
fn timestamps(points: &[LineToGenerate]) -> Result<Vec<i64>> {
points
.iter()
.map(|point| {
@ -321,12 +338,13 @@ mod test {
#[tokio::test]
async fn current_time_less_than_end_time() -> Result<()> {
let mut agent = Agent::<ZeroRng>::test_instance(None, false, 0, 10);
let mut agent = Agent::test_instance(None, false, 0, 10);
let points = agent.generate().await?;
assert_eq!(points.len(), 2);
let points = agent.generate().await?.into_iter().flatten();
assert_eq!(points.count(), 2);
let points = agent.generate().await?;
let points = agent.generate().await?.into_iter().flatten();
let points: Vec<_> = points.collect();
assert!(points.is_empty(), "expected no points, got {:?}", points);
Ok(())
@ -334,12 +352,13 @@ mod test {
#[tokio::test]
async fn current_time_equal_end_time() -> Result<()> {
let mut agent = Agent::<ZeroRng>::test_instance(None, false, 10, 10);
let mut agent = Agent::test_instance(None, false, 10, 10);
let points = agent.generate().await?;
assert_eq!(points.len(), 2);
let points = agent.generate().await?.into_iter().flatten();
assert_eq!(points.count(), 2);
let points = agent.generate().await?;
let points = agent.generate().await?.into_iter().flatten();
let points: Vec<_> = points.collect();
assert!(points.is_empty(), "expected no points, got {:?}", points);
Ok(())
@ -347,12 +366,13 @@ mod test {
#[tokio::test]
async fn current_time_greater_than_end_time() -> Result<()> {
let mut agent = Agent::<ZeroRng>::test_instance(None, false, 11, 10);
let mut agent = Agent::test_instance(None, false, 11, 10);
let points = agent.generate().await?;
assert_eq!(points.len(), 2);
let points = agent.generate().await?.into_iter().flatten();
assert_eq!(points.count(), 2);
let points = agent.generate().await?;
let points = agent.generate().await?.into_iter().flatten();
let points: Vec<_> = points.collect();
assert!(points.is_empty(), "expected no points, got {:?}", points);
Ok(())
@ -364,12 +384,13 @@ mod test {
#[tokio::test]
async fn current_time_less_than_end_time() -> Result<()> {
let mut agent = Agent::<ZeroRng>::test_instance(None, true, 0, 10);
let mut agent = Agent::test_instance(None, true, 0, 10);
let points = agent.generate().await?;
assert_eq!(points.len(), 2);
let points = agent.generate().await?.into_iter().flatten();
assert_eq!(points.count(), 2);
let points = agent.generate().await?;
let points = agent.generate().await?.into_iter().flatten();
let points: Vec<_> = points.collect();
assert!(points.is_empty(), "expected no points, got {:?}", points);
Ok(())
@ -377,12 +398,13 @@ mod test {
#[tokio::test]
async fn current_time_equal_end_time() -> Result<()> {
let mut agent = Agent::<ZeroRng>::test_instance(None, true, 10, 10);
let mut agent = Agent::test_instance(None, true, 10, 10);
let points = agent.generate().await?;
assert_eq!(points.len(), 2);
let points = agent.generate().await?.into_iter().flatten();
assert_eq!(points.count(), 2);
let points = agent.generate().await?;
let points = agent.generate().await?.into_iter().flatten();
let points: Vec<_> = points.collect();
assert!(points.is_empty(), "expected no points, got {:?}", points);
Ok(())
@ -390,12 +412,13 @@ mod test {
#[tokio::test]
async fn current_time_greater_than_end_time() -> Result<()> {
let mut agent = Agent::<ZeroRng>::test_instance(None, true, 11, 10);
let mut agent = Agent::test_instance(None, true, 11, 10);
let points = agent.generate().await?;
assert_eq!(points.len(), 2);
let points = agent.generate().await?.into_iter().flatten();
assert_eq!(points.count(), 2);
let points = agent.generate().await?;
let points = agent.generate().await?.into_iter().flatten();
let points: Vec<_> = points.collect();
assert!(points.is_empty(), "expected no points, got {:?}", points);
Ok(())
@ -433,15 +456,16 @@ mod test {
let end = TEST_SAMPLING_INTERVAL.as_nanos() as i64;
let mut agent =
Agent::<ZeroRng>::test_instance(Some(TEST_SAMPLING_INTERVAL), false, current, end);
Agent::test_instance(Some(TEST_SAMPLING_INTERVAL), false, current, end);
let points = agent.generate().await?;
assert_eq!(points.len(), 2);
let points = agent.generate().await?.into_iter().flatten();
assert_eq!(points.count(), 2);
let points = agent.generate().await?;
assert_eq!(points.len(), 2);
let points = agent.generate().await?.into_iter().flatten();
assert_eq!(points.count(), 2);
let points = agent.generate().await?;
let points = agent.generate().await?.into_iter().flatten();
let points: Vec<_> = points.collect();
assert!(points.is_empty(), "expected no points, got {:?}", points);
Ok(())
@ -453,12 +477,13 @@ mod test {
let end = current;
let mut agent =
Agent::<ZeroRng>::test_instance(Some(TEST_SAMPLING_INTERVAL), false, current, end);
Agent::test_instance(Some(TEST_SAMPLING_INTERVAL), false, current, end);
let points = agent.generate().await?;
assert_eq!(points.len(), 2);
let points = agent.generate().await?.into_iter().flatten();
assert_eq!(points.count(), 2);
let points = agent.generate().await?;
let points = agent.generate().await?.into_iter().flatten();
let points: Vec<_> = points.collect();
assert!(points.is_empty(), "expected no points, got {:?}", points);
Ok(())
@ -470,12 +495,13 @@ mod test {
let end = TEST_SAMPLING_INTERVAL.as_nanos() as i64;
let mut agent =
Agent::<ZeroRng>::test_instance(Some(TEST_SAMPLING_INTERVAL), false, current, end);
Agent::test_instance(Some(TEST_SAMPLING_INTERVAL), false, current, end);
let points = agent.generate().await?;
assert_eq!(points.len(), 2);
let points = agent.generate().await?.into_iter().flatten();
assert_eq!(points.count(), 2);
let points = agent.generate().await?;
let points = agent.generate().await?.into_iter().flatten();
let points: Vec<_> = points.collect();
assert!(points.is_empty(), "expected no points, got {:?}", points);
Ok(())
@ -505,15 +531,17 @@ mod test {
let current = end - TEST_SAMPLING_INTERVAL.as_nanos() as i64;
let mut agent =
Agent::<ZeroRng>::test_instance(Some(TEST_SAMPLING_INTERVAL), true, current, end);
Agent::test_instance(Some(TEST_SAMPLING_INTERVAL), true, current, end);
let points = agent.generate().await?;
let points = agent.generate().await?.into_iter().flatten();
let points: Vec<_> = points.collect();
assert_eq!(points.len(), 2);
let times = timestamps(&points).unwrap();
assert_eq!(vec![current, current], times);
let points = agent.generate().await?;
let points = agent.generate().await?.into_iter().flatten();
let points: Vec<_> = points.collect();
assert_eq!(points.len(), 2);
let times = timestamps(&points).unwrap();
@ -528,15 +556,17 @@ mod test {
let current = end;
let mut agent =
Agent::<ZeroRng>::test_instance(Some(TEST_SAMPLING_INTERVAL), true, current, end);
Agent::test_instance(Some(TEST_SAMPLING_INTERVAL), true, current, end);
let points = agent.generate().await?;
let points = agent.generate().await?.into_iter().flatten();
let points: Vec<_> = points.collect();
assert_eq!(points.len(), 2);
let times = timestamps(&points).unwrap();
assert_eq!(vec![end, end], times);
let points = agent.generate().await?;
let points = agent.generate().await?.into_iter().flatten();
let points: Vec<_> = points.collect();
assert_eq!(points.len(), 2);
let real_now = now_ns();
@ -562,15 +592,17 @@ mod test {
let current = end + TEST_SAMPLING_INTERVAL.as_nanos() as i64;
let mut agent =
Agent::<ZeroRng>::test_instance(Some(TEST_SAMPLING_INTERVAL), true, current, end);
Agent::test_instance(Some(TEST_SAMPLING_INTERVAL), true, current, end);
let points = agent.generate().await?;
let points = agent.generate().await?.into_iter().flatten();
let points: Vec<_> = points.collect();
assert_eq!(points.len(), 2);
let times = timestamps(&points).unwrap();
assert_eq!(vec![current, current], times);
let points = agent.generate().await?;
let points = agent.generate().await?.into_iter().flatten();
let points: Vec<_> = points.collect();
assert_eq!(points.len(), 2);
let real_now = now_ns();

View File

@ -200,7 +200,7 @@ Logging:
panic!("One of --print or --output or --host must be provided.");
};
let result = iox_data_generator::generate::<rand::rngs::SmallRng>(
let result = iox_data_generator::generate(
&data_spec,
&mut points_writer_builder,
start_datetime,

View File

@ -2,153 +2,165 @@
use crate::{
now_ns, specification,
substitution::{pick_from_replacements, Substitute},
DataGenRng, RandomNumberGenerator,
substitution::{self, pick_from_replacements},
};
use influxdb2_client::models::FieldValue;
use handlebars::Handlebars;
use rand::rngs::SmallRng;
use rand::Rng;
use serde::Serialize;
use serde_json::json;
use serde_json::Value;
use snafu::{ResultExt, Snafu};
use std::{collections::BTreeMap, fmt, ops::Range, time::Duration};
use std::{ops::Range, time::Duration};
/// Field-specific Results
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Errors that may happen while creating fields
#[derive(Snafu, Debug)]
#[allow(missing_docs)]
pub enum Error {
/// Error that may happen when substituting placeholder values
#[snafu(display("Could not create field name, caused by:\n{}", source))]
CouldNotCreateFieldName {
/// Underlying `substitution` module error that caused this problem
source: crate::substitution::Error,
},
CouldNotCreateFieldName { source: crate::substitution::Error },
/// Error that may happen when substituting placeholder values
#[snafu(display("Could not compile field name template, caused by:\n{}", source))]
CouldNotCompileStringTemplate {
/// Underlying `substitution` module error that caused this problem
source: crate::substitution::Error,
},
#[snafu(display("Could not compile string field template: {}", source))]
CouldNotCompileStringTemplate { source: handlebars::TemplateError },
#[snafu(display("Could not compile string field template: {}", source))]
CouldNotRenderStringTemplate { source: handlebars::RenderError },
}
/// A generated field value that will be used in a generated data point.
#[derive(Debug, PartialEq)]
pub struct Field {
/// The key for the field
pub key: String,
/// The value for the field
pub value: FieldValue,
/// Different field type generators
#[derive(Debug)]
pub enum FieldGeneratorImpl {
/// Boolean field generator
Bool(BooleanFieldGenerator),
/// Integer field generator
I64(I64FieldGenerator),
/// Float field generator
F64(F64FieldGenerator),
/// String field generator
String(Box<StringFieldGenerator>),
/// Uptime field generator
Uptime(UptimeFieldGenerator),
}
impl Field {
/// Create a new field with the given key and value.
pub fn new(key: impl Into<String>, value: impl Into<FieldValue>) -> Self {
Self {
key: key.into(),
value: value.into(),
impl FieldGeneratorImpl {
/// Create a new generator based on a field spec
pub fn new(
spec: &specification::FieldSpec,
data: Value,
rng: SmallRng,
execution_start_time: i64,
) -> Result<Self> {
use specification::FieldValueSpec::*;
let field_name = substitution::render_once("field", &spec.name, &data)
.context(CouldNotCreateFieldName)?;
Ok(match &spec.field_value_spec {
Bool(true) => Self::Bool(BooleanFieldGenerator::new(&field_name, rng)),
Bool(false) => unimplemented!("Not sure what false means for bool fields yet"),
I64 {
range,
increment,
reset_after,
} => Self::I64(I64FieldGenerator::new(
&field_name,
range,
*increment,
*reset_after,
rng,
)),
F64 { range } => Self::F64(F64FieldGenerator::new(&field_name, range, rng)),
String {
pattern,
replacements,
} => Self::String(Box::new(StringFieldGenerator::new(
&field_name,
pattern,
data,
replacements.to_vec(),
rng,
)?)),
Uptime { kind } => Self::Uptime(UptimeFieldGenerator::new(
&field_name,
kind,
execution_start_time,
)),
})
}
/// Writes the field in line protocol to the passed writer
pub fn write_to<W: std::io::Write>(&mut self, mut w: W, timestamp: i64) -> std::io::Result<()> {
match self {
Self::Bool(f) => {
let v: bool = f.rng.gen();
write!(w, "{}={}", f.name, v)
}
Self::I64(f) => {
let v = f.generate_value();
write!(w, "{}={}", f.name, v)
}
Self::F64(f) => {
let v = f.generate_value();
write!(w, "{}={}", f.name, v)
}
Self::String(f) => {
let v = f.generate_value(timestamp);
write!(w, "{}=\"{}\"", f.name, v)
}
Self::Uptime(f) => match f.kind {
specification::UptimeKind::I64 => {
let v = f.generate_value();
write!(w, "{}={}", f.name, v)
}
specification::UptimeKind::Telegraf => {
let v = f.generate_value_as_string();
write!(w, "{}=\"{}\"", f.name, v)
}
},
}
}
}
/// A set of `count` fields that have the same configuration but different
/// `field_id`s.
pub struct FieldGeneratorSet {
field_generators: Vec<Box<dyn FieldGenerator + Send>>,
}
// field_generators doesn't implement Debug
impl fmt::Debug for FieldGeneratorSet {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("FieldGeneratorSet")
.field("field_generators", &"(dynamic)")
.finish()
}
}
impl FieldGeneratorSet {
/// Create a new set of field generators for a particular agent,
/// measurement, and field specification.
pub fn new<T: DataGenRng>(
agent_name: &str,
agent_id: usize,
measurement_id: usize,
spec: &specification::FieldSpec,
parent_seed: &str,
execution_start_time: i64,
) -> Result<Self> {
let count = spec.count.unwrap_or(1);
let field_generators = (0..count)
.map(|field_id| {
field_spec_to_generator::<T>(
agent_name,
agent_id,
measurement_id,
field_id,
spec,
parent_seed,
execution_start_time,
)
})
.collect::<Result<_>>()?;
Ok(Self { field_generators })
}
/// Create one set of fields
pub fn generate(&mut self, timestamp: i64) -> Vec<Field> {
self.field_generators
.iter_mut()
.map(|fg| fg.generate(timestamp))
.collect()
}
}
trait FieldGenerator {
fn generate(&mut self, timestamp: i64) -> Field;
}
/// Generate boolean field names and values.
#[derive(Debug)]
pub struct BooleanFieldGenerator<T: DataGenRng> {
name: String,
rng: RandomNumberGenerator<T>,
pub struct BooleanFieldGenerator {
/// The name (key) of the field
pub name: String,
rng: SmallRng,
}
impl<T: DataGenRng> BooleanFieldGenerator<T> {
impl BooleanFieldGenerator {
/// Create a new boolean field generator that will always use the specified
/// name.
pub fn new(name: &str, parent_seed: &str) -> Self {
pub fn new(name: &str, rng: SmallRng) -> Self {
let name = name.into();
let seed = format!("{}-{}", parent_seed, name);
let rng = RandomNumberGenerator::<T>::new(seed);
Self { name, rng }
}
}
impl<T: DataGenRng> FieldGenerator for BooleanFieldGenerator<T> {
fn generate(&mut self, _timestamp: i64) -> Field {
let b: bool = self.rng.gen();
Field::new(&self.name, b)
/// Generate a random value
pub fn generate_value(&mut self) -> bool {
self.rng.gen()
}
}
/// Generate integer field names and values.
#[derive(Debug)]
pub struct I64FieldGenerator<T: DataGenRng> {
name: String,
pub struct I64FieldGenerator {
/// The name (key) of the field
pub name: String,
range: Range<i64>,
increment: bool,
rng: RandomNumberGenerator<T>,
rng: SmallRng,
previous_value: i64,
reset_after: Option<usize>,
current_tick: usize,
}
impl<T: DataGenRng> I64FieldGenerator<T> {
impl I64FieldGenerator {
/// Create a new integer field generator that will always use the specified
/// name.
pub fn new(
@ -156,14 +168,10 @@ impl<T: DataGenRng> I64FieldGenerator<T> {
range: &Range<i64>,
increment: bool,
reset_after: Option<usize>,
parent_seed: impl fmt::Display,
rng: SmallRng,
) -> Self {
let name = name.into();
let seed = format!("{}-{}", parent_seed, name);
let rng = RandomNumberGenerator::<T>::new(seed);
Self {
name,
name: name.into(),
range: range.to_owned(),
increment,
rng,
@ -172,10 +180,9 @@ impl<T: DataGenRng> I64FieldGenerator<T> {
current_tick: 0,
}
}
}
impl<T: DataGenRng> FieldGenerator for I64FieldGenerator<T> {
fn generate(&mut self, _timestamp: i64) -> Field {
/// Generate a random value
pub fn generate_value(&mut self) -> i64 {
let mut value = if self.range.start == self.range.end {
self.range.start
} else {
@ -195,107 +202,91 @@ impl<T: DataGenRng> FieldGenerator for I64FieldGenerator<T> {
}
}
Field::new(&self.name, value)
value
}
}
/// Generate floating point field names and values.
#[derive(Debug)]
pub struct F64FieldGenerator<T: DataGenRng> {
name: String,
pub struct F64FieldGenerator {
/// The name (key) of the field
pub name: String,
range: Range<f64>,
rng: RandomNumberGenerator<T>,
rng: SmallRng,
}
impl<T: DataGenRng> F64FieldGenerator<T> {
impl F64FieldGenerator {
/// Create a new floating point field generator that will always use the
/// specified name.
pub fn new(
name: impl Into<String>,
range: &Range<f64>,
parent_seed: impl fmt::Display,
) -> Self {
let name = name.into();
let seed = format!("{}-{}", parent_seed, name);
let rng = RandomNumberGenerator::<T>::new(seed);
pub fn new(name: impl Into<String>, range: &Range<f64>, rng: SmallRng) -> Self {
Self {
name,
name: name.into(),
range: range.to_owned(),
rng,
}
}
}
impl<T: DataGenRng> FieldGenerator for F64FieldGenerator<T> {
fn generate(&mut self, _timestamp: i64) -> Field {
let value = if (self.range.start - self.range.end).abs() < f64::EPSILON {
/// Generate a random value
pub fn generate_value(&mut self) -> f64 {
if (self.range.start - self.range.end).abs() < f64::EPSILON {
self.range.start
} else {
self.rng.gen_range(self.range.clone())
};
Field::new(&self.name, value)
}
}
}
/// Generate string field names and values.
#[derive(Debug)]
pub struct StringFieldGenerator<T: DataGenRng> {
agent_name: String,
name: String,
substitute: Substitute,
rng: RandomNumberGenerator<T>,
pub struct StringFieldGenerator {
/// The name (key) of the field
pub name: String,
rng: SmallRng,
replacements: Vec<specification::Replacement>,
handlebars: Handlebars<'static>,
data: Value,
}
impl<T: DataGenRng> StringFieldGenerator<T> {
impl StringFieldGenerator {
/// Create a new string field generator
pub fn new(
agent_name: impl Into<String>,
name: impl Into<String>,
pattern: impl Into<String>,
parent_seed: impl fmt::Display,
template: impl Into<String>,
data: Value,
replacements: Vec<specification::Replacement>,
rng: SmallRng,
) -> Result<Self> {
let name = name.into();
let seed = format!("{}-{}", parent_seed, name);
let rng = RandomNumberGenerator::<T>::new(seed);
let substitute = Substitute::new(pattern, RandomNumberGenerator::<T>::new(&rng.seed))
.context(CouldNotCompileStringTemplate {})?;
let mut registry = substitution::new_handlebars_registry();
registry
.register_template_string(&name, template.into())
.context(CouldNotCompileStringTemplate)?;
Ok(Self {
agent_name: agent_name.into(),
name,
substitute,
rng,
replacements,
handlebars: registry,
data,
})
}
}
impl<T: DataGenRng> FieldGenerator for StringFieldGenerator<T> {
fn generate(&mut self, timestamp: i64) -> Field {
#[derive(Serialize)]
struct Values<'a> {
#[serde(flatten)]
replacements: BTreeMap<&'a str, &'a str>,
agent_name: &'a str,
timestamp: i64,
/// Generate a random value
pub fn generate_value(&mut self, timestamp: i64) -> String {
let replacements = pick_from_replacements(&mut self.rng, &self.replacements);
let d = self.data.as_object_mut().expect("data must be object");
if replacements.is_empty() {
d.remove("replacements");
} else {
d.insert("replacements".to_string(), json!(replacements));
}
let values = Values {
replacements: pick_from_replacements(&mut self.rng, &self.replacements),
agent_name: &self.agent_name,
timestamp,
};
d.insert("timestamp".to_string(), json!(timestamp));
let value = self
.substitute
.evaluate(&values)
.expect("Unable to substitute string field value");
Field::new(&self.name, value)
self.handlebars
.render(&self.name, &self.data)
.expect("Unable to substitute string field value")
}
}
@ -303,9 +294,11 @@ impl<T: DataGenRng> FieldGenerator for StringFieldGenerator<T> {
/// of seconds since the data generator started running
#[derive(Debug)]
pub struct UptimeFieldGenerator {
name: String,
/// The name (key) of the field
pub name: String,
execution_start_time: i64,
kind: specification::UptimeKind,
/// The specification type of the uptime field. Either an int64 or a string
pub kind: specification::UptimeKind,
}
impl UptimeFieldGenerator {
@ -320,143 +313,43 @@ impl UptimeFieldGenerator {
execution_start_time,
}
}
}
impl FieldGenerator for UptimeFieldGenerator {
fn generate(&mut self, _timestamp: i64) -> Field {
use specification::UptimeKind::*;
/// Generates the uptime as an i64
pub fn generate_value(&mut self) -> i64 {
let elapsed = Duration::from_nanos((now_ns() - self.execution_start_time) as u64);
let elapsed_seconds = elapsed.as_secs();
match self.kind {
I64 => Field::new(&self.name, elapsed_seconds as i64),
Telegraf => {
let days = elapsed_seconds / (60 * 60 * 24);
let days_plural = if days == 1 { "" } else { "s" };
let mut minutes = elapsed_seconds / 60;
let mut hours = minutes / 60;
hours %= 24;
minutes %= 60;
let duration_string =
format!("{} day{}, {:02}:{:02}", days, days_plural, hours, minutes);
Field::new(&self.name, duration_string)
}
}
elapsed.as_secs() as i64
}
}
fn field_spec_to_generator<T: DataGenRng>(
agent_name: &str,
agent_id: usize,
measurement_id: usize,
field_id: usize,
spec: &specification::FieldSpec,
parent_seed: &str,
execution_start_time: i64,
) -> Result<Box<dyn FieldGenerator + Send>> {
use specification::FieldValueSpec::*;
/// Generates the uptime as a string, which is what should be used if `self.kind == specification::UptimeKind::Telegraf
pub fn generate_value_as_string(&mut self) -> String {
let elapsed_seconds = self.generate_value();
let days = elapsed_seconds / (60 * 60 * 24);
let days_plural = if days == 1 { "" } else { "s" };
let spec_name = Substitute::once(
&spec.name,
&[
("agent_id", &agent_id.to_string()),
("measurement_id", &measurement_id.to_string()),
("field_id", &field_id.to_string()),
],
)
.context(CouldNotCreateFieldName)?;
let mut minutes = elapsed_seconds / 60;
let mut hours = minutes / 60;
hours %= 24;
minutes %= 60;
Ok(match &spec.field_value_spec {
Bool(true) => Box::new(BooleanFieldGenerator::<T>::new(&spec_name, parent_seed)),
Bool(false) => unimplemented!("Not sure what false means for bool fields yet"),
I64 {
range,
increment,
reset_after,
} => Box::new(I64FieldGenerator::<T>::new(
&spec_name,
range,
*increment,
*reset_after,
parent_seed,
)),
F64 { range } => Box::new(F64FieldGenerator::<T>::new(&spec_name, range, parent_seed)),
String {
pattern,
replacements,
} => Box::new(StringFieldGenerator::<T>::new(
agent_name,
&spec_name,
pattern,
parent_seed,
replacements.to_vec(),
)?),
Uptime { kind } => Box::new(UptimeFieldGenerator::new(
&spec_name,
kind,
execution_start_time,
)),
})
format!("{} day{}, {:02}:{:02}", days, days_plural, hours, minutes)
}
}
#[cfg(test)]
mod test {
use super::*;
use crate::{DynamicRng, ZeroRng, TEST_SEED};
use crate::specification::UptimeKind;
use rand::SeedableRng;
use test_helpers::approximately_equal;
type Error = Box<dyn std::error::Error>;
type Result<T = (), E = Error> = std::result::Result<T, E>;
// Shortcut functions that panic for getting values out of fields for test convenience
impl Field {
fn i64(&self) -> i64 {
match self.value {
FieldValue::I64(v) => v,
ref other => panic!("expected i64, got {:?}", other),
}
}
fn f64(&self) -> f64 {
match self.value {
FieldValue::F64(v) => v,
ref other => panic!("expected f64, got {:?}", other),
}
}
fn bool(&self) -> bool {
match self.value {
FieldValue::Bool(v) => v,
ref other => panic!("expected bool, got {:?}", other),
}
}
fn string(&self) -> String {
match &self.value {
FieldValue::String(v) => v.clone(),
ref other => panic!("expected String, got {:?}", other),
}
}
}
#[test]
fn generate_boolean_field() {
let mut bfg = BooleanFieldGenerator::<ZeroRng>::new("bfg", TEST_SEED);
assert!(!bfg.generate(1234).bool());
}
#[test]
fn generate_i64_field_always_the_same() {
// If the specification has the same number for the start and end of the
// range...
let mut i64fg =
I64FieldGenerator::<DynamicRng>::new("i64fg", &(3..3), false, None, TEST_SEED);
I64FieldGenerator::new("i64fg", &(3..3), false, None, SmallRng::from_entropy());
let i64_fields: Vec<_> = (0..10).map(|_| i64fg.generate(1234).i64()).collect();
let i64_fields: Vec<_> = (0..10).map(|_| i64fg.generate_value()).collect();
let expected = i64_fields[0];
// All the values generated will always be the same.
@ -468,9 +361,9 @@ mod test {
// If the specification has n for the start and n+1 for the end of the range...
let mut i64fg =
I64FieldGenerator::<DynamicRng>::new("i64fg", &(4..5), false, None, TEST_SEED);
I64FieldGenerator::new("i64fg", &(4..5), false, None, SmallRng::from_entropy());
let i64_fields: Vec<_> = (0..10).map(|_| i64fg.generate(1234).i64()).collect();
let i64_fields: Vec<_> = (0..10).map(|_| i64fg.generate_value()).collect();
// We know what the value will be even though we're using a real random number generator
let expected = 4;
@ -488,9 +381,9 @@ mod test {
let range = 3..1000;
let mut i64fg =
I64FieldGenerator::<DynamicRng>::new("i64fg", &range, false, None, TEST_SEED);
I64FieldGenerator::new("i64fg", &range, false, None, SmallRng::from_entropy());
let val = i64fg.generate(1234).i64();
let val = i64fg.generate_value();
assert!(range.contains(&val), "`{}` was not in the range", val);
}
@ -498,12 +391,12 @@ mod test {
#[test]
fn generate_incrementing_i64_field() {
let mut i64fg =
I64FieldGenerator::<DynamicRng>::new("i64fg", &(3..10), true, None, TEST_SEED);
I64FieldGenerator::new("i64fg", &(3..10), true, None, SmallRng::from_entropy());
let val1 = i64fg.generate(1234).i64();
let val2 = i64fg.generate(1234).i64();
let val3 = i64fg.generate(1234).i64();
let val4 = i64fg.generate(1234).i64();
let val1 = i64fg.generate_value();
let val2 = i64fg.generate_value();
let val3 = i64fg.generate_value();
let val4 = i64fg.generate_value();
assert!(val1 < val2, "`{}` < `{}` was false", val1, val2);
assert!(val2 < val3, "`{}` < `{}` was false", val2, val3);
@ -512,7 +405,7 @@ mod test {
#[test]
fn incrementing_i64_wraps() {
let rng = RandomNumberGenerator::<DynamicRng>::new(TEST_SEED);
let rng = SmallRng::from_entropy();
let range = 3..10;
let previous_value = i64::MAX;
@ -530,7 +423,7 @@ mod test {
let resulting_range =
range.start.wrapping_add(previous_value)..range.end.wrapping_add(previous_value);
let val = i64fg.generate(1234).i64();
let val = i64fg.generate_value();
assert!(
resulting_range.contains(&val),
@ -542,13 +435,18 @@ mod test {
#[test]
fn incrementing_i64_that_resets() {
let reset_after = Some(3);
let mut i64fg =
I64FieldGenerator::<DynamicRng>::new("i64fg", &(3..10), true, reset_after, TEST_SEED);
let mut i64fg = I64FieldGenerator::new(
"i64fg",
&(3..10),
true,
reset_after,
SmallRng::from_entropy(),
);
let val1 = i64fg.generate(1234).i64();
let val2 = i64fg.generate(1234).i64();
let val3 = i64fg.generate(1234).i64();
let val4 = i64fg.generate(1234).i64();
let val1 = i64fg.generate_value();
let val2 = i64fg.generate_value();
let val3 = i64fg.generate_value();
let val4 = i64fg.generate_value();
assert!(val1 < val2, "`{}` < `{}` was false", val1, val2);
assert!(val2 < val3, "`{}` < `{}` was false", val2, val3);
@ -561,9 +459,9 @@ mod test {
// range...
let start_and_end = 3.0;
let range = start_and_end..start_and_end;
let mut f64fg = F64FieldGenerator::<DynamicRng>::new("f64fg", &range, TEST_SEED);
let mut f64fg = F64FieldGenerator::new("f64fg", &range, SmallRng::from_entropy());
let f64_fields: Vec<_> = (0..10).map(|_| f64fg.generate(1234).f64()).collect();
let f64_fields: Vec<_> = (0..10).map(|_| f64fg.generate_value()).collect();
// All the values generated will always be the same known value.
assert!(
@ -578,178 +476,46 @@ mod test {
#[test]
fn generate_f64_field_within_a_range() {
let range = 3.0..1000.0;
let mut f64fg = F64FieldGenerator::<DynamicRng>::new("f64fg", &range, TEST_SEED);
let mut f64fg = F64FieldGenerator::new("f64fg", &range, SmallRng::from_entropy());
let val = f64fg.generate(1234).f64();
let val = f64fg.generate_value();
assert!(range.contains(&val), "`{}` was not in the range", val);
}
#[test]
fn generate_string_field_without_replacements() {
let fake_now = 11111;
fn generate_string_field_with_data() {
let fake_now = 1633595510000000000;
let mut stringfg = StringFieldGenerator::<DynamicRng>::new(
"agent_name",
"stringfg",
"my value",
TEST_SEED,
let mut stringfg = StringFieldGenerator::new(
"str",
r#"my value {{measurement.name}} {{format-time "%Y-%m-%d"}}"#,
json!({"measurement": {"name": "foo"}}),
vec![],
SmallRng::from_entropy(),
)
.unwrap();
assert_eq!("my value", stringfg.generate(fake_now).string());
assert_eq!("my value foo 2021-10-07", stringfg.generate_value(fake_now));
}
#[test]
fn generate_string_field_with_provided_replacements() {
let fake_now = 5555555555;
let mut stringfg = StringFieldGenerator::<DynamicRng>::new(
"double-oh-seven",
"stringfg",
r#"{{agent_name}}---{{random 16}}---{{format-time "%s%f"}}"#,
TEST_SEED,
vec![],
)
.unwrap();
let string_val1 = stringfg.generate(fake_now).string();
let string_val2 = stringfg.generate(fake_now).string();
assert!(
string_val1.starts_with("double-oh-seven---"),
"`{}` did not start with `double-oh-seven---`",
string_val1
);
assert!(
string_val1.ends_with("---5555555555"),
"`{}` did not end with `---5555555555`",
string_val1
);
assert!(
string_val2.starts_with("double-oh-seven---"),
"`{}` did not start with `double-oh-seven---`",
string_val2
);
assert!(
string_val2.ends_with("---5555555555"),
"`{}` did not end with `---5555555555`",
string_val2
);
assert_ne!(string_val1, string_val2, "random value should change");
}
#[test]
#[should_panic(expected = "Unable to substitute string field value")]
fn unknown_replacement_errors() {
let fake_now = 55555;
let mut stringfg = StringFieldGenerator::<DynamicRng>::new(
"arbitrary",
"stringfg",
"static-{{unknown}}",
TEST_SEED,
vec![],
)
.unwrap();
stringfg.generate(fake_now);
}
#[test]
fn replacements_no_weights() -> Result<()> {
let fake_now = 55555;
let toml: specification::FieldSpec = toml::from_str(
r#"
name = "sf"
pattern = "foo {{level}}"
replacements = [
{replace = "level", with = ["info", "warn", "error"]}
]"#,
)
.unwrap();
let mut stringfg =
field_spec_to_generator::<ZeroRng>("agent_name", 0, 0, 0, &toml, TEST_SEED, fake_now)?;
assert_eq!("foo info", stringfg.generate(fake_now).string());
Ok(())
}
#[test]
fn replacements_with_weights() -> Result<()> {
let fake_now = 55555;
let toml: specification::FieldSpec = toml::from_str(
r#"
name = "sf"
pattern = "foo {{level}}"
replacements = [
{replace = "level", with = [["info", 1000000], ["warn", 1], ["error", 0]]}
]"#,
)
.unwrap();
let mut stringfg =
field_spec_to_generator::<ZeroRng>("agent_name", 0, 0, 0, &toml, TEST_SEED, fake_now)?;
assert_eq!("foo info", stringfg.generate(fake_now).string());
Ok(())
}
#[test]
fn uptime_i64() -> Result<()> {
let fake_now = 55555;
fn uptime_i64() {
// Pretend data generator started running 10 seconds ago
let seconds_ago = 10;
let fake_start_execution_time = now_ns() - seconds_ago * 1_000_000_000;
let execution_start_time = now_ns() - seconds_ago * 1_000_000_000;
let mut uptimefg = UptimeFieldGenerator::new("foo", &UptimeKind::I64, execution_start_time);
let toml: specification::FieldSpec = toml::from_str(
r#"
name = "arbitrary" # field name doesn't have to be uptime
uptime = "i64""#,
)
.unwrap();
let mut uptimefg = field_spec_to_generator::<DynamicRng>(
"agent_name",
0,
0,
0,
&toml,
TEST_SEED,
fake_start_execution_time,
)?;
assert_eq!(seconds_ago, uptimefg.generate(fake_now).i64());
Ok(())
assert_eq!(seconds_ago, uptimefg.generate_value());
}
#[test]
fn uptime_telegraf() -> Result<()> {
let fake_now = 55555;
fn uptime_telegraf() {
// Pretend data generator started running 10 days, 2 hours, and 33 minutes ago
let seconds_ago = 10 * 24 * 60 * 60 + 2 * 60 * 60 + 33 * 60;
let fake_start_execution_time = now_ns() - seconds_ago * 1_000_000_000;
let execution_start_time = now_ns() - seconds_ago * 1_000_000_000;
let mut uptimefg = UptimeFieldGenerator::new("foo", &UptimeKind::I64, execution_start_time);
let toml: specification::FieldSpec = toml::from_str(
r#"
name = "arbitrary" # field name doesn't have to be uptime
uptime = "telegraf""#,
)
.unwrap();
let mut uptimefg = field_spec_to_generator::<DynamicRng>(
"agent_name",
0,
0,
0,
&toml,
TEST_SEED,
fake_start_execution_time,
)?;
assert_eq!("10 days, 02:33", uptimefg.generate(fake_now).string());
assert_eq!("10 days, 02:33", uptimefg.generate_value_as_string());
// Pretend data generator started running 1 day, 14 hours, and 5 minutes ago
// to exercise different formatting
@ -758,20 +524,10 @@ mod test {
let seconds_in_5_minutes = 5 * 60;
let seconds_ago = seconds_in_1_day + seconds_in_14_hours + seconds_in_5_minutes;
let fake_start_execution_time = now_ns() - seconds_ago * 1_000_000_000;
let execution_start_time = now_ns() - seconds_ago * 1_000_000_000;
let mut uptimefg = field_spec_to_generator::<DynamicRng>(
"agent_name",
0,
0,
0,
&toml,
TEST_SEED,
fake_start_execution_time,
)?;
let mut uptimefg = UptimeFieldGenerator::new("foo", &UptimeKind::I64, execution_start_time);
assert_eq!("1 day, 14:05", uptimefg.generate(fake_now).string());
Ok(())
assert_eq!("1 day, 14:05", uptimefg.generate_value_as_string());
}
}

View File

@ -27,23 +27,21 @@
clippy::clone_on_ref_ptr
)]
use crate::substitution::Substitute;
use crate::tag_set::GeneratedTagSets;
use rand::Rng;
use rand_seeder::Seeder;
use crate::{agent::Agent, tag_set::GeneratedTagSets};
use snafu::{ResultExt, Snafu};
use std::sync::Arc;
use std::{
convert::TryFrom,
time::{SystemTime, UNIX_EPOCH},
};
use tracing::info;
pub mod agent;
pub mod field;
pub mod measurement;
pub mod specification;
pub mod substitution;
pub mod tag;
mod tag_pair;
pub mod tag_set;
pub mod write;
@ -72,10 +70,8 @@ pub enum Error {
},
/// Error that may happen when creating agents
#[snafu(display("Could not create agent `{}`, caused by:\n{}", name, source))]
#[snafu(display("Could not create agents, caused by:\n{}", source))]
CouldNotCreateAgent {
/// The name of the relevant agent
name: String,
/// Underlying `agent` module error that caused this problem
source: agent::Error,
},
@ -106,7 +102,7 @@ type Result<T, E = Error> = std::result::Result<T, E>;
/// If `start_datetime` or `end_datetime` are `None`, the current datetime will
/// be used.
#[allow(clippy::too_many_arguments)]
pub async fn generate<T: DataGenRng>(
pub async fn generate(
spec: &specification::DataSpec,
points_writer_builder: &mut write::PointsWriterBuilder,
start_datetime: Option<i64>,
@ -116,66 +112,47 @@ pub async fn generate<T: DataGenRng>(
batch_size: usize,
one_agent_at_a_time: bool, // run one agent after another, if printing to stdout
) -> Result<usize> {
let seed = spec.base_seed.to_owned().unwrap_or_else(|| {
let mut rng = rand::thread_rng();
format!("{:04}", rng.gen_range(0..10000))
});
let mut handles = vec![];
let generated_tag_sets = GeneratedTagSets::from_spec(spec).context(CouldNotGenerateTagSets)?;
let lock = Arc::new(tokio::sync::Mutex::new(()));
// for each agent specification
for agent_spec in &spec.agents {
// create iterators to `cycle` through for `agent_spec.tags`
let tag_set_iterator = tag::AgentTagIterator::new(&agent_spec.tags);
let start = std::time::Instant::now();
// create `count` number of agent instances, or 1 agent if no count is specified
let n_agents = agent_spec.count.unwrap_or(1);
for (agent_id, mut agent_tags) in tag_set_iterator.take(n_agents).enumerate() {
let agent_name =
Substitute::once(&agent_spec.name, &[("agent_id", &agent_id.to_string())])
.context(CouldNotCreateAgentName)?;
agent_tags.push(tag::Tag::new("data_spec", &spec.name));
if let Some(name_tag_key) = &agent_spec.name_tag_key {
agent_tags.push(tag::Tag::new(name_tag_key, &agent_name));
}
let mut agent = agent::Agent::<T>::new(
agent_spec,
&agent_name,
agent_id,
&seed,
agent_tags,
let agents = spec
.agents
.iter()
.map(|spec| {
Agent::from_spec(
spec,
start_datetime,
end_datetime,
execution_start_time,
continue_on,
&generated_tag_sets,
)
.context(CouldNotCreateAgent { name: &agent_name })?;
.context(CouldNotCreateAgent)
})
.collect::<Result<Vec<Vec<Agent>>>>()?;
let agents = agents.into_iter().flatten();
let agent_points_writer = points_writer_builder
.build_for_agent(&agent_name)
.context(CouldNotCreateAgentWriter { name: &agent_name })?;
for mut agent in agents {
let agent_points_writer = points_writer_builder
.build_for_agent(agent.id)
.context(CouldNotCreateAgentWriter { name: "whatevs" })?;
let lock_ref = Arc::clone(&lock);
let lock_ref = Arc::clone(&lock);
handles.push(tokio::task::spawn(async move {
// did this weird hack because otherwise the stdout outputs would be jumbled together garbage
if one_agent_at_a_time {
let _l = lock_ref.lock().await;
agent.generate_all(agent_points_writer, batch_size).await
} else {
agent.generate_all(agent_points_writer, batch_size).await
}
}));
}
handles.push(tokio::task::spawn(async move {
// did this weird hack because otherwise the stdout outputs would be jumbled together garbage
if one_agent_at_a_time {
let _l = lock_ref.lock().await;
agent.generate_all(agent_points_writer, batch_size).await
} else {
agent.generate_all(agent_points_writer, batch_size).await
}
}));
}
let mut total_points = 0;
@ -186,72 +163,21 @@ pub async fn generate<T: DataGenRng>(
.context(AgentCouldNotGeneratePoints)?;
}
let elapsed = start.elapsed();
let points_sec = if elapsed.as_secs() == 0 {
0
} else {
total_points as u64 / elapsed.as_secs()
};
info!(
"wrote {} total points in {:?} for a rate of {}/sec",
total_points, elapsed, points_sec
);
Ok(total_points)
}
/// Shorthand trait for the functionality this crate needs a random number generator to have
pub trait DataGenRng: rand::Rng + rand::SeedableRng + Send + 'static {}
impl<T: rand::Rng + rand::SeedableRng + Send + 'static> DataGenRng for T {}
/// Encapsulating the creation of an optionally-seedable random number generator
/// to make this easy to change. Uses a 4-digit number expressed as a `String`
/// as the seed type to enable easy creation of another instance using the same
/// seed.
#[derive(Debug)]
pub struct RandomNumberGenerator<T: DataGenRng> {
rng: T,
/// The seed used for this instance.
pub seed: String,
}
impl<T: DataGenRng> Default for RandomNumberGenerator<T> {
fn default() -> Self {
let mut rng = rand::thread_rng();
let seed = format!("{:04}", rng.gen_range(0..10000));
Self::new(seed)
}
}
impl<T: DataGenRng> RandomNumberGenerator<T> {
/// Create a new instance using the specified seed.
pub fn new(seed: impl Into<String>) -> Self {
let seed = seed.into();
Self {
rng: Seeder::from(&seed).make_rng(),
seed,
}
}
/// Generate a random GUID
pub fn guid(&mut self) -> uuid::Uuid {
let mut bytes = [0u8; 16];
self.rng.fill_bytes(&mut bytes);
uuid::Builder::from_bytes(bytes)
.set_variant(uuid::Variant::RFC4122)
.set_version(uuid::Version::Random)
.build()
}
}
impl<T: DataGenRng> rand::RngCore for RandomNumberGenerator<T> {
fn next_u32(&mut self) -> u32 {
self.rng.next_u32()
}
fn next_u64(&mut self) -> u64 {
self.rng.next_u64()
}
fn fill_bytes(&mut self, dest: &mut [u8]) {
self.rng.fill_bytes(dest);
}
fn try_fill_bytes(&mut self, dest: &mut [u8]) -> std::result::Result<(), rand::Error> {
self.rng.try_fill_bytes(dest)
}
}
/// Gets the current time in nanoseconds since the epoch
pub fn now_ns() -> i64 {
let since_the_epoch = SystemTime::now()
@ -260,56 +186,6 @@ pub fn now_ns() -> i64 {
i64::try_from(since_the_epoch.as_nanos()).expect("Time does not fit")
}
// Always returns 0.
#[cfg(test)]
#[derive(Default)]
struct ZeroRng;
#[cfg(test)]
impl rand::RngCore for ZeroRng {
fn next_u32(&mut self) -> u32 {
self.next_u64() as u32
}
fn next_u64(&mut self) -> u64 {
0
}
fn fill_bytes(&mut self, dest: &mut [u8]) {
rand_core::impls::fill_bytes_via_next(self, dest)
}
fn try_fill_bytes(&mut self, dest: &mut [u8]) -> std::result::Result<(), rand::Error> {
self.fill_bytes(dest);
Ok(())
}
}
#[cfg(test)]
impl rand::SeedableRng for ZeroRng {
type Seed = Vec<u8>;
// Ignore the seed value
fn from_seed(_seed: Self::Seed) -> Self {
Self
}
}
// The test rng ignores the seed anyway, so the seed specified doesn't matter.
#[cfg(test)]
const TEST_SEED: &str = "";
#[cfg(test)]
fn test_rng() -> RandomNumberGenerator<ZeroRng> {
RandomNumberGenerator::<ZeroRng>::new(TEST_SEED)
}
// A random number type that does *not* have a predictable sequence of values for use in tests
// that assert on properties rather than exact values. Aliased for convenience in changing to
// a different Rng type.
#[cfg(test)]
type DynamicRng = rand::rngs::SmallRng;
#[cfg(test)]
mod test {
use super::*;
@ -326,20 +202,16 @@ mod test {
name = "demo_schema"
[[agents]]
name = "basic"
sampling_interval = "10s" # seconds
[[agents.measurements]]
name = "cpu"
[[agents.measurements.fields]]
name = "up"
bool = true"#;
name = "val"
i64_range = [1, 1]"#;
let data_spec = DataSpec::from_str(toml).unwrap();
let agent_id = 0;
let agent_spec = &data_spec.agents[0];
// Take agent_tags out of the equation for the purposes of this test
let agent_tags = vec![];
let execution_start_time = now_ns();
@ -351,12 +223,8 @@ bool = true"#;
let generated_tag_sets = GeneratedTagSets::default();
let mut agent = agent::Agent::<ZeroRng>::new(
let mut agent = agent::Agent::from_spec(
agent_spec,
&agent_spec.name,
agent_id,
TEST_SEED,
agent_tags,
start_datetime,
end_datetime,
execution_start_time,
@ -364,7 +232,7 @@ bool = true"#;
&generated_tag_sets,
)?;
let data_points = agent.generate().await?;
let data_points = agent[0].generate().await?.into_iter().flatten();
let mut v = Vec::new();
for data_point in data_points {
data_point.write_data_point_to(&mut v).unwrap();
@ -372,10 +240,10 @@ bool = true"#;
let line_protocol = String::from_utf8(v).unwrap();
// Get a point for time 0
let expected_line_protocol = "cpu up=f 0\n";
let expected_line_protocol = "cpu val=1i 0\n";
assert_eq!(line_protocol, expected_line_protocol);
let data_points = agent.generate().await?;
let data_points = agent[0].generate().await?.into_iter().flatten();
let mut v = Vec::new();
for data_point in data_points {
data_point.write_data_point_to(&mut v).unwrap();
@ -383,11 +251,12 @@ bool = true"#;
let line_protocol = String::from_utf8(v).unwrap();
// Get a point for time 10s
let expected_line_protocol = "cpu up=f 10000000000\n";
let expected_line_protocol = "cpu val=1i 10000000000\n";
assert_eq!(line_protocol, expected_line_protocol);
// Don't get any points anymore because we're past the ending datetime
let data_points = agent.generate().await?;
let data_points = agent[0].generate().await?.into_iter().flatten();
let data_points: Vec<_> = data_points.collect();
assert!(
data_points.is_empty(),
"expected no data points, got {:?}",

File diff suppressed because it is too large Load Diff

View File

@ -28,33 +28,8 @@ type Result<T, E = Error> = std::result::Result<T, E>;
#[derive(Deserialize, Debug)]
#[serde(deny_unknown_fields)]
pub struct DataSpec {
/// If `include_spec_tag` is set to true, eveyr point generated from this
/// configuration will contain a tag `data_spec=[this value]` to identify
/// what generated that data. This name can also be used in string replacements
/// by using the placeholder `{{data_spec}}`.
/// This name can be referenced in handlebars templates as `{{spec_name}}`
pub name: String,
/// If set to true, all samples generated will include a `data_spec=[name]`
/// key/value pair.
pub include_spec_tag: Option<bool>,
/// A string to be used as the seed to the random number generators.
///
/// When specified, this is used as a base seed propagated through all
/// measurements, tags, and fields, which will each have their own
/// random number generator seeded by this seed plus their name. This
/// has the effect of keeping each value sequence generated per measurement,
/// tag, or field stable even if the configurations in other parts of the
/// schema are changed. That is, if you have a field named `temp` and on
/// the first run with base seed `foo` generates the values `[10, 50,
/// 72, 3]`, and then you add another field named `weight` to the schema
/// and run with base seed `foo` again, the values generated for `temp`
/// should again be `[10, 50, 72, 3]`. This enables incremental
/// development of a schema without churn, if that is undesired.
///
/// When this is not specified, the base seed will be randomly generated. It
/// will be printed to stdout so that the value used can be specified in
/// future configurations if reproducing a particular set of sequences
/// is desired.
pub base_seed: Option<String>,
/// Specifies values that are generated before agents are created. These values
/// can be used in tag set specs, which will pre-create tag sets that can then be
/// used by the agent specs.
@ -96,7 +71,7 @@ impl FromStr for DataSpec {
pub struct ValuesSpec {
/// The name of the collection of values
pub name: String,
/// The handlebars template to create each value in the collection
/// If values not specified this handlebars template will be used to create each value in the collection
pub template: String,
/// How many of these values should be generated. If belongs_to is
/// specified, each parent will have this many of this value. So
@ -141,43 +116,26 @@ pub struct TagSetsSpec {
#[cfg_attr(test, derive(Default))]
#[serde(deny_unknown_fields)]
pub struct AgentSpec {
/// Used as the value for the `name` tag if `name_tag_key` is `Some`; has no
/// effect if `name_tag_key` is not specified.
///
/// Can be a plain string or a string with placeholders for:
///
/// - `{{agent_id}}` - the agent ID
pub name: String,
/// Specifies the number of agents that should be created with this spec.
/// Default value is 1.
pub count: Option<usize>,
/// How often this agent should generate samples, in a duration string. If
/// not specified, this agent will only generate one sample.
pub sampling_interval: Option<String>,
/// If specified, every measurement generated by this agent will include a
/// tag with this `String` as its key, and with the `AgentSpec`'s `name`
/// as the value (with any substitutions in the `name` performed)
pub name_tag_key: Option<String>,
/// If specified, the values of the tags will be cycled through per `Agent`
/// instance such that all measurements generated by that agent will
/// contain tags with the specified name and that agent's `name` field
/// (with replacements made) as the value.
#[serde(default)]
pub tags: Vec<AgentTag>,
/// The specifications for the measurements for the agent to generate.
pub measurements: Vec<MeasurementSpec>,
}
/// Tags that are associated to all measurements that a particular agent
/// generates. The values are rotated through so that each agent gets one of the
/// specified values for this key.
#[derive(Deserialize, Debug)]
#[serde(deny_unknown_fields)]
pub struct AgentTag {
/// The tag key to use when adding this tag to all measurements for an agent
pub key: String,
/// The values to cycle through for each agent for this tag key
pub values: Vec<String>,
/// A collection of strings that reference other `Values` collections. Each agent will have one
/// of the referenced has_one. Further, when generating this, the has_one collection
/// will cycle through so that each successive agent will use the next has_one value
/// for association
#[serde(default)]
pub has_one: Vec<String>,
/// Specification of tag key/value pairs that get generated once and reused for
/// every sampling. Every measurement (and thus line) will have these tag pairs added onto it.
/// The template can use `{{agent.id}}` to reference the agent's id and `{{guid}}` or `{{random N}}`
/// to generate random strings.
#[serde(default)]
pub tag_pairs: Vec<TagPairSpec>,
}
/// The specification of how to generate data points for a particular
@ -189,17 +147,15 @@ pub struct MeasurementSpec {
/// Name of the measurement. Can be a plain string or a string with
/// placeholders for:
///
/// - `{{agent_id}}` - the agent ID
/// - `{{measurement_id}}` - the measurement's ID, which must be used if
/// - `{{agent.id}}` - the agent ID
/// - `{{measurement.id}}` - the measurement's ID, which must be used if
/// `count` > 1 so that unique measurement names are created
/// - `{{id}}` - the measurement ID
pub name: String,
/// The number of measurements with this configuration that should be
/// created. Default value is 1. If specified, use `{{measurement_id}}`
/// created. Default value is 1. If specified, use `{{id}}`
/// in this measurement's `name` to create unique measurements.
pub count: Option<usize>,
/// Specification of the tags for this measurement
#[serde(default)]
pub tags: Vec<TagSpec>,
/// Specifies a tag set to include in every sampling in addition to tags specified
pub tag_set: Option<String>,
/// Specification of tag key/value pairs that get generated once and reused for
@ -213,71 +169,26 @@ pub struct MeasurementSpec {
/// Specification of a tag key/value pair whose template will be evaluated once and
/// the value will be reused across every sampling.
#[derive(Deserialize, Debug)]
#[derive(Deserialize, Debug, Clone)]
#[cfg_attr(test, derive(Default))]
#[serde(deny_unknown_fields)]
pub struct TagPairSpec {
/// The tag key
/// The tag key. If `count` is specified, the id of the tag will be automatically
/// appended to the end of the key to ensure it is unique.
pub key: String,
/// The template the generate the tag value
pub template: String,
}
/// The specification of how to generate tag keys and values for a particular
/// measurement.
#[derive(Deserialize, Debug)]
#[cfg_attr(test, derive(Default))]
#[serde(deny_unknown_fields)]
pub struct TagSpec {
/// Key/name for this tag. Can be a plain string or a string with
/// placeholders for:
///
/// - `{{agent_id}}` - the agent ID
/// - `{{measurement_id}}` - the measurement ID
/// - `{{tag_id}}` - the tag ID, which must be used if `count` > 1 so that
/// unique tag names are created
pub name: String,
/// Value for this tag. Can be a plain string or a string with placeholders
/// for:
///
/// - `{{agent_id}}` - the agent ID
/// - `{{measurement_id}}` - the measurement ID
/// - `{{cardinality}}` - the cardinality counter value. Must use this or
/// `{{guid}}` if `cardinality` > 1 so that unique tag values are created
/// - `{{counter}}` - the increment counter value. Only useful if
/// `increment_every` is set.
/// - `{{guid}}` - a randomly generated unique string. If `cardinality` > 1,
/// each tag will have a different GUID.
pub value: String,
/// The number of tags with this configuration that should be created.
/// Default value is 1. If specified, use `{{tag_id}}` in this tag's
/// `name` to create unique tags.
/// If specified, this number of tags will be generated with this template. Each will
/// have a key of `key#` where # is the number. Useful for creating a degenerate case
/// of having dozens or hundreds of tags
pub count: Option<usize>,
/// A number that controls how many values are generated, which impacts how
/// many rows are created for each agent generation. Default value is 1.
/// If specified, use `{{cardinality}}` or `{{guid}}` in this tag's
/// `value` to create unique values.
pub cardinality: Option<u32>,
/// How often to increment the `{{counter}}` value. For example, if
/// `increment_every` is set to 10, `{{counter}}` will increase by 1
/// after every 10 agent generations. This simulates temporal tag values
/// like process IDs or container IDs in tags. If not specified, the value
/// of `{{counter}}` will always be 0.
pub increment_every: Option<usize>,
/// A list of replacement placeholders and the values to replace them with.
/// The values can optionally have weights associated with them to
/// change the probabilities that its value will be used.
#[serde(default)]
pub replacements: Vec<Replacement>,
/// When there are replacements specified and other tags in this measurement
/// with cardinality greater than 1, this option controls whether this
/// tag will get a new replacement value on every line in a generation
/// (`true`) or whether it will be sampled once and have the same value
/// on every line in a generation (`false`). If there are no replacements on
/// this tag or any other tags with a cardinality greater than one, this
/// has no effect.
#[serde(default)]
pub resample_every_line: bool,
/// If specified, the tag template will be re-evaluated after this many lines have been
/// generated. This will go across samplings. For example, if you have this set to 3 and
/// each sample generates two lines, it will get regenerated after the first line in the
/// second sample. This is useful for simulating things like tracing use cases or ephemeral
/// identifiers like process or container IDs. The template has access to the normal data
/// accessible as well as `line_number`.
pub regenerate_after_lines: Option<usize>,
}
/// The specification of how to generate field keys and values for a particular
@ -553,119 +464,28 @@ impl ReplacementValue {
mod test {
use super::*;
type Error = Box<dyn std::error::Error>;
type Result<T = (), E = Error> = std::result::Result<T, E>;
static TELEGRAF_TOML: &str = include_str!("../schemas/telegraf.toml");
#[test]
fn parse_spec() -> Result {
let spec = DataSpec::from_str(TELEGRAF_TOML)?;
fn sample_schemas_parse() {
let schemas: Vec<&str> = vec![
include_str!("../schemas/storage_cardinality_example.toml"),
include_str!("../schemas/cap-write.toml"),
include_str!("../schemas/tracing-spec.toml"),
include_str!("../schemas/full_example.toml"),
];
assert_eq!(spec.name, "demo_schema");
assert_eq!(spec.agents.len(), 2);
let agent0 = &spec.agents[0];
assert_eq!(agent0.name, "demo");
let agent0_measurements = &agent0.measurements;
assert_eq!(agent0_measurements.len(), 1);
let a0m0 = &agent0_measurements[0];
assert_eq!(a0m0.name, "some_measurement");
let a0m0_fields = &a0m0.fields;
assert_eq!(a0m0_fields.len(), 5);
let a0m0f0 = &a0m0_fields[0];
assert_eq!(a0m0f0.name, "field1");
assert_eq!(a0m0f0.field_value_spec, FieldValueSpec::Bool(true));
let a0m0f1 = &a0m0_fields[1];
assert_eq!(a0m0f1.name, "field2");
assert_eq!(
a0m0f1.field_value_spec,
FieldValueSpec::I64 {
range: 3..200,
increment: false,
reset_after: None,
for s in schemas {
if let Err(e) = DataSpec::from_str(s) {
panic!("error {:?} on\n{}", e, s)
}
);
let a0m0f2 = &a0m0_fields[2];
assert_eq!(a0m0f2.name, "field3");
assert_eq!(
a0m0f2.field_value_spec,
FieldValueSpec::I64 {
range: 1000..5000,
increment: true,
reset_after: None,
}
);
let a0m0f3 = &a0m0_fields[3];
assert_eq!(a0m0f3.name, "field4");
assert_eq!(
a0m0f3.field_value_spec,
FieldValueSpec::F64 { range: 0.0..100.0 }
);
let a0m0f4 = &a0m0_fields[4];
assert_eq!(a0m0f4.name, "field5");
assert_eq!(
a0m0f4.field_value_spec,
FieldValueSpec::String {
pattern:
"{{agent_name}} foo {{level}} {{format-time \"%Y-%m-%d %H:%M\"}} {{random 200}}"
.into(),
replacements: vec![
Replacement {
replace: "color".into(),
with: vec![
ReplacementValue::String("red".into()),
ReplacementValue::String("blue".into()),
ReplacementValue::String("green".into())
],
},
Replacement {
replace: "level".into(),
with: vec![
ReplacementValue::Weighted("info".into(), 800),
ReplacementValue::Weighted("warn".into(), 195),
ReplacementValue::Weighted("error".into(), 5)
],
}
],
}
);
Ok(())
}
#[test]
fn parse_fully_supported_spec() -> Result<()> {
// The fully supported spec is mostly for manual testing, but we should make
// sure while developing that it's valid as well so that when we go to
// do manual testing it isn't broken
// Also read it from the file to test `DataSpec::from_file` rather than
// include_str
let data_spec = DataSpec::from_file("schemas/fully-supported.toml")?;
assert_eq!(data_spec.name, "demo_schema");
Ok(())
}
}
#[test]
fn not_specifying_vectors_gets_default_empty_vector() {
let toml = r#"
name = "demo_schema"
base_seed = "this is a demo"
[[agents]]
name = "basic"
[[agents.measurements]]
name = "cpu"
@ -677,11 +497,11 @@ pattern = "server"
let spec = DataSpec::from_str(toml).unwrap();
let agent0 = &spec.agents[0];
assert!(agent0.tags.is_empty());
assert!(agent0.tag_pairs.is_empty());
let agent0_measurements = &agent0.measurements;
let a0m0 = &agent0_measurements[0];
assert!(a0m0.tags.is_empty());
assert!(a0m0.tag_pairs.is_empty());
let a0m0_fields = &a0m0.fields;
let a0m0f0 = &a0m0_fields[0];

View File

@ -1,60 +1,76 @@
//! Substituting dynamic values into a template as specified in various places
//! in the schema.
use crate::{specification, DataGenRng, RandomNumberGenerator};
use crate::specification;
use chrono::prelude::*;
use handlebars::{
Context, Handlebars, Helper, HelperDef, HelperResult, Output, RenderContext, RenderError,
};
use rand::{distributions::Alphanumeric, seq::SliceRandom, Rng};
use serde::Serialize;
use rand::rngs::SmallRng;
use rand::{distributions::Alphanumeric, seq::SliceRandom, Rng, RngCore};
use serde_json::Value;
use snafu::{ResultExt, Snafu};
use std::{collections::BTreeMap, convert::TryInto, sync::Mutex};
use std::{collections::BTreeMap, convert::TryInto};
/// Substitution-specific Results
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Errors that may happen while substituting values into templates.
#[derive(Snafu, Debug)]
#[allow(missing_docs)]
pub enum Error {
/// Error that may happen when substituting placeholder values
#[snafu(display(
"Could not perform text substitution in `{}`, caused by:\n{}",
template,
source
))]
CantCompileTemplate {
/// Underlying Handlebars error that caused this problem
source: handlebars::TemplateError,
/// Template that caused this problem
template: String,
},
/// Error that may happen when substituting placeholder values
#[snafu(display("Could not render template {}, caused by: {}", name, source))]
CantRenderTemplate {
name: String,
source: handlebars::RenderError,
},
#[snafu(display(
"Could not perform text substitution in `{}`, caused by:\n{}",
template,
source
))]
CantPerformSubstitution {
/// Underlying Handlebars error that caused this problem
source: handlebars::RenderError,
/// Template that caused this problem
template: String,
},
}
#[derive(Debug)]
pub(crate) struct RandomHelper<T: DataGenRng>(Mutex<RandomNumberGenerator<T>>);
impl<T: DataGenRng> RandomHelper<T> {
#[allow(dead_code)]
pub(crate) fn new(rng: Mutex<RandomNumberGenerator<T>>) -> Self {
Self(rng)
}
pub(crate) fn render_once(name: &str, template: impl Into<String>, data: &Value) -> Result<String> {
let mut registry = new_handlebars_registry();
registry.set_strict_mode(true);
let template = template.into();
registry
.register_template_string(name, &template)
.context(CantCompileTemplate { template })?;
registry
.render(name, data)
.context(CantRenderTemplate { name })
}
impl<T: DataGenRng> HelperDef for RandomHelper<T> {
pub(crate) fn new_handlebars_registry() -> Handlebars<'static> {
let mut registry = Handlebars::new();
registry.set_strict_mode(true);
registry.register_helper("format-time", Box::new(FormatNowHelper));
registry.register_helper("random", Box::new(RandomHelper));
registry.register_helper("guid", Box::new(GuidHelper));
registry
}
#[derive(Debug)]
pub(crate) struct RandomHelper;
impl HelperDef for RandomHelper {
fn call<'reg: 'rc, 'rc>(
&self,
h: &Helper<'_, '_>,
@ -72,7 +88,7 @@ impl<T: DataGenRng> HelperDef for RandomHelper<T> {
.try_into()
.map_err(|_| RenderError::new("`random`'s parameter must fit in a usize"))?;
let rng = &mut *self.0.lock().expect("mutex poisoned");
let mut rng = rand::thread_rng();
let random: String = std::iter::repeat(())
.map(|()| rng.sample(Alphanumeric))
@ -117,86 +133,39 @@ impl HelperDef for FormatNowHelper {
}
}
/// Given a handlebars template containing placeholders within double curly
/// brackets like `{{placeholder}}` and a list of `(placeholder, substitution
/// value)` pairs, place the values in the template where the relevant
/// placeholder is.
#[derive(Debug)]
pub struct Substitute {
handlebars: Handlebars<'static>,
template: String,
}
pub(crate) struct GuidHelper;
impl Substitute {
/// Compile and evaluate a template once. If you need to evaluate
/// it multiple times, construct an instance via [`new`](Self::new).
///
/// If a placeholder appears in a template but not in the list of
/// substitution values, this will return an error.
pub fn once(template: &str, values: &[(&str, &str)]) -> Result<String> {
let values = values
.iter()
.map(|&(k, v)| (k, v))
.collect::<BTreeMap<_, _>>();
let me = Self::new_minimal(template)?;
me.evaluate(&values)
}
impl HelperDef for GuidHelper {
fn call<'reg: 'rc, 'rc>(
&self,
_h: &Helper<'_, '_>,
_: &Handlebars<'_>,
_: &Context,
_: &mut RenderContext<'_, '_>,
out: &mut dyn Output,
) -> HelperResult {
let mut rng = rand::thread_rng();
/// Compiles the handlebars template once, then allows reusing the
/// template multiple times via [`evaluate`](Self::evaluate). If you don't need to
/// reuse the template, you can use [`once`](Self::once).
pub fn new<T: DataGenRng>(
template: impl Into<String>,
rng: RandomNumberGenerator<T>,
) -> Result<Self> {
let mut me = Self::new_minimal(template)?;
me.set_random_number_generator(rng);
Ok(me)
}
let mut bytes = [0u8; 16];
rng.fill_bytes(&mut bytes);
let uid = uuid::Builder::from_bytes(bytes)
.set_variant(uuid::Variant::RFC4122)
.set_version(uuid::Version::Random)
.build()
.to_string();
fn new_minimal(template: impl Into<String>) -> Result<Self> {
let template = template.into();
out.write(&uid)?;
let mut handlebars = Handlebars::new();
handlebars.set_strict_mode(true);
handlebars.register_helper("format-time", Box::new(FormatNowHelper));
handlebars
.register_template_string("template", &template)
.context(CantCompileTemplate {
template: &template,
})?;
Ok(Self {
handlebars,
template,
})
}
fn set_random_number_generator<T: DataGenRng>(&mut self, rng: RandomNumberGenerator<T>) {
self.handlebars
.register_helper("random", Box::new(RandomHelper(Mutex::new(rng))));
}
/// Interpolates the values into the compiled template.
///
/// If a placeholder appears in a template but not in the list of
/// substitution values, this will return an error.
pub fn evaluate(&self, values: &impl Serialize) -> Result<String> {
self.handlebars
.render("template", &values)
.context(CantPerformSubstitution {
template: &self.template,
})
Ok(())
}
}
/// Given a random number generator and replacement specification, choose a
/// particular value from the list of possible values according to any specified
/// weights (or with equal probability if there are no weights).
pub fn pick_from_replacements<'a, T: DataGenRng>(
rng: &mut RandomNumberGenerator<T>,
pub fn pick_from_replacements<'a>(
rng: &mut SmallRng,
replacements: &'a [specification::Replacement],
) -> BTreeMap<&'a str, &'a str> {
replacements
@ -216,60 +185,47 @@ pub fn pick_from_replacements<'a, T: DataGenRng>(
#[cfg(test)]
mod test {
use super::*;
use crate::test_rng;
type Error = Box<dyn std::error::Error>;
type Result<T = (), E = Error> = std::result::Result<T, E>;
#[derive(Serialize)]
struct TimestampArgs {
timestamp: i64,
}
use serde_json::json;
#[test]
fn format_now_valid_strftime() -> Result {
let rng = test_rng();
let args = TimestampArgs {
timestamp: 1599154445000000000,
};
fn format_now_valid_strftime() {
let mut registry = new_handlebars_registry();
registry
.register_template_string("t", r#"the date is {{format-time "%Y-%m-%d"}}."#)
.unwrap();
let substitute =
Substitute::new(r#"the date is {{format-time "%Y-%m-%d"}}."#, rng).unwrap();
let timestamp: i64 = 1599154445000000000;
let value = registry
.render("t", &json!({ "timestamp": timestamp }))
.unwrap();
let value = substitute.evaluate(&args)?;
assert_eq!(value, "the date is 2020-09-03.");
Ok(())
assert_eq!(&value, "the date is 2020-09-03.");
}
#[test]
#[should_panic(expected = "a Display implementation returned an error unexpectedly: Error")]
fn format_now_invalid_strftime_panics() {
let rng = test_rng();
let args = TimestampArgs {
timestamp: 1599154445000000000,
};
let mut registry = new_handlebars_registry();
registry
.register_template_string("t", r#"the date is {{format-time "%-B"}}."#)
.unwrap();
let substitute = Substitute::new(r#"the date is {{format-time "%-B"}}."#, rng).unwrap();
substitute.evaluate(&args).expect("This is unreachable");
let timestamp: i64 = 1599154445000000000;
let _value = registry
.render("t", &json!({ "timestamp": timestamp }))
.expect("this is unreachable");
}
#[test]
fn format_now_missing_strftime() -> Result {
let rng = test_rng();
let args = TimestampArgs {
timestamp: 1599154445000000000,
};
fn format_now_missing_strftime() {
let mut registry = new_handlebars_registry();
registry
.register_template_string("t", r#"the date is {{format-time}}."#)
.unwrap();
let substitute = Substitute::new(r#"the date is {{format-time}}."#, rng).unwrap();
let timestamp: i64 = 1599154445000000000;
let result = registry.render("t", &json!({ "timestamp": timestamp }));
let result = substitute.evaluate(&args);
// TODO: better matching on the error
assert!(result.is_err());
Ok(())
}
}

View File

@ -1,495 +0,0 @@
//! Generating a set of tag keys and values given a specification
use crate::{
specification,
substitution::{pick_from_replacements, Substitute},
DataGenRng, RandomNumberGenerator,
};
use snafu::{ResultExt, Snafu};
use std::fmt;
/// Tag-specific Results
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Errors that may happen while creating tags
#[derive(Snafu, Debug)]
pub enum Error {
/// Error that may happen when substituting placeholder values in tag keys
#[snafu(display("Could not create tag key, caused by:\n{}", source))]
CouldNotCreateTagKey {
/// Underlying `substitution` module error that caused this problem
source: crate::substitution::Error,
},
/// Error that may happen when substituting placeholder values in tag values
#[snafu(display(
"Could not generate tag value for tag `{}`, caused by:\n{}",
key,
source
))]
CouldNotGenerateTagValue {
/// The key of the tag we couldn't create a value for
key: String,
/// Underlying `substitution` module error that caused this problem
source: crate::substitution::Error,
},
}
/// A generated tag value that will be used in a generated data point.
#[derive(Debug, Clone, PartialEq)]
pub struct Tag {
/// The key for the tag
pub key: String,
/// The value for the tag
pub value: String,
}
impl Tag {
/// Create a new tag with the given key and value.
pub fn new(key: impl Into<String>, value: impl Into<String>) -> Self {
Self {
key: key.into(),
value: value.into(),
}
}
}
/// A set of `count` tags that have the same configuration but different
/// `tag_id`s.
#[derive(Debug)]
pub struct TagGeneratorSet<T: DataGenRng> {
tags: Vec<TagGenerator<T>>,
}
impl<T: DataGenRng> TagGeneratorSet<T> {
/// Create a new set of tag generators for a particular agent, measurement,
/// and tag specification.
pub fn new(
agent_id: usize,
measurement_id: usize,
spec: &specification::TagSpec,
parent_seed: impl fmt::Display,
) -> Result<Self> {
let cardinality = spec.cardinality.unwrap_or(1);
let seed = format!("{}-{}", parent_seed, spec.name);
let tags = (0..cardinality)
.map(|cardinality| {
TagGenerator::new(agent_id, measurement_id, spec, cardinality, &seed)
})
.collect::<Result<_>>()?;
Ok(Self { tags })
}
/// Generate one set of tags
pub fn generate(&mut self) -> Result<Vec<Vec<Tag>>> {
self.tags.iter_mut().map(TagGenerator::generate).collect()
}
/// For tags that shouldn't be included in the multi cartesian product
/// because they have cardinality 1, this method takes the number of
/// lines needed, looks at whether this tag should be resampled or not,
/// and generates the number of lines worth of tags requested.
pub fn generate_to_zip(&mut self, num_lines: usize) -> Result<Vec<Vec<Tag>>> {
// This is a hack. A better way would be to have a different type for tags with
// cardinality = 1, and only that type has this method.
if self.tags.len() != 1 {
panic!("generate_to_zip is only for use with cardinality 1")
}
(&mut self.tags[0]).generate_to_zip(num_lines)
}
/// The cardinality of this tag configuration, used to figure out how many
/// rows each generation will create in total.
pub fn tag_cardinality(&self) -> usize {
self.tags.len()
}
}
#[derive(Debug)]
struct TagGenerator<T: DataGenRng> {
agent_id: String,
measurement_id: String,
tags: Vec<Tag>,
cardinality: u32,
counter: usize,
current_tick: usize,
increment_every: Option<usize>,
rng: RandomNumberGenerator<T>,
replacements: Vec<specification::Replacement>,
resample_every_line: bool,
}
impl<T: DataGenRng> TagGenerator<T> {
fn new(
agent_id: usize,
measurement_id: usize,
spec: &specification::TagSpec,
cardinality: u32,
parent_seed: impl fmt::Display,
) -> Result<Self> {
let count = spec.count.unwrap_or(1);
let increment_every = spec.increment_every;
let agent_id = agent_id.to_string();
let measurement_id = measurement_id.to_string();
let seed = format!("{}-{}-{}", parent_seed, spec.name, cardinality);
let rng = RandomNumberGenerator::<T>::new(seed);
let tags = (0..count)
.map(|tag_id| {
let key = Substitute::once(
&spec.name,
&[
("agent_id", &agent_id),
("measurement_id", &measurement_id),
("tag_id", &tag_id.to_string()),
],
)
.context(CouldNotCreateTagKey)?;
Ok(Tag {
key,
value: spec.value.clone(),
})
})
.collect::<Result<_>>()?;
Ok(Self {
agent_id,
measurement_id,
tags,
cardinality,
counter: 0,
current_tick: 0,
increment_every,
rng,
replacements: spec.replacements.clone(),
resample_every_line: spec.resample_every_line,
})
}
fn generate(&mut self) -> Result<Vec<Tag>> {
let counter = self.increment().to_string();
let cardinality_string = self.cardinality.to_string();
let guid = self.rng.guid().to_string();
let mut substitutions = pick_from_replacements(&mut self.rng, &self.replacements);
substitutions.insert("agent_id", &self.agent_id);
substitutions.insert("measurement_id", &self.measurement_id);
substitutions.insert("counter", &counter);
substitutions.insert("cardinality", &cardinality_string);
substitutions.insert("guid", &guid);
let substitutions: Vec<_> = substitutions.into_iter().collect();
self.tags
.iter()
.map(|tag| {
let key = tag.key.clone();
let value = Substitute::once(&tag.value, &substitutions)
.context(CouldNotGenerateTagValue { key: &key })?;
Ok(Tag { key, value })
})
.collect()
}
// if count and replacements/resampling could never be used on the same tag
// configuration, then this could return `Result<Vec<Tag>>` I think. This
// could also possibly return an iterator rather than a Vec; the measurement
// immediately iterates over it
fn generate_to_zip(&mut self, num_lines: usize) -> Result<Vec<Vec<Tag>>> {
if self.resample_every_line {
Ok((0..num_lines)
.map(|_| self.generate())
.collect::<Result<_>>()?)
} else {
let tags = self.generate()?;
Ok(std::iter::repeat(tags).take(num_lines).collect())
}
}
/// Returns the current value and potentially increments the counter for
/// next time.
fn increment(&mut self) -> usize {
let counter = self.counter;
if let Some(increment) = self.increment_every {
self.current_tick += 1;
if self.current_tick >= increment {
self.counter += 1;
self.current_tick = 0;
}
}
counter
}
}
/// Cycles through each value for each agent tag
pub struct AgentTagIterator {
iters: Vec<Box<dyn Iterator<Item = Tag>>>,
}
impl fmt::Debug for AgentTagIterator {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("AgentTagIterator")
.field("iters", &"(dynamic)")
.finish()
}
}
impl AgentTagIterator {
/// Create a new iterator to manage the cycling
pub fn new(agent_tags: &[specification::AgentTag]) -> Self {
Self {
iters: agent_tags
.iter()
.map(|agent_tag| {
boxed_cycling_iter(agent_tag.key.clone(), agent_tag.values.clone())
})
.collect(),
}
}
}
fn boxed_cycling_iter(key: String, values: Vec<String>) -> Box<dyn Iterator<Item = Tag>> {
Box::new(values.into_iter().cycle().map(move |v| Tag::new(&key, &v)))
}
impl Iterator for AgentTagIterator {
type Item = Vec<Tag>;
fn next(&mut self) -> Option<Self::Item> {
Some(self.iters.iter_mut().flat_map(|i| i.next()).collect())
}
}
#[cfg(test)]
mod test {
use super::*;
use crate::{specification::*, ZeroRng, TEST_SEED};
#[test]
fn empty_agent_spec_tag_set_always_returns_empty_vec() {
let agent = AgentSpec {
tags: vec![],
..AgentSpec::default()
};
let mut iter = AgentTagIterator::new(&agent.tags);
assert_eq!(iter.next().unwrap(), vec![]);
}
#[test]
fn agent_spec_tag_set() {
let tag_alpha = toml::from_str(
r#"key = "alpha"
values = ["1", "2", "3"]"#,
)
.unwrap();
let tag_omega = toml::from_str(
r#"key = "omega"
values = ["apple", "grape"]"#,
)
.unwrap();
let agent = AgentSpec {
tags: vec![tag_alpha, tag_omega],
..AgentSpec::default()
};
let mut iter = AgentTagIterator::new(&agent.tags);
assert_eq!(
iter.next().unwrap(),
vec![Tag::new("alpha", "1"), Tag::new("omega", "apple"),]
);
assert_eq!(
iter.next().unwrap(),
vec![Tag::new("alpha", "2"), Tag::new("omega", "grape"),]
);
assert_eq!(
iter.next().unwrap(),
vec![Tag::new("alpha", "3"), Tag::new("omega", "apple"),]
);
assert_eq!(
iter.next().unwrap(),
vec![Tag::new("alpha", "1"), Tag::new("omega", "grape"),]
);
assert_eq!(
iter.next().unwrap(),
vec![Tag::new("alpha", "2"), Tag::new("omega", "apple"),]
);
assert_eq!(
iter.next().unwrap(),
vec![Tag::new("alpha", "3"), Tag::new("omega", "grape"),]
);
assert_eq!(
iter.next().unwrap(),
vec![Tag::new("alpha", "1"), Tag::new("omega", "apple"),]
);
}
#[test]
fn all_the_tag_substitutions_everywhere() -> Result<()> {
let spec = TagSpec {
name: "{{agent_id}}x{{measurement_id}}x{{tag_id}}".into(),
value: "{{agent_id}}v{{measurement_id}}v{{cardinality}}v{{counter}}".into(),
count: Some(2),
cardinality: Some(3),
increment_every: Some(1),
..Default::default()
};
let mut tg = TagGeneratorSet::<ZeroRng>::new(22, 33, &spec, TEST_SEED)?;
let tags = tg.generate()?;
assert_eq!(
vec![
vec![
Tag::new("22x33x0", "22v33v0v0"),
Tag::new("22x33x1", "22v33v0v0"),
],
vec![
Tag::new("22x33x0", "22v33v1v0"),
Tag::new("22x33x1", "22v33v1v0"),
],
vec![
Tag::new("22x33x0", "22v33v2v0"),
Tag::new("22x33x1", "22v33v2v0"),
],
],
tags
);
let tags = tg.generate()?;
assert_eq!(
vec![
vec![
Tag::new("22x33x0", "22v33v0v1"),
Tag::new("22x33x1", "22v33v0v1"),
],
vec![
Tag::new("22x33x0", "22v33v1v1"),
Tag::new("22x33x1", "22v33v1v1"),
],
vec![
Tag::new("22x33x0", "22v33v2v1"),
Tag::new("22x33x1", "22v33v2v1"),
],
],
tags
);
Ok(())
}
#[test]
fn string_replacements() -> Result<()> {
let host_tag_spec: specification::TagSpec = toml::from_str(
r#"name = "host"
value = "{{host}}"
replacements = [
{replace = "host", with = ["serverA", "serverB", "serverC", "serverD"]},
]"#,
)
.unwrap();
let mut tg = TagGeneratorSet::<ZeroRng>::new(22, 33, &host_tag_spec, TEST_SEED)?;
let tags = tg.generate()?;
assert_eq!(vec![vec![Tag::new("host", "serverA")]], tags);
Ok(())
}
#[test]
fn generate_to_zip_with_resample() -> Result<()> {
let host_tag_spec: specification::TagSpec = toml::from_str(
r#"name = "host"
value = "{{host}}"
replacements = [
{replace = "host", with = ["serverA", "serverB", "serverC", "serverD"]},
]
resample_every_line = true
"#,
)
.unwrap();
let mut tg = TagGeneratorSet::<ZeroRng>::new(22, 33, &host_tag_spec, TEST_SEED)?;
let tags = tg.generate_to_zip(3)?;
assert_eq!(
vec![
vec![Tag::new("host", "serverA")],
vec![Tag::new("host", "serverA")],
vec![Tag::new("host", "serverA")],
],
tags
);
Ok(())
}
#[test]
fn generate_to_zip_without_resample() -> Result<()> {
let host_tag_spec: specification::TagSpec = toml::from_str(
r#"name = "host"
value = "{{host}}"
replacements = [
{replace = "host", with = ["serverA", "serverB", "serverC", "serverD"]},
]
resample_every_line = false
"#,
)
.unwrap();
let mut tg = TagGeneratorSet::<ZeroRng>::new(22, 33, &host_tag_spec, TEST_SEED)?;
let tags = tg.generate_to_zip(3)?;
assert_eq!(
vec![
vec![Tag::new("host", "serverA")],
vec![Tag::new("host", "serverA")],
vec![Tag::new("host", "serverA")],
],
tags
);
Ok(())
}
#[test]
fn generate_to_zip_with_default_no_resample() -> Result<()> {
let host_tag_spec: specification::TagSpec = toml::from_str(
r#"name = "host"
value = "{{host}}"
replacements = [
{replace = "host", with = ["serverA", "serverB", "serverC", "serverD"]},
]"#,
)
.unwrap();
let mut tg = TagGeneratorSet::<ZeroRng>::new(22, 33, &host_tag_spec, TEST_SEED)?;
let tags = tg.generate_to_zip(3)?;
assert_eq!(
vec![
vec![Tag::new("host", "serverA")],
vec![Tag::new("host", "serverA")],
vec![Tag::new("host", "serverA")]
],
tags
);
Ok(())
}
}

View File

@ -0,0 +1,171 @@
//! Module for generating tag key/value pairs to be used in the data generator
use crate::specification::TagPairSpec;
use crate::substitution::new_handlebars_registry;
use handlebars::Handlebars;
use serde_json::{json, Value};
use snafu::{ResultExt, Snafu};
use std::fmt::Formatter;
use std::sync::{Arc, Mutex};
/// Results specific to the tag_pair module
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Errors that may happen while creating or regenerating tag pairs
#[derive(Snafu, Debug)]
pub enum Error {
#[snafu(display(
"Could not compile template for tag pair {} caused by: {}",
tag_key,
source
))]
CantCompileTemplate {
tag_key: String,
source: handlebars::TemplateError,
},
#[snafu(display(
"Could not render template for tag pair {}, cause by: {}",
tag_key,
source
))]
CantRenderTemplate {
tag_key: String,
source: handlebars::RenderError,
},
}
#[derive(Debug)]
pub enum TagPair {
Static(StaticTagPair),
Regenerating(Box<Mutex<RegeneratingTagPair>>),
}
impl TagPair {
pub fn pairs_from_specs(
specs: &[TagPairSpec],
mut template_data: Value,
) -> Result<Vec<Arc<Self>>> {
let tag_pairs: Vec<_> = specs
.iter()
.map(|tag_pair_spec| {
let tag_count = tag_pair_spec.count.unwrap_or(1);
let tags: Vec<_> = (1..tag_count + 1)
.map(|tag_id| {
let tag_key = if tag_id == 1 {
tag_pair_spec.key.to_string()
} else {
format!("{}{}", tag_pair_spec.key, tag_id)
};
let data = template_data.as_object_mut().expect("data must be object");
data.insert("id".to_string(), json!(tag_id));
data.insert("line_number".to_string(), json!(1));
let mut template = new_handlebars_registry();
template
.register_template_string(&tag_key, &tag_pair_spec.template)
.context(CantCompileTemplate {
tag_key: &tag_pair_spec.key,
})?;
let value = template
.render(&tag_key, &template_data)
.context(CantRenderTemplate { tag_key: &tag_key })?;
let tag_pair = StaticTagPair {
key: Arc::new(tag_key),
value: Arc::new(value),
};
let tag_pair = if let Some(regenerate_after_lines) =
tag_pair_spec.regenerate_after_lines
{
let regenerating_pair = RegeneratingTagPair {
regenerate_after_lines,
tag_pair,
template,
line_number: 0,
data: template_data.clone(),
};
Self::Regenerating(Box::new(Mutex::new(regenerating_pair)))
} else {
Self::Static(tag_pair)
};
Ok(Arc::new(tag_pair))
})
.collect::<Result<Vec<_>>>()?;
Ok(tags)
})
.collect::<Result<Vec<_>>>()?;
Ok(tag_pairs.into_iter().flatten().collect())
}
pub fn key(&self) -> String {
match self {
Self::Static(p) => p.key.to_string(),
Self::Regenerating(p) => {
let p = p.lock().expect("mutex poisoned");
p.tag_pair.key.to_string()
}
}
}
}
/// A tag key/value pair
#[derive(Debug, PartialEq, PartialOrd, Clone)]
pub struct StaticTagPair {
/// the key
pub key: Arc<String>,
/// the value
pub value: Arc<String>,
}
impl std::fmt::Display for StaticTagPair {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{}={}", self.key, self.value)
}
}
/// Used for tag pairs specified in either an agent or measurement specification. The
/// spec must be kept around to support regenerating the tag pair.
#[derive(Debug, Clone)]
pub struct RegeneratingTagPair {
regenerate_after_lines: usize,
tag_pair: StaticTagPair,
template: Handlebars<'static>,
data: Value,
line_number: usize,
}
impl RegeneratingTagPair {
pub fn tag_pair(&mut self) -> &StaticTagPair {
self.line_number += 1;
if self.should_regenerate() {
let data = self.data.as_object_mut().expect("data must be object");
data.insert("line_number".to_string(), json!(self.line_number));
let value = self
.template
.render(self.tag_pair.key.as_str(), &self.data)
.expect("this template has been rendered before so this shouldn't be possible");
self.tag_pair = StaticTagPair {
key: Arc::clone(&self.tag_pair.key),
value: Arc::new(value),
};
}
&self.tag_pair
}
fn should_regenerate(&self) -> bool {
self.line_number % (self.regenerate_after_lines + 1) == 0
}
}

View File

@ -1,17 +1,17 @@
//! Code for defining values and tag sets with tags that are dependent on other tags.
use crate::specification::{DataSpec, ValuesSpec};
use crate::substitution::{FormatNowHelper, RandomHelper};
use crate::RandomNumberGenerator;
use crate::substitution::new_handlebars_registry;
use crate::tag_pair::StaticTagPair;
use handlebars::Handlebars;
use itertools::Itertools;
use serde_json::json;
use snafu::{OptionExt, ResultExt, Snafu};
use std::fmt::Formatter;
use std::sync::Arc;
/// Module for pre-generated values and tag sets that can be used when generating samples from
/// agents.
use std::{collections::BTreeMap, sync::Mutex};
use std::collections::BTreeMap;
use std::fmt::Formatter;
use std::sync::Arc;
/// Errors that may happen while reading a TOML specification.
#[derive(Snafu, Debug)]
@ -64,7 +64,7 @@ pub struct GeneratedValueCollection {
#[derive(Debug)]
pub struct GeneratedValue {
id: usize,
tag_pair: Arc<TagPair>,
tag_pair: Arc<StaticTagPair>,
}
/// All generated tag sets specified
@ -96,16 +96,7 @@ impl GeneratedTagSets {
/// Generate tag sets from a `DataSpec`
pub fn from_spec(spec: &DataSpec) -> Result<Self> {
let mut generated_tag_sets = Self::default();
let rng: RandomNumberGenerator<rand::rngs::SmallRng> = RandomNumberGenerator::new(
spec.base_seed
.to_owned()
.unwrap_or_else(|| "default seed".to_string()),
);
let random_helper = RandomHelper::new(Mutex::new(rng));
let mut template = Handlebars::new();
template.register_helper("format-time", Box::new(FormatNowHelper));
template.register_helper("random", Box::new(random_helper));
let mut template = new_handlebars_registry();
let mut leftover_specs = -1;
@ -183,7 +174,7 @@ impl GeneratedTagSets {
// iteration and then just do a single clone at the very end.
let mut tag_pairs: Vec<_> = (0..for_each.len())
.map(|_| {
Arc::new(TagPair {
Arc::new(StaticTagPair {
key: Arc::new("default".to_string()),
value: Arc::new("default".to_string()),
})
@ -200,7 +191,7 @@ impl GeneratedTagSets {
&self,
parent_id: Option<usize>,
keys: &[Key<'_>],
tag_pairs: &mut Vec<Arc<TagPair>>,
tag_pairs: &mut Vec<Arc<StaticTagPair>>,
position: usize,
) -> Result<Vec<TagSet>> {
let key = &keys[position];
@ -325,7 +316,7 @@ impl GeneratedTagSets {
vals.push(Arc::new(GeneratedValue {
id: i,
tag_pair: Arc::new(TagPair {
tag_pair: Arc::new(StaticTagPair {
key: Arc::clone(&tag_key),
value,
}),
@ -398,7 +389,7 @@ impl GeneratedTagSets {
let data = json!({
belongs_to: {
"id": parent.id,
"value": &parent.tag_pair.value,
"value": &parent.tag_pair.value.as_ref(),
},
"id": child_value_id,
});
@ -413,7 +404,7 @@ impl GeneratedTagSets {
let child_value = Arc::new(GeneratedValue {
id: child_value_id,
tag_pair: Arc::new(TagPair {
tag_pair: Arc::new(StaticTagPair {
key: Arc::clone(&tag_key),
value,
}),
@ -453,11 +444,11 @@ fn has_one_values_key(parent: &str, child: &str) -> String {
#[derive(Debug)]
pub struct TagSet {
/// The tags in the set
pub tags: Vec<Arc<TagPair>>,
pub tags: Vec<Arc<StaticTagPair>>,
}
impl TagSet {
fn new(tags: Vec<Arc<TagPair>>) -> Self {
fn new(tags: Vec<Arc<StaticTagPair>>) -> Self {
Self { tags }
}
}
@ -469,21 +460,6 @@ impl std::fmt::Display for TagSet {
}
}
/// A tag key/value pair
#[derive(Debug, PartialEq, PartialOrd)]
pub struct TagPair {
/// the key
pub key: Arc<String>,
/// the value
pub value: Arc<String>,
}
impl std::fmt::Display for TagPair {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{}={}", self.key, self.value)
}
}
#[cfg(test)]
mod test {
use super::*;
@ -493,7 +469,6 @@ mod test {
fn generate_tag_sets_basic() {
let toml = r#"
name = "demo"
base_seed = "foo"
[[values]]
name = "foo"
@ -505,7 +480,6 @@ name = "testage"
for_each = ["foo"]
[[agents]]
name = "basic"
[[agents.measurements]]
name = "cpu"
@ -530,7 +504,6 @@ foo=3#foo"#;
fn generate_tag_sets_belongs_to() {
let toml = r#"
name = "demo"
base_seed = "foo"
[[values]]
name = "foo"
@ -551,7 +524,6 @@ for_each = [
]
[[agents]]
name = "basic"
[[agents.measurements]]
name = "cpu"
@ -577,7 +549,6 @@ bar=4-2-2#foo,foo=2#foo"#;
fn generate_tag_sets_test() {
let toml = r#"
name = "demo"
base_seed = "foo"
[[values]]
name = "foo"
@ -618,7 +589,6 @@ for_each = [
]
[[agents]]
name = "basic"
[[agents.measurements]]
name = "cpu"

View File

@ -1,7 +1,8 @@
//! Writing generated points
use crate::measurement::LineToGenerate;
use futures::stream;
use influxdb2_client::models::{DataPoint, PostBucketRequest, WriteDataPoint};
use influxdb2_client::models::{PostBucketRequest, WriteDataPoint};
use snafu::{ensure, OptionExt, ResultExt, Snafu};
#[cfg(test)]
use std::{
@ -103,7 +104,7 @@ enum PointsWriterConfig {
perform_write: bool,
},
#[cfg(test)]
Vector(BTreeMap<String, Arc<Mutex<Vec<u8>>>>),
Vector(BTreeMap<usize, Arc<Mutex<Vec<u8>>>>),
Stdout,
}
@ -183,7 +184,7 @@ impl PointsWriterBuilder {
/// Create a writer out of this writer's configuration for a particular
/// agent that runs in a separate thread/task.
pub fn build_for_agent(&mut self, agent_name: &str) -> Result<PointsWriter> {
pub fn build_for_agent(&mut self, id: usize) -> Result<PointsWriter> {
let inner_writer = match &mut self.config {
PointsWriterConfig::Api {
client,
@ -196,7 +197,7 @@ impl PointsWriterBuilder {
},
PointsWriterConfig::Directory(dir_path) => {
let mut filename = dir_path.clone();
filename.push(agent_name);
filename.push(format!("agent_{}", id));
filename.set_extension("txt");
let file = OpenOptions::new()
@ -215,7 +216,7 @@ impl PointsWriterBuilder {
#[cfg(test)]
PointsWriterConfig::Vector(ref mut agents_by_name) => {
let v = agents_by_name
.entry(agent_name.to_string())
.entry(id)
.or_insert_with(|| Arc::new(Mutex::new(Vec::new())));
InnerPointsWriter::Vec(Arc::clone(v))
}
@ -234,7 +235,10 @@ pub struct PointsWriter {
impl PointsWriter {
/// Write these points
pub async fn write_points(&mut self, points: Vec<DataPoint>) -> Result<()> {
pub async fn write_points(
&mut self,
points: impl Iterator<Item = LineToGenerate> + Send + Sync + 'static,
) -> Result<()> {
self.inner_writer.write_points(points).await
}
}
@ -258,7 +262,10 @@ enum InnerPointsWriter {
}
impl InnerPointsWriter {
async fn write_points(&mut self, points: Vec<DataPoint>) -> Result<()> {
async fn write_points(
&mut self,
points: impl Iterator<Item = LineToGenerate> + Send + Sync + 'static,
) -> Result<()> {
match self {
Self::Api {
client,
@ -313,7 +320,7 @@ impl InnerPointsWriter {
#[cfg(test)]
mod test {
use super::*;
use crate::{generate, now_ns, specification::*, ZeroRng};
use crate::{generate, now_ns, specification::*};
use std::str::FromStr;
type Error = Box<dyn std::error::Error>;
@ -326,11 +333,11 @@ mod test {
}
}
fn written_data(self, agent_name: &str) -> String {
fn written_data(self, agent_id: usize) -> String {
match self.config {
PointsWriterConfig::Vector(agents_by_name) => {
let bytes_ref =
Arc::clone(agents_by_name.get(agent_name).expect(
Arc::clone(agents_by_name.get(&agent_id).expect(
"Should have written some data, did not find any for this agent",
));
let bytes = bytes_ref
@ -347,24 +354,22 @@ mod test {
async fn test_generate() -> Result<()> {
let toml = r#"
name = "demo_schema"
base_seed = "this is a demo"
[[agents]]
name = "basic"
[[agents.measurements]]
name = "cpu"
[[agents.measurements.fields]]
name = "up"
bool = true"#;
name = "val"
i64_range = [3,3]"#;
let data_spec = DataSpec::from_str(toml).unwrap();
let mut points_writer_builder = PointsWriterBuilder::new_vec();
let now = now_ns();
generate::<ZeroRng>(
generate(
&data_spec,
&mut points_writer_builder,
Some(now),
@ -376,10 +381,10 @@ bool = true"#;
)
.await?;
let line_protocol = points_writer_builder.written_data("basic");
let line_protocol = points_writer_builder.written_data(0);
let expected_line_protocol = format!(
r#"cpu,data_spec=demo_schema up=f {}
r#"cpu val=3i {}
"#,
now
);
@ -392,25 +397,23 @@ bool = true"#;
async fn test_generate_batches() -> Result<()> {
let toml = r#"
name = "demo_schema"
base_seed = "this is a demo"
[[agents]]
name = "basic"
sampling_interval = "1s" # seconds
[[agents.measurements]]
name = "cpu"
[[agents.measurements.fields]]
name = "up"
bool = true"#;
name = "val"
i64_range = [2, 2]"#;
let data_spec = DataSpec::from_str(toml).unwrap();
let mut points_writer_builder = PointsWriterBuilder::new_vec();
let now = now_ns();
generate::<ZeroRng>(
generate(
&data_spec,
&mut points_writer_builder,
Some(now - 1_000_000_000),
@ -422,11 +425,11 @@ bool = true"#;
)
.await?;
let line_protocol = points_writer_builder.written_data("basic");
let line_protocol = points_writer_builder.written_data(0);
let expected_line_protocol = format!(
r#"cpu,data_spec=demo_schema up=f {}
cpu,data_spec=demo_schema up=f {}
r#"cpu val=2i {}
cpu val=2i {}
"#,
now - 1_000_000_000,
now