feat: Add tag_set and tag_pairs to measurements in Data Generator
This adds the ability to specify a tag_set and a collection of tag_pairs to measurements in the data generator. Tag pairs are evaluated once when the generator is created. This avoids re-running handlebars evaluations while generating data for tags that don't change value. This commit also fixes an issue when printing the generation output to stdout while generating from more than one agent. Previously it would be garbled together. Follow on PRs will update the tag generation code in measurement specs to be more consistent and optimzised for performance. I'll be removing the restriction of using different options while using tag_set and tag_pairs. I wanted to get this in first to show the structure of what is output.pull/24376/head
parent
07ba629e2b
commit
db2f8a58fc
|
|
@ -7,6 +7,7 @@ use iox_data_generator::{
|
|||
pub fn single_agent(c: &mut Criterion) {
|
||||
let spec = DataSpec {
|
||||
base_seed: Some("faster faster faster".into()),
|
||||
include_spec_tag: Some(true),
|
||||
name: "benchmark".into(),
|
||||
values: vec![],
|
||||
tag_sets: vec![],
|
||||
|
|
@ -25,6 +26,8 @@ pub fn single_agent(c: &mut Criterion) {
|
|||
field_value_spec: FieldValueSpec::Bool(true),
|
||||
count: None,
|
||||
}],
|
||||
tag_set: None,
|
||||
tag_pairs: vec![],
|
||||
}],
|
||||
}],
|
||||
};
|
||||
|
|
@ -52,6 +55,7 @@ pub fn single_agent(c: &mut Criterion) {
|
|||
0,
|
||||
false,
|
||||
1,
|
||||
false,
|
||||
)
|
||||
});
|
||||
let n_points = r.expect("Could not generate data");
|
||||
|
|
|
|||
|
|
@ -1,9 +1,16 @@
|
|||
name = "storage_cardinality_example"
|
||||
base_seed = "this is a demo"
|
||||
|
||||
# Values are automatically generated before the agents are intialized. They generate tag key/value pairs
|
||||
# with the name of the value as the tag key and the evaluated template as the value. These pairs
|
||||
# are Arc wrapped so they can be shared across tagsets and used in the agents as pre-generated data.
|
||||
[[values]]
|
||||
# the name must not have a . in it, which is used to access children later. Otherwise it's open.
|
||||
name = "role"
|
||||
# the template can use a number of helpers to get an id, a random string and the name, see below for examples
|
||||
template = "storage"
|
||||
# this number of tag pairs will be generated. If this is > 1, the id or a random character string should be
|
||||
# used in the template to ensure that the tag key/value pairs are unique.
|
||||
cardinality = 1
|
||||
|
||||
[[values]]
|
||||
|
|
@ -15,7 +22,7 @@ cardinality = 1
|
|||
name = "org_id"
|
||||
# Fill in the value with the cardinality counter and 15 random alphanumeric characters
|
||||
template = "{{id}}_{{random 15}}"
|
||||
cardinality = 1
|
||||
cardinality = 2
|
||||
has_one = ["env"]
|
||||
|
||||
[[values]]
|
||||
|
|
@ -25,36 +32,22 @@ cardinality = 10
|
|||
|
||||
[[values]]
|
||||
name = "bucket_id"
|
||||
# a bucket belongs to an org. With this, you would be able to access the org.id or org.value in the template
|
||||
belongs_to = "org_id"
|
||||
# each bucket will have a unique value so it can be used here to guarantee uniqueness even across orgs
|
||||
# each bucket will have a unique id, which is used here to guarantee uniqueness even across orgs. We also
|
||||
# have a random 15 character alphanumeric sequence to pad out the value length.
|
||||
template = "{{id}}_{{random 15}}"
|
||||
# For each org, 3 buckets will be generated
|
||||
cardinality = 3
|
||||
|
||||
[[values]]
|
||||
name = "node_id"
|
||||
template = "{{id}}"
|
||||
cardinality = 100
|
||||
|
||||
[[values]]
|
||||
name = "host"
|
||||
template = "storage-{{node_id.value}}"
|
||||
belongs_to = "node_id"
|
||||
cardinality = 1
|
||||
|
||||
[[values]]
|
||||
name = "hostname"
|
||||
template = "{{node_id.value}}"
|
||||
belongs_to = "node_id"
|
||||
cardinality = 1
|
||||
|
||||
[[values]]
|
||||
name = "partition_id"
|
||||
template = "{{id}}"
|
||||
cardinality = 10
|
||||
|
||||
# makes a tagset so every bucket exists on every node with every partition.
|
||||
# So count(bucket) * count(node) * count(partition) cardinality.
|
||||
# makes a tagset so every bucket appears in every partition. The other tags are descriptive and don't
|
||||
# increase the cardiality beyond count(bucket) * count(partition). Later this example will use the
|
||||
# agent and measurement generation to take this base tagset and increase cardinality on a per-agent basis.
|
||||
[[tag_sets]]
|
||||
name = "bucket_set"
|
||||
for_each = [
|
||||
|
|
@ -63,19 +56,24 @@ for_each = [
|
|||
"org_id",
|
||||
"org_id.env",
|
||||
"org_id.bucket_id",
|
||||
"node_id",
|
||||
"node_id.hostname",
|
||||
"node_id.host",
|
||||
"partition_id",
|
||||
]
|
||||
|
||||
[[agents]]
|
||||
name = "metric-scraper"
|
||||
# sampling_interval = "1s"
|
||||
# create this many agents
|
||||
count = 3
|
||||
|
||||
[[agents.measurements]]
|
||||
name = "storage_usage_bucket_cardinality"
|
||||
# TODO: new syntax to make use of tag sets here
|
||||
# each sampling will have all the tag sets from this collection in addition to the tags and tag_pairs specified
|
||||
tag_set = "bucket_set"
|
||||
# for each agent, this specific measurement will be decorated with these additional tags.
|
||||
tag_pairs = [
|
||||
{key = "node_id", template = "{{agent.id}}"},
|
||||
{key = "hostname", template = "{{agent.id}}"},
|
||||
{key = "host", template = "storage-{{agent.id}}"},
|
||||
]
|
||||
|
||||
[[agents.measurements.fields]]
|
||||
name = "gauge"
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@ use crate::{
|
|||
DataGenRng, RandomNumberGenerator,
|
||||
};
|
||||
|
||||
use crate::tag_set::GeneratedTagSets;
|
||||
use influxdb2_client::models::DataPoint;
|
||||
use snafu::{ResultExt, Snafu};
|
||||
use std::{fmt, time::Duration};
|
||||
|
|
@ -86,6 +87,7 @@ impl<T: DataGenRng> Agent<T> {
|
|||
end_datetime: Option<i64>, // also in nanoseconds since the epoch, defaults to now
|
||||
execution_start_time: i64,
|
||||
continue_on: bool, // If true, run in "continue" mode after historical data is generated
|
||||
generated_tag_sets: &GeneratedTagSets,
|
||||
) -> Result<Self> {
|
||||
let name = agent_name.into();
|
||||
// Will agents actually need rngs? Might just need seeds...
|
||||
|
|
@ -103,6 +105,7 @@ impl<T: DataGenRng> Agent<T> {
|
|||
&seed,
|
||||
&agent_tags,
|
||||
execution_start_time,
|
||||
generated_tag_sets,
|
||||
)
|
||||
})
|
||||
.collect::<crate::measurement::Result<_>>()
|
||||
|
|
@ -240,11 +243,22 @@ mod test {
|
|||
},
|
||||
count: Some(2),
|
||||
}],
|
||||
tag_pairs: vec![],
|
||||
tag_set: None,
|
||||
};
|
||||
|
||||
let measurement_generator_set =
|
||||
MeasurementGeneratorSet::new("test", 42, &measurement_spec, "spec-test", &[], 0)
|
||||
.unwrap();
|
||||
let generated_tag_sets = GeneratedTagSets::default();
|
||||
|
||||
let measurement_generator_set = MeasurementGeneratorSet::new(
|
||||
"test",
|
||||
42,
|
||||
&measurement_spec,
|
||||
"spec-test",
|
||||
&[],
|
||||
0,
|
||||
&generated_tag_sets,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
Self {
|
||||
agent_id: 0,
|
||||
|
|
|
|||
|
|
@ -202,6 +202,7 @@ Logging:
|
|||
execution_start_time.timestamp_nanos(),
|
||||
continue_on,
|
||||
batch_size,
|
||||
disable_log_output,
|
||||
)
|
||||
.await;
|
||||
|
||||
|
|
|
|||
|
|
@ -28,9 +28,11 @@
|
|||
)]
|
||||
|
||||
use crate::substitution::Substitute;
|
||||
use crate::tag_set::GeneratedTagSets;
|
||||
use rand::Rng;
|
||||
use rand_seeder::Seeder;
|
||||
use snafu::{ResultExt, Snafu};
|
||||
use std::sync::Arc;
|
||||
use std::{
|
||||
convert::TryFrom,
|
||||
time::{SystemTime, UNIX_EPOCH},
|
||||
|
|
@ -86,6 +88,13 @@ pub enum Error {
|
|||
/// Underlying `write` module error that caused this problem
|
||||
source: write::Error,
|
||||
},
|
||||
|
||||
/// Error generating tags sets
|
||||
#[snafu(display("Error generating tag sets prior to creating agents: \n{}", source))]
|
||||
CouldNotGenerateTagSets {
|
||||
/// Underlying `tag_set` module error
|
||||
source: tag_set::Error,
|
||||
},
|
||||
}
|
||||
|
||||
type Result<T, E = Error> = std::result::Result<T, E>;
|
||||
|
|
@ -96,6 +105,7 @@ type Result<T, E = Error> = std::result::Result<T, E>;
|
|||
///
|
||||
/// If `start_datetime` or `end_datetime` are `None`, the current datetime will
|
||||
/// be used.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub async fn generate<T: DataGenRng>(
|
||||
spec: &specification::DataSpec,
|
||||
points_writer_builder: &mut write::PointsWriterBuilder,
|
||||
|
|
@ -104,6 +114,7 @@ pub async fn generate<T: DataGenRng>(
|
|||
execution_start_time: i64,
|
||||
continue_on: bool,
|
||||
batch_size: usize,
|
||||
one_agent_at_a_time: bool, // run one agent after another, if printing to stdout
|
||||
) -> Result<usize> {
|
||||
let seed = spec.base_seed.to_owned().unwrap_or_else(|| {
|
||||
let mut rng = rand::thread_rng();
|
||||
|
|
@ -112,6 +123,10 @@ pub async fn generate<T: DataGenRng>(
|
|||
|
||||
let mut handles = vec![];
|
||||
|
||||
let generated_tag_sets = GeneratedTagSets::from_spec(spec).context(CouldNotGenerateTagSets)?;
|
||||
|
||||
let lock = Arc::new(tokio::sync::Mutex::new(()));
|
||||
|
||||
// for each agent specification
|
||||
for agent_spec in &spec.agents {
|
||||
// create iterators to `cycle` through for `agent_spec.tags`
|
||||
|
|
@ -141,6 +156,7 @@ pub async fn generate<T: DataGenRng>(
|
|||
end_datetime,
|
||||
execution_start_time,
|
||||
continue_on,
|
||||
&generated_tag_sets,
|
||||
)
|
||||
.context(CouldNotCreateAgent { name: &agent_name })?;
|
||||
|
||||
|
|
@ -148,8 +164,16 @@ pub async fn generate<T: DataGenRng>(
|
|||
.build_for_agent(&agent_name)
|
||||
.context(CouldNotCreateAgentWriter { name: &agent_name })?;
|
||||
|
||||
let lock_ref = Arc::clone(&lock);
|
||||
|
||||
handles.push(tokio::task::spawn(async move {
|
||||
agent.generate_all(agent_points_writer, batch_size).await
|
||||
// did this weird hack because otherwise the stdout outputs would be jumbled together garbage
|
||||
if one_agent_at_a_time {
|
||||
let _l = lock_ref.lock().await;
|
||||
agent.generate_all(agent_points_writer, batch_size).await
|
||||
} else {
|
||||
agent.generate_all(agent_points_writer, batch_size).await
|
||||
}
|
||||
}));
|
||||
}
|
||||
}
|
||||
|
|
@ -325,6 +349,8 @@ bool = true"#;
|
|||
// for the first 15 seconds of the year
|
||||
let end_datetime = Some(15 * 1_000_000_000);
|
||||
|
||||
let generated_tag_sets = GeneratedTagSets::default();
|
||||
|
||||
let mut agent = agent::Agent::<ZeroRng>::new(
|
||||
agent_spec,
|
||||
&agent_spec.name,
|
||||
|
|
@ -335,6 +361,7 @@ bool = true"#;
|
|||
end_datetime,
|
||||
execution_start_time,
|
||||
false,
|
||||
&generated_tag_sets,
|
||||
)?;
|
||||
|
||||
let data_points = agent.generate().await?;
|
||||
|
|
|
|||
|
|
@ -8,10 +8,15 @@ use crate::{
|
|||
DataGenRng, RandomNumberGenerator,
|
||||
};
|
||||
|
||||
use crate::substitution::{FormatNowHelper, RandomHelper};
|
||||
use crate::tag_set::{GeneratedTagSets, TagPair, TagSet};
|
||||
use handlebars::Handlebars;
|
||||
use influxdb2_client::models::DataPoint;
|
||||
use itertools::Itertools;
|
||||
use snafu::{ResultExt, Snafu};
|
||||
use serde_json::json;
|
||||
use snafu::{OptionExt, ResultExt, Snafu};
|
||||
use std::fmt;
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
/// Measurement-specific Results
|
||||
pub type Result<T, E = Error> = std::result::Result<T, E>;
|
||||
|
|
@ -78,6 +83,41 @@ pub enum Error {
|
|||
/// Underlying `tag` module error that caused this problem
|
||||
source: crate::tag::Error,
|
||||
},
|
||||
|
||||
/// Error generated if tag_set and tag_pair are used with any other options
|
||||
#[snafu(display("If using either tag_set or tag_pair in a measurement spec, other options are not currently supported"))]
|
||||
TagSetTagPairOnly,
|
||||
|
||||
/// Error generated if specifying a tag_set that wasn't generated
|
||||
#[snafu(display(
|
||||
"Tag set {} referenced not found for measurement {}",
|
||||
tag_set,
|
||||
measurement
|
||||
))]
|
||||
GeneratedTagSetNotFound {
|
||||
/// The name of the tag set
|
||||
tag_set: String,
|
||||
/// The name of the measurement it is being associated with
|
||||
measurement: String,
|
||||
},
|
||||
|
||||
/// Error that may happen when compiling a template from the values specification
|
||||
#[snafu(display("Could not compile template `{}`, caused by:\n{}", template, source))]
|
||||
CantCompileTemplate {
|
||||
/// Underlying Handlebars error that caused this problem
|
||||
source: handlebars::TemplateError,
|
||||
/// Template that caused this problem
|
||||
template: String,
|
||||
},
|
||||
|
||||
/// Error that may happen when rendering a template with passed in data
|
||||
#[snafu(display("Could not render template `{}`, caused by:\n{}", template, source))]
|
||||
CantRenderTemplate {
|
||||
/// Underlying Handlebars error that caused this problem
|
||||
source: handlebars::RenderError,
|
||||
/// Template that caused this problem
|
||||
template: String,
|
||||
},
|
||||
}
|
||||
|
||||
/// A set of `count` measurements that have the same configuration but different
|
||||
|
|
@ -98,6 +138,7 @@ impl<T: DataGenRng> MeasurementGeneratorSet<T> {
|
|||
parent_seed: impl fmt::Display,
|
||||
static_tags: &[Tag],
|
||||
execution_start_time: i64,
|
||||
generated_tag_sets: &GeneratedTagSets,
|
||||
) -> Result<Self> {
|
||||
let count = spec.count.unwrap_or(1);
|
||||
|
||||
|
|
@ -111,6 +152,7 @@ impl<T: DataGenRng> MeasurementGeneratorSet<T> {
|
|||
&parent_seed,
|
||||
static_tags,
|
||||
execution_start_time,
|
||||
generated_tag_sets,
|
||||
)
|
||||
})
|
||||
.collect::<Result<_>>()?;
|
||||
|
|
@ -142,10 +184,12 @@ pub struct MeasurementGenerator<T: DataGenRng> {
|
|||
total_tag_cardinality: usize,
|
||||
field_generator_sets: Vec<FieldGeneratorSet>,
|
||||
count: usize,
|
||||
pre_generated_tags: Option<PreGeneratedTags>,
|
||||
}
|
||||
|
||||
impl<T: DataGenRng> MeasurementGenerator<T> {
|
||||
/// Create a new way to generate measurements from a specification
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn new(
|
||||
agent_name: impl Into<String>,
|
||||
agent_id: usize,
|
||||
|
|
@ -154,7 +198,16 @@ impl<T: DataGenRng> MeasurementGenerator<T> {
|
|||
parent_seed: impl fmt::Display,
|
||||
static_tags: &[Tag],
|
||||
execution_start_time: i64,
|
||||
generated_tag_sets: &GeneratedTagSets,
|
||||
) -> Result<Self> {
|
||||
// For now the config will only support the use of either tag_set and tag_pair or all
|
||||
// the other options. After some refactoring the other options will be brought in.
|
||||
if (spec.tag_set.is_some() || !spec.tag_pairs.is_empty())
|
||||
&& (spec.count.is_some() || !spec.tags.is_empty())
|
||||
{
|
||||
return TagSetTagPairOnly.fail();
|
||||
}
|
||||
|
||||
let agent_name = agent_name.into();
|
||||
let spec_name = Substitute::once(
|
||||
&spec.name,
|
||||
|
|
@ -167,7 +220,7 @@ impl<T: DataGenRng> MeasurementGenerator<T> {
|
|||
.context(CouldNotCreateMeasurementName)?;
|
||||
|
||||
let seed = format!("{}-{}", parent_seed, spec_name);
|
||||
let rng = RandomNumberGenerator::<T>::new(seed);
|
||||
let rng = RandomNumberGenerator::<T>::new(seed.clone());
|
||||
|
||||
let tag_generator_sets: Vec<TagGeneratorSet<T>> = spec
|
||||
.tags
|
||||
|
|
@ -197,6 +250,66 @@ impl<T: DataGenRng> MeasurementGenerator<T> {
|
|||
.collect::<crate::field::Result<_>>()
|
||||
.context(CouldNotCreateFieldGeneratorSets { name: &spec_name })?;
|
||||
|
||||
// generate the tag pairs
|
||||
let random_helper = RandomHelper::new(Mutex::new(RandomNumberGenerator::<T>::new(seed)));
|
||||
let mut template = Handlebars::new();
|
||||
template.register_helper("format-time", Box::new(FormatNowHelper));
|
||||
template.register_helper("random", Box::new(random_helper));
|
||||
let template_data = json!({
|
||||
"agent": {"id": agent_id, "name": &agent_name},
|
||||
"measurement": {"id": measurement_id, "name": &spec_name},
|
||||
});
|
||||
|
||||
// TODO: refactor this structure to have tag_set and tag_pairs work more cleanly
|
||||
// with the other options.
|
||||
let tag_pairs: Vec<_> = spec
|
||||
.tag_pairs
|
||||
.iter()
|
||||
.map(|s| {
|
||||
template
|
||||
.register_template_string(&s.key, &s.template)
|
||||
.context(CantCompileTemplate {
|
||||
template: &spec.name,
|
||||
})?;
|
||||
let value =
|
||||
template
|
||||
.render(&s.key, &template_data)
|
||||
.context(CantRenderTemplate {
|
||||
template: &spec.name,
|
||||
})?;
|
||||
|
||||
Ok(Arc::new(TagPair {
|
||||
key: Arc::new(s.key.to_string()),
|
||||
value: Arc::new(value),
|
||||
}))
|
||||
})
|
||||
.collect::<Result<Vec<Arc<TagPair>>>>()?;
|
||||
|
||||
let pre_generated_tags = match &spec.tag_set {
|
||||
Some(t) => {
|
||||
let generated_tag_sets = Arc::clone(generated_tag_sets.sets_for(t).context(
|
||||
GeneratedTagSetNotFound {
|
||||
tag_set: t,
|
||||
measurement: &spec_name,
|
||||
},
|
||||
)?);
|
||||
Some(PreGeneratedTags {
|
||||
tag_pairs,
|
||||
generated_tag_sets,
|
||||
})
|
||||
}
|
||||
None => {
|
||||
if tag_pairs.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(PreGeneratedTags {
|
||||
tag_pairs,
|
||||
generated_tag_sets: Arc::new(vec![]),
|
||||
})
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
Ok(Self {
|
||||
rng,
|
||||
name: spec_name,
|
||||
|
|
@ -205,12 +318,17 @@ impl<T: DataGenRng> MeasurementGenerator<T> {
|
|||
total_tag_cardinality,
|
||||
field_generator_sets,
|
||||
count: spec.count.unwrap_or(1),
|
||||
pre_generated_tags,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: DataGenRng> MeasurementGenerator<T> {
|
||||
fn generate(&mut self, timestamp: i64) -> Result<Vec<DataPoint>> {
|
||||
if self.pre_generated_tags.is_some() {
|
||||
return self.generate_with_pre_generated_tags(timestamp);
|
||||
}
|
||||
|
||||
// Split out the tags that we want all combinations of. Perhaps these should be
|
||||
// a different type?
|
||||
let mut tags_with_cardinality: Vec<_> = itertools::process_results(
|
||||
|
|
@ -286,7 +404,49 @@ impl<T: DataGenRng> MeasurementGenerator<T> {
|
|||
tags_with_cardinality
|
||||
.iter()
|
||||
.map(|tags| self.one(&tags[..], timestamp))
|
||||
.collect()
|
||||
.collect::<Result<Vec<DataPoint>>>()
|
||||
}
|
||||
|
||||
fn generate_with_pre_generated_tags(&mut self, timestamp: i64) -> Result<Vec<DataPoint>> {
|
||||
let pregen = self.pre_generated_tags.as_ref().unwrap();
|
||||
|
||||
let point_builders: Vec<_> = pregen
|
||||
.generated_tag_sets
|
||||
.iter()
|
||||
.map(|ts| {
|
||||
let mut point = DataPoint::builder(&self.name);
|
||||
|
||||
for tag in &ts.tags {
|
||||
point = point.tag(tag.key.to_string(), tag.value.to_string());
|
||||
}
|
||||
|
||||
for tag in &pregen.tag_pairs {
|
||||
point = point.tag(tag.key.to_string(), tag.value.to_string());
|
||||
}
|
||||
|
||||
point
|
||||
})
|
||||
.collect();
|
||||
|
||||
let mut points = Vec::with_capacity(point_builders.len());
|
||||
|
||||
for mut pb in point_builders {
|
||||
for fgs in &mut self.field_generator_sets {
|
||||
for field in fgs.generate(timestamp) {
|
||||
pb = pb.field(&field.key, field.value);
|
||||
}
|
||||
}
|
||||
|
||||
pb = pb.timestamp(timestamp);
|
||||
|
||||
let point = pb
|
||||
.build()
|
||||
.context(InfluxDataPointError { name: &self.name })?;
|
||||
|
||||
points.push(point);
|
||||
}
|
||||
|
||||
Ok(points)
|
||||
}
|
||||
|
||||
fn one(&mut self, tags: &[Tag], timestamp: i64) -> Result<DataPoint> {
|
||||
|
|
@ -315,6 +475,12 @@ impl<T: DataGenRng> MeasurementGenerator<T> {
|
|||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct PreGeneratedTags {
|
||||
tag_pairs: Vec<Arc<TagPair>>,
|
||||
generated_tag_sets: Arc<Vec<TagSet>>,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::*;
|
||||
|
|
@ -361,8 +527,12 @@ mod test {
|
|||
},
|
||||
count: None,
|
||||
}],
|
||||
tag_set: None,
|
||||
tag_pairs: vec![],
|
||||
};
|
||||
|
||||
let generated_tag_sets = GeneratedTagSets::default();
|
||||
|
||||
let mut measurement_generator = MeasurementGenerator::<ZeroRng>::new(
|
||||
"agent_name",
|
||||
0,
|
||||
|
|
@ -371,6 +541,7 @@ mod test {
|
|||
TEST_SEED,
|
||||
&[],
|
||||
fake_now,
|
||||
&generated_tag_sets,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
|
|
@ -409,8 +580,12 @@ mod test {
|
|||
count: None,
|
||||
},
|
||||
],
|
||||
tag_set: None,
|
||||
tag_pairs: vec![],
|
||||
};
|
||||
|
||||
let generated_tag_sets = GeneratedTagSets::default();
|
||||
|
||||
let mut measurement_generator = MeasurementGenerator::<DynamicRng>::new(
|
||||
"agent_name",
|
||||
0,
|
||||
|
|
@ -419,6 +594,7 @@ mod test {
|
|||
TEST_SEED,
|
||||
&[],
|
||||
fake_now,
|
||||
&generated_tag_sets,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
|
|
@ -451,10 +627,14 @@ mod test {
|
|||
},
|
||||
count: None,
|
||||
}],
|
||||
tag_pairs: vec![],
|
||||
tag_set: None,
|
||||
};
|
||||
|
||||
let always_tags = vec![Tag::new("my_tag", "my_val")];
|
||||
|
||||
let generated_tag_sets = GeneratedTagSets::default();
|
||||
|
||||
let mut measurement_generator = MeasurementGenerator::<ZeroRng>::new(
|
||||
"agent_name",
|
||||
0,
|
||||
|
|
@ -463,6 +643,7 @@ mod test {
|
|||
TEST_SEED,
|
||||
&always_tags,
|
||||
fake_now,
|
||||
&generated_tag_sets,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
|
|
@ -501,6 +682,8 @@ mod test {
|
|||
..Default::default()
|
||||
};
|
||||
|
||||
let generated_tag_sets = GeneratedTagSets::default();
|
||||
|
||||
let mut measurement_generator = MeasurementGenerator::<ZeroRng>::new(
|
||||
"agent_name",
|
||||
0,
|
||||
|
|
@ -509,6 +692,7 @@ mod test {
|
|||
TEST_SEED,
|
||||
&[],
|
||||
fake_now,
|
||||
&generated_tag_sets,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
|
|
@ -544,6 +728,8 @@ mod test {
|
|||
..Default::default()
|
||||
};
|
||||
|
||||
let generated_tag_sets = GeneratedTagSets::default();
|
||||
|
||||
let mut measurement_generator = MeasurementGenerator::<ZeroRng>::new(
|
||||
"agent_name",
|
||||
42,
|
||||
|
|
@ -552,6 +738,7 @@ mod test {
|
|||
TEST_SEED,
|
||||
&[],
|
||||
fake_now,
|
||||
&generated_tag_sets,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
|
|
@ -584,6 +771,8 @@ mod test {
|
|||
..Default::default()
|
||||
};
|
||||
|
||||
let generated_tag_sets = GeneratedTagSets::default();
|
||||
|
||||
let mut measurement_generator = MeasurementGenerator::<ZeroRng>::new(
|
||||
"agent_name",
|
||||
0,
|
||||
|
|
@ -592,6 +781,7 @@ mod test {
|
|||
TEST_SEED,
|
||||
&[],
|
||||
fake_now,
|
||||
&generated_tag_sets,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
|
|
@ -642,6 +832,8 @@ mod test {
|
|||
..Default::default()
|
||||
};
|
||||
|
||||
let generated_tag_sets = GeneratedTagSets::default();
|
||||
|
||||
let mut measurement_generator = MeasurementGenerator::<ZeroRng>::new(
|
||||
"agent_name",
|
||||
0,
|
||||
|
|
@ -650,6 +842,7 @@ mod test {
|
|||
TEST_SEED,
|
||||
&[],
|
||||
fake_now,
|
||||
&generated_tag_sets,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
|
|
@ -706,6 +899,8 @@ mod test {
|
|||
..Default::default()
|
||||
};
|
||||
|
||||
let generated_tag_sets = GeneratedTagSets::default();
|
||||
|
||||
let mut measurement_generator = MeasurementGenerator::<ZeroRng>::new(
|
||||
"agent_name",
|
||||
0,
|
||||
|
|
@ -714,6 +909,7 @@ mod test {
|
|||
TEST_SEED,
|
||||
&[],
|
||||
fake_now,
|
||||
&generated_tag_sets,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
|
|
@ -763,8 +959,11 @@ mod test {
|
|||
},
|
||||
count: Some(2),
|
||||
}],
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let generated_tag_sets = GeneratedTagSets::default();
|
||||
|
||||
let mut measurement_generator_set = MeasurementGeneratorSet::<ZeroRng>::new(
|
||||
"agent_name",
|
||||
42,
|
||||
|
|
@ -772,6 +971,7 @@ mod test {
|
|||
TEST_SEED,
|
||||
&[],
|
||||
fake_now,
|
||||
&generated_tag_sets,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
|
|
@ -818,6 +1018,8 @@ measurement-42-1 field-42-1-0=0i,field-42-1-1=0i {}
|
|||
)
|
||||
.unwrap();
|
||||
|
||||
let generated_tag_sets = GeneratedTagSets::default();
|
||||
|
||||
let mut measurement_generator = MeasurementGenerator::<DynamicRng>::new(
|
||||
"agent_name",
|
||||
0,
|
||||
|
|
@ -826,6 +1028,7 @@ measurement-42-1 field-42-1-0=0i,field-42-1-1=0i {}
|
|||
TEST_SEED,
|
||||
&[],
|
||||
fake_now,
|
||||
&generated_tag_sets,
|
||||
)?;
|
||||
|
||||
let line_protocol = measurement_generator.generate_strings(fake_now)?;
|
||||
|
|
@ -880,6 +1083,68 @@ measurement-42-1 field-42-1-0=0i,field-42-1-1=0i {}
|
|||
resampling_test("", false)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn tag_set_and_tag_pairs() {
|
||||
let data_spec: specification::DataSpec = toml::from_str(
|
||||
r#"
|
||||
name = "ex"
|
||||
|
||||
[[values]]
|
||||
name = "foo"
|
||||
template = "foo-{{id}}"
|
||||
cardinality = 2
|
||||
|
||||
[[tag_sets]]
|
||||
name = "foo_set"
|
||||
for_each = ["foo"]
|
||||
|
||||
[[agents]]
|
||||
name = "test"
|
||||
|
||||
[[agents.measurements]]
|
||||
name = "m1"
|
||||
tag_set = "foo_set"
|
||||
tag_pairs = [{key = "hello", template = "world{{measurement.id}}"}]
|
||||
|
||||
[[agents.measurements.fields]]
|
||||
name = "val"
|
||||
i64_range = [3, 3]"#,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let fake_now = 678;
|
||||
|
||||
let generated_tag_sets = GeneratedTagSets::from_spec(&data_spec).unwrap();
|
||||
|
||||
let mut measurement_generator_set = MeasurementGeneratorSet::<ZeroRng>::new(
|
||||
"agent_name",
|
||||
42,
|
||||
&data_spec.agents[0].measurements[0],
|
||||
TEST_SEED,
|
||||
&[],
|
||||
fake_now,
|
||||
&generated_tag_sets,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let points = measurement_generator_set.generate(fake_now).unwrap();
|
||||
let mut v = Vec::new();
|
||||
for point in points {
|
||||
point.write_data_point_to(&mut v).unwrap();
|
||||
}
|
||||
let line_protocol = str::from_utf8(&v).unwrap();
|
||||
|
||||
assert_eq!(
|
||||
line_protocol,
|
||||
format!(
|
||||
"m1,foo=foo-1,hello=world0 val=3i {}
|
||||
m1,foo=foo-2,hello=world0 val=3i {}
|
||||
",
|
||||
fake_now, fake_now
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
fn resampling_test(resampling_toml: &str, expect_different: bool) -> Result<()> {
|
||||
let fake_now = 678;
|
||||
|
||||
|
|
@ -907,6 +1172,8 @@ measurement-42-1 field-42-1-0=0i,field-42-1-1=0i {}
|
|||
))
|
||||
.unwrap();
|
||||
|
||||
let generated_tag_sets = GeneratedTagSets::default();
|
||||
|
||||
let mut measurement_generator = MeasurementGenerator::<DynamicRng>::new(
|
||||
"agent_name",
|
||||
0,
|
||||
|
|
@ -915,6 +1182,7 @@ measurement-42-1 field-42-1-0=0i,field-42-1-1=0i {}
|
|||
TEST_SEED,
|
||||
&[],
|
||||
fake_now,
|
||||
&generated_tag_sets,
|
||||
)?;
|
||||
|
||||
let lines = measurement_generator.generate_strings(fake_now)?;
|
||||
|
|
|
|||
|
|
@ -28,11 +28,14 @@ type Result<T, E = Error> = std::result::Result<T, E>;
|
|||
#[derive(Deserialize, Debug)]
|
||||
#[serde(deny_unknown_fields)]
|
||||
pub struct DataSpec {
|
||||
/// Every point generated from this configuration will contain a tag
|
||||
/// `data_spec=[this value]` to identify what generated that data. This
|
||||
/// name can also be used in string replacements by using the
|
||||
/// placeholder `{{data_spec}}`.
|
||||
/// If `include_spec_tag` is set to true, eveyr point generated from this
|
||||
/// configuration will contain a tag `data_spec=[this value]` to identify
|
||||
/// what generated that data. This name can also be used in string replacements
|
||||
/// by using the placeholder `{{data_spec}}`.
|
||||
pub name: String,
|
||||
/// If set to true, all samples generated will include a `data_spec=[name]`
|
||||
/// key/value pair.
|
||||
pub include_spec_tag: Option<bool>,
|
||||
/// A string to be used as the seed to the random number generators.
|
||||
///
|
||||
/// When specified, this is used as a base seed propagated through all
|
||||
|
|
@ -197,11 +200,29 @@ pub struct MeasurementSpec {
|
|||
/// Specification of the tags for this measurement
|
||||
#[serde(default)]
|
||||
pub tags: Vec<TagSpec>,
|
||||
/// Specifies a tag set to include in every sampling in addition to tags specified
|
||||
pub tag_set: Option<String>,
|
||||
/// Specification of tag key/value pairs that get generated once and reused for
|
||||
/// every sampling.
|
||||
#[serde(default)]
|
||||
pub tag_pairs: Vec<TagPairSpec>,
|
||||
/// Specification of the fields for this measurement. At least one field is
|
||||
/// required.
|
||||
pub fields: Vec<FieldSpec>,
|
||||
}
|
||||
|
||||
/// Specification of a tag key/value pair whose template will be evaluated once and
|
||||
/// the value will be reused across every sampling.
|
||||
#[derive(Deserialize, Debug)]
|
||||
#[cfg_attr(test, derive(Default))]
|
||||
#[serde(deny_unknown_fields)]
|
||||
pub struct TagPairSpec {
|
||||
/// The tag key
|
||||
pub key: String,
|
||||
/// The template the generate the tag value
|
||||
pub template: String,
|
||||
}
|
||||
|
||||
/// The specification of how to generate tag keys and values for a particular
|
||||
/// measurement.
|
||||
#[derive(Deserialize, Debug)]
|
||||
|
|
|
|||
|
|
@ -75,7 +75,7 @@ pub struct GeneratedTagSets {
|
|||
has_one_values: BTreeMap<String, ParentToHasOnes>,
|
||||
// this maps the name of the tag set specified in the spec to the collection of tag
|
||||
// sets that were pre-generated.
|
||||
tag_sets: BTreeMap<String, Vec<TagSet>>,
|
||||
tag_sets: BTreeMap<String, Arc<Vec<TagSet>>>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
|
|
@ -86,7 +86,6 @@ pub struct ParentToHasOnes {
|
|||
}
|
||||
|
||||
impl GeneratedTagSets {
|
||||
#[allow(dead_code)]
|
||||
pub fn from_spec(spec: &DataSpec) -> Result<Self> {
|
||||
let mut generated_tag_sets = Self::default();
|
||||
|
||||
|
|
@ -119,8 +118,7 @@ impl GeneratedTagSets {
|
|||
Ok(generated_tag_sets)
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub fn sets_for(&self, name: &str) -> Option<&Vec<TagSet>> {
|
||||
pub fn sets_for(&self, name: &str) -> Option<&Arc<Vec<TagSet>>> {
|
||||
self.tag_sets.get(name)
|
||||
}
|
||||
|
||||
|
|
@ -183,7 +181,8 @@ impl GeneratedTagSets {
|
|||
})
|
||||
.collect();
|
||||
let tag_sets = self.for_each_tag_set(None, &tag_set_keys, &mut tag_pairs, 0)?;
|
||||
self.tag_sets.insert(set_name.to_string(), tag_sets);
|
||||
self.tag_sets
|
||||
.insert(set_name.to_string(), Arc::new(tag_sets));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
@ -461,8 +460,8 @@ impl std::fmt::Display for TagSet {
|
|||
|
||||
#[derive(Debug, PartialEq, PartialOrd)]
|
||||
pub struct TagPair {
|
||||
key: Arc<String>,
|
||||
value: Arc<String>,
|
||||
pub key: Arc<String>,
|
||||
pub value: Arc<String>,
|
||||
}
|
||||
|
||||
impl std::fmt::Display for TagPair {
|
||||
|
|
|
|||
|
|
@ -372,6 +372,7 @@ bool = true"#;
|
|||
now,
|
||||
false,
|
||||
1,
|
||||
false,
|
||||
)
|
||||
.await?;
|
||||
|
||||
|
|
@ -417,6 +418,7 @@ bool = true"#;
|
|||
now,
|
||||
false,
|
||||
2,
|
||||
false,
|
||||
)
|
||||
.await?;
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue