Merge pull request #3330 from influxdata/pd/data-generator-many-dbs
feat: add ability for data generator to write to many buckets
commit
b0209137e6
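
This change teaches iox_data_generator to write to many org/bucket pairs from a single spec. Specs gain a [[bucket_writers]] section that assigns named agents (with a count and sampling interval) to a ratio of the supplied bucket list, and the CLI gains a --bucket_list flag. A minimal sketch of the new spec shape, modeled on the tests in this diff (names and values are illustrative only):

use iox_data_generator::specification::DataSpec;
use std::str::FromStr;

fn main() {
    let toml = r#"
name = "demo_schema"

[[agents]]
name = "foo"

[[agents.measurements]]
name = "cpu"

[[agents.measurements.fields]]
name = "val"
i64_range = [1, 1]

[[bucket_writers]]
ratio = 1.0
agents = [{name = "foo", sampling_interval = "10s"}]
"#;

    // Every bucket_writer entry must reference an agent defined in [[agents]];
    // the check happens later, when the bucket list is split across writers.
    let spec = DataSpec::from_str(toml).unwrap();
    assert_eq!(spec.bucket_writers.len(), 1);
}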
@@ -1,10 +1,12 @@
use criterion::{criterion_group, criterion_main, Criterion, Throughput};
use iox_data_generator::agent::Agent;
+use iox_data_generator::specification::{AgentAssignmentSpec, BucketWriterSpec, OrgBucket};
use iox_data_generator::{
    specification::{AgentSpec, DataSpec, FieldSpec, FieldValueSpec, MeasurementSpec},
    tag_set::GeneratedTagSets,
    write::PointsWriterBuilder,
};
+use std::time::Duration;

pub fn single_agent(c: &mut Criterion) {
    let spec = DataSpec {
@@ -13,8 +15,6 @@ pub fn single_agent(c: &mut Criterion) {
        tag_sets: vec![],
        agents: vec![AgentSpec {
            name: "foo".to_string(),
-            count: None,
-            sampling_interval: Some("1s".to_string()),
            measurements: vec![MeasurementSpec {
                name: "measurement-1".into(),
                count: None,
@@ -29,6 +29,14 @@ pub fn single_agent(c: &mut Criterion) {
            has_one: vec![],
            tag_pairs: vec![],
        }],
+        bucket_writers: vec![BucketWriterSpec {
+            ratio: 1.0,
+            agents: vec![AgentAssignmentSpec {
+                name: "foo".to_string(),
+                count: None,
+                sampling_interval: "1s".to_string(),
+            }],
+        }],
    };

    let mut points_writer = PointsWriterBuilder::new_no_op(true);
@@ -47,6 +55,10 @@ pub fn single_agent(c: &mut Criterion) {
        b.iter(|| {
            let r = block_on(iox_data_generator::generate(
                &spec,
+                vec![OrgBucket {
+                    org: "foo".to_string(),
+                    bucket: "bar".to_string(),
+                }],
                &mut points_writer,
                start_datetime,
                end_datetime,
@@ -125,8 +137,6 @@ for_each = [
[[agents]]
name = "foo"
-# create this many agents
-count = 3

[[agents.measurements]]
name = "storage_usage_bucket_cardinality"
@@ -142,6 +152,10 @@ tag_pairs = [
[[agents.measurements.fields]]
name = "gauge"
i64_range = [1, 8147240]
+
+[[bucket_writers]]
+ratio = 1.0
+agents = [{name = "foo", sampling_interval = "1s", count = 3}]
"#).unwrap();

    let generated_tag_sets = GeneratedTagSets::from_spec(&spec).unwrap();
@@ -155,6 +169,8 @@ i64_range = [1, 8147240]
    let mut agents = Agent::from_spec(
        &spec.agents[0],
+        3,
+        Duration::from_millis(10),
        start_datetime,
        end_datetime,
        0,
@@ -172,7 +188,7 @@ i64_range = [1, 8147240]
    group.bench_function("single agent with basic configuration", |b| {
        b.iter(|| {
            agent.reset_current_date_time(0);
-            let points_writer = points_writer.build_for_agent("foo").unwrap();
+            let points_writer = points_writer.build_for_agent("foo", "foo", "foo").unwrap();
            let r = block_on(agent.generate_all(points_writer, 1));
            let n_points = r.expect("Could not generate data");
            assert_eq!(n_points, expected_points as usize);

@@ -2,10 +2,12 @@
# https://github.com/influxdata/idpe/tree/e493a8e9b6b773e9374a8542ddcab7d8174d320d/performance/capacity/write
name = "cap_write"

+[[bucket_writers]]
+ratio = 1.0
+agents = [{name = "telegraf", count = 3, sampling_interval = "10s"}]
+
[[agents]]
name = "telegraf"
-count = 3
-sampling_interval = "10s"
tag_pairs = [
    {key = "host", template = "host-{{agent.id}}"}
]

@@ -103,14 +103,10 @@ for_each = [
    "foo_bar"
]

+# Agent specs can be referenced later on by bucket writers, which specify how frequently
+# data should be written and by how many different agents.
[[agents]]
name = "first_agent"
-# Create this many agents. Agents are single threaded so the way to get more parallelism is to
-# increase the number of agents.
-count = 2
-# each agent will generate a sampling of data every 10ms. If running from a historical point in
-# time, the agents will push data as quickly as they can until they catch up to now.
-sampling_interval = "10ms"
# if specifying tag_pairs at the agent level, every line that the agent generates will have these
# tag pairs added to it. Note that the template has the same helpers as those in value (except for id).
# In addition, it has an accessor for the agent id.
@@ -171,4 +167,22 @@ count = 2
[[agents.measurements.fields]]
name = "f1"
bool = true

+# bucket_writers specify how to split up the list of supplied buckets to write to. If
+# only a single one is specified via the CLI flags, then you'd want only a single bucket_writer
+# with a ratio of 1.0.
+#
+# These make it possible to split up a large list of buckets to write to and send different
+# amounts of write load as well as different schemas through specifying different agents.
+[[bucket_writers]]
+# 20% of the buckets specified in the --bucket_list file will have these agents writing to them
+ratio = 0.2
+# for each of those buckets, have 3 of the first_agent writing every 10s, and 1 of the another_example writing every minute.
+agents = [{name = "first_agent", count = 3, sampling_interval = "10s"}, {name = "another_example", sampling_interval = "1m"}]
+
+[[bucket_writers]]
+# the remaining 80% of the buckets specified will write using these agents
+ratio = 0.8
+# we'll only have a single agent of another_example for each database
+agents = [{name = "another_example", sampling_interval = "1s"}]
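
To make the arithmetic concrete (assuming a hypothetical --bucket_list file with ten buckets): the first bucket_writer takes ceil(10 * 0.2) = 2 buckets, each written to by three first_agent instances every 10s and one another_example instance every 1m; the second takes the remaining 8 buckets, each with a single another_example agent sampling every 1s.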

@@ -58,11 +58,12 @@ for_each = [
    "partition_id",
]

+[[bucket_writers]]
+ratio = 1.0
+agents = [{name = "sender", sampling_interval = "10s"}]
+
[[agents]]
name = "sender"
-# create this many agents
-count = 1
-sampling_interval = "10s"

[[agents.measurements]]
name = "storage_usage_bucket_cardinality"

@@ -17,8 +17,6 @@ for_each = ["host", "host.service"]
[[agents]]
name = "tracing_agent"
-count = 1
-sampling_interval = "1s"

[[agents.measurements]]
name = "traces"
@@ -31,3 +29,7 @@ tag_pairs = [
[[agents.measurements.fields]]
name = "timing"
f64_range = [0.0, 500.0]
+
+[[bucket_writers]]
+ratio = 1.0
+agents = [{name = "tracing_agent", sampling_interval = "1s"}]

@@ -8,7 +8,6 @@ use crate::{
};

use crate::tag_set::GeneratedTagSets;
-use humantime::parse_duration;
use serde_json::json;
use snafu::{ResultExt, Snafu};
use std::time::{Duration, Instant};
@@ -39,12 +38,6 @@ pub enum Error {
        source: crate::write::Error,
    },

-    #[snafu(display("Sampling interval must be valid string: {}", source))]
-    InvalidSamplingInterval {
-        /// Underlying `parse` error
-        source: humantime::DurationError,
-    },
-
    #[snafu(display("Error creating agent tag pairs: {}", source))]
    CouldNotCreateAgentTagPairs { source: crate::tag_pair::Error },
}
@@ -77,17 +70,18 @@ pub struct Agent {
impl Agent {
    /// Create agents that will generate data points according to these
    /// specs.
    #[allow(clippy::too_many_arguments)]
    pub fn from_spec(
        agent_spec: &specification::AgentSpec,
+        count: usize,
+        sampling_interval: Duration,
        start_datetime: Option<i64>, // in nanoseconds since the epoch, defaults to now
        end_datetime: Option<i64>,   // also in nanoseconds since the epoch, defaults to now
        execution_start_time: i64,
        continue_on: bool, // If true, run in "continue" mode after historical data is generated
        generated_tag_sets: &GeneratedTagSets,
    ) -> Result<Vec<Self>> {
-        let agent_count = agent_spec.count.unwrap_or(1);
-
-        let agents: Vec<_> = (1..agent_count + 1)
+        let agents: Vec<_> = (1..count + 1)
            .into_iter()
            .map(|agent_id| {
                let data = json!({"agent": {"id": agent_id, "name": agent_spec.name}});
@@ -114,17 +108,11 @@ impl Agent {
                let current_datetime = start_datetime.unwrap_or_else(now_ns);
                let end_datetime = end_datetime.unwrap_or_else(now_ns);

-                // Convert to nanoseconds
-                let sampling_interval = match &agent_spec.sampling_interval {
-                    None => None,
-                    Some(s) => Some(parse_duration(s).context(InvalidSamplingInterval)?),
-                };
-
                Ok(Self {
                    id: agent_id,
                    name: agent_spec.name.to_string(),
                    measurement_generators,
-                    sampling_interval,
+                    sampling_interval: Some(sampling_interval),
                    current_datetime,
                    end_datetime,
                    continue_on,

@@ -13,7 +13,12 @@
use chrono::prelude::*;
use chrono_english::{parse_date_string, Dialect};
use clap::{crate_authors, crate_version, App, Arg};
-use iox_data_generator::{specification::DataSpec, write::PointsWriterBuilder};
+use iox_data_generator::{
+    specification::{DataSpec, OrgBucket},
+    write::PointsWriterBuilder,
+};
+use std::fs::File;
+use std::io::{self, BufRead};
use tracing::info;

#[tokio::main]
@@ -91,6 +96,12 @@ Logging:
            .help("The bucket name to write to")
            .takes_value(true),
    )
+    .arg(
+        Arg::with_name("BUCKET_LIST")
+            .long("bucket_list")
+            .help("File name with a list of databases. 1 per line with <org>_<bucket> format")
+            .takes_value(true),
+    )
    .arg(
        Arg::with_name("TOKEN")
            .long("token")
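
A sketch of how the new flag might be invoked (hypothetical host, token, and file names; the flag for the spec file itself belongs to the existing CLI and is unchanged by this diff):

iox_data_generator --host http://localhost:8086 --token my-token --bucket_list buckets.txt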
@@ -176,21 +187,11 @@ Logging:
    {
        PointsWriterBuilder::new_file(line_protocol_filename)?
    } else if let Some(host) = matches.value_of("HOST") {
-        let (host, org, bucket, token) = validate_api_arguments(
-            host,
-            matches.value_of("ORG"),
-            matches.value_of("BUCKET"),
-            matches.value_of("TOKEN"),
-        );
+        let token = matches
+            .value_of("TOKEN")
+            .expect("--token must be specified");

-        PointsWriterBuilder::new_api(
-            host,
-            org,
-            bucket,
-            token,
-            matches.value_of("jaeger_debug_header"),
-        )
-        .await?
+        PointsWriterBuilder::new_api(host, token, matches.value_of("jaeger_debug_header")).await?
    } else if matches.is_present("PRINT") {
        PointsWriterBuilder::new_std_out()
    } else if matches.is_present("NOOP") {
@@ -199,8 +200,42 @@ Logging:
        panic!("One of --print or --output or --host must be provided.");
    };

+    let buckets = match (
+        matches.value_of("ORG"),
+        matches.value_of("BUCKET"),
+        matches.value_of("BUCKET_LIST"),
+    ) {
+        (Some(org), Some(bucket), None) => {
+            vec![OrgBucket {
+                org: org.to_string(),
+                bucket: bucket.to_string(),
+            }]
+        }
+        (None, None, Some(bucket_list)) => {
+            let f = File::open(bucket_list).expect("unable to open bucket_list file");
+
+            io::BufReader::new(f)
+                .lines()
+                .map(|l| {
+                    let l = l.expect("unable to read line from bucket list");
+                    let parts = l.split('_').collect::<Vec<_>>();
+                    if parts.len() != 2 {
+                        panic!("error parsing org and bucket from {}", l);
+                    }
+
+                    let org = parts[0].to_string();
+                    let bucket = parts[1].to_string();
+
+                    OrgBucket { org, bucket }
+                })
+                .collect::<Vec<_>>()
+        }
+        _ => panic!("must specify either --org AND --bucket OR --bucket_list"),
+    };
+
    let result = iox_data_generator::generate(
        &data_spec,
+        buckets,
        &mut points_writer_builder,
        start_datetime,
        end_datetime,
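
Concretely (hypothetical file contents): a bucket_list file with the lines `myorg_mybucket` and `otherorg_metrics` parses into OrgBucket { org: "myorg", bucket: "mybucket" } and OrgBucket { org: "otherorg", bucket: "metrics" }. Because each line is split on '_' and exactly two parts are required, org and bucket names themselves cannot contain underscores.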
@@ -230,36 +265,6 @@ fn datetime_nanoseconds(arg: Option<&str>, now: DateTime<Local>) -> Option<i64>
    })
}

-fn validate_api_arguments<'a>(
-    host: &'a str,
-    org: Option<&'a str>,
-    bucket: Option<&'a str>,
-    token: Option<&'a str>,
-) -> (&'a str, &'a str, &'a str, &'a str) {
-    let mut errors = vec![];
-
-    if org.is_none() {
-        errors.push("`--org` is missing");
-    }
-    if bucket.is_none() {
-        errors.push("`--bucket` is missing");
-    }
-    if token.is_none() {
-        errors.push("`--token` is missing");
-    }
-
-    if errors.is_empty() {
-        // These `unwrap`s are safe because otherwise errors wouldn't be empty
-        (host, org.unwrap(), bucket.unwrap(), token.unwrap())
-    } else {
-        panic!(
-            "When `--host` is specified, `--org`, `--bucket`, and `--token` are required, \
-             but {}",
-            errors.join(", ")
-        );
-    }
-}
-
#[cfg(test)]
mod test {
    use super::*;

@@ -89,6 +89,16 @@ pub enum Error {
        /// Underlying `tag_set` module error
        source: tag_set::Error,
    },

+    /// Error splitting input buckets to agents that write to them
+    #[snafu(display(
+        "Error splitting input buckets into agents that write to them: {}",
+        source
+    ))]
+    CouldNotAssignAgents {
+        /// Underlying `specification` module error
+        source: specification::Error,
+    },
}

type Result<T, E = Error> = std::result::Result<T, E>;
@@ -102,6 +112,7 @@ type Result<T, E = Error> = std::result::Result<T, E>;
#[allow(clippy::too_many_arguments)]
pub async fn generate(
    spec: &specification::DataSpec,
+    buckets: Vec<specification::OrgBucket>,
    points_writer_builder: &mut write::PointsWriterBuilder,
    start_datetime: Option<i64>,
    end_datetime: Option<i64>,
@@ -112,45 +123,60 @@ pub async fn generate(
) -> Result<usize> {
    let mut handles = vec![];

+    let bucket_agents = spec
+        .bucket_split_to_agents(&buckets)
+        .context(CouldNotAssignAgents)?;
+
    let generated_tag_sets = GeneratedTagSets::from_spec(spec).context(CouldNotGenerateTagSets)?;

    let lock = Arc::new(tokio::sync::Mutex::new(()));

    let start = std::time::Instant::now();

-    let agents = spec
-        .agents
-        .iter()
-        .map(|spec| {
-            Agent::from_spec(
-                spec,
+    for bucket_agents in &bucket_agents {
+        for agent_assignment in bucket_agents.agent_assignments.iter() {
+            let agents = Agent::from_spec(
+                agent_assignment.spec,
+                agent_assignment.count,
+                agent_assignment.sampling_interval,
                start_datetime,
                end_datetime,
                execution_start_time,
                continue_on,
                &generated_tag_sets,
            )
-            .context(CouldNotCreateAgent)
-        })
-        .collect::<Result<Vec<Vec<Agent>>>>()?;
-    let agents = agents.into_iter().flatten();
+            .context(CouldNotCreateAgent)?;

-    for mut agent in agents {
-        let agent_points_writer = points_writer_builder
-            .build_for_agent(&agent.name)
-            .context(CouldNotCreateAgentWriter)?;
+            info!(
+                "Configuring {} agents of \"{}\" to write data to org {} and bucket {}",
+                agent_assignment.count,
+                agent_assignment.spec.name,
+                bucket_agents.org_bucket.org,
+                bucket_agents.org_bucket.bucket
+            );

-        let lock_ref = Arc::clone(&lock);
-
-        handles.push(tokio::task::spawn(async move {
-            // did this weird hack because otherwise the stdout outputs would be jumbled together garbage
-            if one_agent_at_a_time {
-                let _l = lock_ref.lock().await;
-                agent.generate_all(agent_points_writer, batch_size).await
-            } else {
-                agent.generate_all(agent_points_writer, batch_size).await
-            }
-        }));
-    }
+            for mut agent in agents.into_iter() {
+                let agent_points_writer = points_writer_builder
+                    .build_for_agent(
+                        &agent_assignment.spec.name,
+                        &bucket_agents.org_bucket.org,
+                        &bucket_agents.org_bucket.bucket,
+                    )
+                    .context(CouldNotCreateAgentWriter)?;
+
+                let lock_ref = Arc::clone(&lock);
+
+                handles.push(tokio::task::spawn(async move {
+                    // did this weird hack because otherwise the stdout outputs would be jumbled together garbage
+                    if one_agent_at_a_time {
+                        let _l = lock_ref.lock().await;
+                        agent.generate_all(agent_points_writer, batch_size).await
+                    } else {
+                        agent.generate_all(agent_points_writer, batch_size).await
+                    }
+                }));
+            }
+        }
+    }

    let mut total_points = 0;
@@ -190,6 +216,7 @@ mod test {
    use crate::specification::*;
    use influxdb2_client::models::WriteDataPoint;
    use std::str::FromStr;
+    use std::time::Duration;

    type Error = Box<dyn std::error::Error>;
    type Result<T = (), E = Error> = std::result::Result<T, E>;
@@ -201,14 +228,18 @@ name = "demo_schema"
[[agents]]
name = "foo"
sampling_interval = "10s" # seconds

[[agents.measurements]]
name = "cpu"

[[agents.measurements.fields]]
name = "val"
-i64_range = [1, 1]"#;
+i64_range = [1, 1]
+
+[[bucket_writers]]
+ratio = 1.0
+agents = [{name = "foo", sampling_interval = "10s"}]
+"#;
    let data_spec = DataSpec::from_str(toml).unwrap();
    let agent_spec = &data_spec.agents[0];

@@ -224,6 +255,8 @@ i64_range = [1, 1]"#;
    let mut agent = agent::Agent::from_spec(
        agent_spec,
+        1,
+        Duration::from_secs(10),
        start_datetime,
        end_datetime,
        execution_start_time,
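With the bucket list now threaded through generate(), a caller supplies every target up front; a sketch mirroring the updated tests in this diff, extended to two buckets (values illustrative):

let result = iox_data_generator::generate(
    &data_spec,
    vec![
        OrgBucket { org: "org1".to_string(), bucket: "a".to_string() },
        OrgBucket { org: "org2".to_string(), bucket: "b".to_string() },
    ],
    &mut points_writer_builder,
    start_datetime,
    end_datetime,
    // ...the remaining arguments (execution start time, continue flag,
    // batch size, one-agent-at-a-time flag) are unchanged by this diff
)
.await;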

@@ -545,7 +545,11 @@ mod test {

[[agents.measurements.fields]]
name = "val"
-i64_range = [3, 3]"#,
+i64_range = [3, 3]
+
+[[bucket_writers]]
+ratio = 1.0
+agents = [{name = "foo", sampling_interval = "10s"}]"#,
        )
        .unwrap();

@@ -604,7 +608,11 @@ mod test {

[[agents.measurements.fields]]
name = "val"
-i64_range = [3, 3]"#,
+i64_range = [3, 3]
+
+[[bucket_writers]]
+ratio = 1.0
+agents = [{name = "foo", sampling_interval = "10s"}]"#,
        )
        .unwrap();

@@ -1,11 +1,14 @@
//! Reading and interpreting data generation specifications.

+use humantime::parse_duration;
use serde::Deserialize;
-use snafu::{ResultExt, Snafu};
-use std::{fs, ops::Range, str::FromStr};
+use snafu::{OptionExt, ResultExt, Snafu};
+use std::{fs, ops::Range, str::FromStr, sync::Arc, time::Duration};
+use tracing::warn;

/// Errors that may happen while reading a TOML specification.
#[derive(Snafu, Debug)]
#[allow(missing_docs)]
pub enum Error {
    /// File-related error that may happen while reading a specification
    #[snafu(display(r#"Error reading data spec from TOML file: {}"#, source))]
@@ -20,6 +23,15 @@ pub enum Error {
        /// Underlying TOML error that caused this problem
        source: toml::de::Error,
    },

+    #[snafu(display("Sampling interval must be valid string: {}", source))]
+    InvalidSamplingInterval { source: humantime::DurationError },
+
+    #[snafu(display(
+        "Agent {} referenced in bucket_writers, but not present in spec",
+        agent
+    ))]
+    AgentNotFound { agent: String },
}

type Result<T, E = Error> = std::result::Result<T, E>;
@@ -43,8 +55,10 @@ pub struct DataSpec {
    /// set of tags that appear.
    #[serde(default)]
    pub tag_sets: Vec<TagSetsSpec>,
-    /// The specification for the data-generating agents in this data set.
+    /// The specification for the agents that can be used to write data to databases.
    pub agents: Vec<AgentSpec>,
+    /// The specification for writing to the provided list of buckets.
+    pub bucket_writers: Vec<BucketWriterSpec>,
}

impl DataSpec {
@@ -53,6 +67,111 @@ impl DataSpec {
        let spec_toml = fs::read_to_string(file_name).context(ReadFile)?;
        Self::from_str(&spec_toml)
    }

+    /// Given a collection of OrgBuckets, assign each a set of agents based on the spec
+    pub fn bucket_split_to_agents<'a>(
+        &'a self,
+        buckets: &'a [OrgBucket],
+    ) -> Result<Vec<BucketAgents<'a>>> {
+        let mut bucket_agents = Vec::with_capacity(buckets.len());
+
+        let mut start = 0;
+        for w in &self.bucket_writers {
+            if start >= buckets.len() {
+                warn!(
+                    "Bucket_writers percentages > 1.0. Writer {:?} and later skipped.",
+                    w
+                );
+                break;
+            }
+
+            // validate the agents actually exist in the spec
+            for a in &w.agents {
+                let _ = self.agent_by_name(&a.name).unwrap_or_else(|| {
+                    panic!(
+                        "agent {} referenced in bucket writers, but isn't in the spec",
+                        a.name
+                    )
+                });
+            }
+
+            let agents: Vec<_> = w
+                .agents
+                .iter()
+                .map(|a| {
+                    let count = a.count.unwrap_or(1);
+                    let sampling_interval =
+                        parse_duration(&a.sampling_interval).context(InvalidSamplingInterval)?;
+                    let spec = self
+                        .agent_by_name(&a.name)
+                        .context(AgentNotFound { agent: &a.name })?;
+
+                    Ok(AgentAssignment {
+                        spec,
+                        count,
+                        sampling_interval,
+                    })
+                })
+                .collect::<Result<Vec<_>>>()?;
+            let agents = Arc::new(agents);
+
+            let mut end = (buckets.len() as f64 * w.ratio).ceil() as usize + start;
+            if end > buckets.len() {
+                end = buckets.len();
+            }
+
+            for org_bucket in &buckets[start..end] {
+                bucket_agents.push(BucketAgents {
+                    org_bucket,
+                    agent_assignments: Arc::clone(&agents),
+                })
+            }
+
+            start = end;
+        }
+
+        Ok(bucket_agents)
+    }
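
The split is positional: each writer claims the next ceil(buckets.len() * ratio) buckets, clamped to the end of the list. With the three buckets and ratios 0.6/0.4 from the test further below, the first writer gets ceil(3 * 0.6) = 2 buckets; the second starts at index 2 and its end is min(2 + ceil(3 * 0.4), 3) = 3, so it gets the one remaining bucket. Ratios summing past 1.0 leave later writers with nothing, which is what the warn! above guards.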
+
+    /// Get the agent spec by its name
+    pub fn agent_by_name(&self, name: &str) -> Option<&AgentSpec> {
+        for a in &self.agents {
+            if a.name == name {
+                return Some(a);
+            }
+        }
+
+        None
+    }
}

+#[derive(Debug, Eq, PartialEq)]
+/// Specifies an org and a bucket to write data to
+pub struct OrgBucket {
+    /// The organization name
+    pub org: String,
+    /// The bucket name
+    pub bucket: String,
+}
+
+#[derive(Debug)]
+/// Assignment info for an agent to a bucket
+pub struct AgentAssignment<'a> {
+    /// The agent specification for writing to the assigned bucket
+    pub spec: &'a AgentSpec,
+    /// The number of these agents that should be writing to the bucket
+    pub count: usize,
+    /// The sampling interval agents will generate data on
+    pub sampling_interval: Duration,
+}
+
+#[derive(Debug)]
+/// Agent assignments mapped to a bucket
+pub struct BucketAgents<'a> {
+    /// The organization and bucket data will get written to
+    pub org_bucket: &'a OrgBucket,
+    /// The agents specifications that will be writing to the bucket
+    pub agent_assignments: Arc<Vec<AgentAssignment<'a>>>,
+}

impl FromStr for DataSpec {
@@ -110,6 +229,33 @@ pub struct TagSetsSpec {
    pub for_each: Vec<String>,
}

+/// The specification for what should be written to the list of provided org buckets.
+/// Buckets will be written to by one or more agents with the given sampling interval and
+/// agent count.
+#[derive(Deserialize, Debug)]
+#[serde(deny_unknown_fields)]
+pub struct BucketWriterSpec {
+    /// The ratio of buckets from the provided list that should use these agents. The
+    /// ratios of the collection of bucket writer specs should add up to 1.0.
+    pub ratio: f64,
+    /// The agents that should be used to write to these databases.
+    pub agents: Vec<AgentAssignmentSpec>,
+}
+
+/// The specification for the specific configuration of how an agent should write to a database.
+#[derive(Deserialize, Debug, Clone)]
+#[serde(deny_unknown_fields)]
+pub struct AgentAssignmentSpec {
+    /// The name of the `AgentSpec` to use
+    pub name: String,
+    /// The number of these agents that should write to the database
+    pub count: Option<usize>,
+    /// How frequently each agent will write to the database. This is applicable when using the
+    /// --continue flag. Otherwise, if doing historical backfill, timestamps of generated data
+    /// will be this far apart and data will be written in as quickly as possible.
+    pub sampling_interval: String,
+}
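
In TOML, an inline table like `{name = "foo", count = 3, sampling_interval = "10s"}` deserializes into an AgentAssignmentSpec with count = Some(3); omitting count yields None, which bucket_split_to_agents above defaults to 1.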

/// The specification of the behavior of an agent, the entity responsible for
/// generating a number of data points according to its configuration.
#[derive(Deserialize, Debug)]
@@ -118,12 +264,6 @@
pub struct AgentSpec {
    /// The name of the agent, which can be referenced in templates with `agent.name`.
    pub name: String,
-    /// Specifies the number of agents that should be created with this spec.
-    /// Default value is 1.
-    pub count: Option<usize>,
-    /// How often this agent should generate samples, in a duration string. If
-    /// not specified, this agent will only generate one sample.
-    pub sampling_interval: Option<String>,
    /// The specifications for the measurements for the agent to generate.
    pub measurements: Vec<MeasurementSpec>,
    /// A collection of strings that reference other `Values` collections. Each agent will have one
@@ -493,6 +633,10 @@ name = "cpu"
[[agents.measurements.fields]]
name = "host"
template = "server"
+
+[[bucket_writers]]
+ratio = 1.0
+agents = [{name = "foo", sampling_interval = "10s"}]
"#;
    let spec = DataSpec::from_str(toml).unwrap();
@@ -513,4 +657,79 @@ template = "server"
        field_spec
    );
}

+    #[test]
+    fn split_buckets_by_writer_spec() {
+        let toml = r#"
+name = "demo_schema"
+
+[[agents]]
+name = "foo"
+[[agents.measurements]]
+name = "cpu"
+[[agents.measurements.fields]]
+name = "host"
+template = "server"
+
+[[agents]]
+name = "bar"
+[[agents.measurements]]
+name = "whatevs"
+[[agents.measurements.fields]]
+name = "val"
+i64_range = [0, 10]
+
+[[bucket_writers]]
+ratio = 0.6
+agents = [{name = "foo", sampling_interval = "10s"}]
+
+[[bucket_writers]]
+ratio = 0.4
+agents = [{name = "bar", sampling_interval = "1m", count = 3}]
+"#;
+        let spec = DataSpec::from_str(toml).unwrap();
+        let buckets = vec![
+            OrgBucket {
+                org: "a".to_string(),
+                bucket: "1".to_string(),
+            },
+            OrgBucket {
+                org: "a".to_string(),
+                bucket: "2".to_string(),
+            },
+            OrgBucket {
+                org: "b".to_string(),
+                bucket: "1".to_string(),
+            },
+        ];
+
+        let bucket_agents = spec.bucket_split_to_agents(&buckets).unwrap();
+
+        let b = &bucket_agents[0];
+        assert_eq!(b.org_bucket, &buckets[0]);
+        assert_eq!(
+            b.agent_assignments[0].sampling_interval,
+            Duration::from_secs(10)
+        );
+        assert_eq!(b.agent_assignments[0].count, 1);
+        assert_eq!(b.agent_assignments[0].spec.name, "foo");
+
+        let b = &bucket_agents[1];
+        assert_eq!(b.org_bucket, &buckets[1]);
+        assert_eq!(
+            b.agent_assignments[0].sampling_interval,
+            Duration::from_secs(10)
+        );
+        assert_eq!(b.agent_assignments[0].count, 1);
+        assert_eq!(b.agent_assignments[0].spec.name, "foo");
+
+        let b = &bucket_agents[2];
+        assert_eq!(b.org_bucket, &buckets[2]);
+        assert_eq!(
+            b.agent_assignments[0].sampling_interval,
+            Duration::from_secs(60)
+        );
+        assert_eq!(b.agent_assignments[0].count, 3);
+        assert_eq!(b.agent_assignments[0].spec.name, "bar");
+    }
}

@@ -492,7 +492,10 @@ name = "cpu"
[[agents.measurements.fields]]
name = "f1"
i64_range = [0, 23]
-"#;
+
+[[bucket_writers]]
+ratio = 1.0
+agents = [{name = "foo", sampling_interval = "10s"}]"#;

    let spec = DataSpec::from_str(toml).unwrap();
    let tag_sets = GeneratedTagSets::from_spec(&spec).unwrap();
@@ -537,7 +540,10 @@ name = "cpu"
[[agents.measurements.fields]]
name = "f1"
i64_range = [0, 23]
-"#;
+
+[[bucket_writers]]
+ratio = 1.0
+agents = [{name = "foo", sampling_interval = "10s"}]"#;

    let spec = DataSpec::from_str(toml).unwrap();
    let tag_sets = GeneratedTagSets::from_spec(&spec).unwrap();
@@ -603,7 +609,10 @@ name = "cpu"
[[agents.measurements.fields]]
name = "f1"
i64_range = [0, 23]
-"#;
+
+[[bucket_writers]]
+ratio = 1.0
+agents = [{name = "foo", sampling_interval = "10s"}]"#;

    let spec = DataSpec::from_str(toml).unwrap();
    let tag_sets = GeneratedTagSets::from_spec(&spec).unwrap();

@@ -94,11 +94,7 @@ pub struct PointsWriterBuilder {

#[derive(Debug)]
enum PointsWriterConfig {
-    Api {
-        client: influxdb2_client::Client,
-        org: String,
-        bucket: String,
-    },
+    Api(influxdb2_client::Client),
    Directory(PathBuf),
    NoOp {
        perform_write: bool,
@@ -113,8 +109,6 @@ impl PointsWriterBuilder {
    /// specified org and bucket.
    pub async fn new_api(
        host: impl Into<String> + Send,
-        org: impl Into<String> + Send,
-        bucket: impl Into<String> + Send,
        token: impl Into<String> + Send,
        jaeger_debug: Option<&str>,
    ) -> Result<Self> {
@@ -133,15 +127,9 @@ impl PointsWriterBuilder {
        if let Some(header) = jaeger_debug {
            client = client.with_jaeger_debug(header.to_string());
        }
-        let org = org.into();
-        let bucket = bucket.into();

        Ok(Self {
-            config: PointsWriterConfig::Api {
-                client,
-                org,
-                bucket,
-            },
+            config: PointsWriterConfig::Api(client),
        })
    }

@@ -172,16 +160,17 @@ impl PointsWriterBuilder {

    /// Create a writer out of this writer's configuration for a particular
    /// agent that runs in a separate thread/task.
-    pub fn build_for_agent(&mut self, name: impl Into<String>) -> Result<PointsWriter> {
+    pub fn build_for_agent(
+        &mut self,
+        name: impl Into<String>,
+        org: impl Into<String>,
+        bucket: impl Into<String>,
+    ) -> Result<PointsWriter> {
        let inner_writer = match &mut self.config {
-            PointsWriterConfig::Api {
-                client,
-                org,
-                bucket,
-            } => InnerPointsWriter::Api {
+            PointsWriterConfig::Api(client) => InnerPointsWriter::Api {
                client: client.clone(),
-                org: org.clone(),
-                bucket: bucket.clone(),
+                org: org.into(),
+                bucket: bucket.into(),
            },
            PointsWriterConfig::Directory(dir_path) => {
                let mut filename = dir_path.clone();
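A minimal sketch of the reworked call shape (hypothetical host, token, and names; signatures as changed in this diff): the builder no longer pins an org and bucket at construction, so one builder can serve every bucket writer:

let mut builder = PointsWriterBuilder::new_api("http://localhost:8086", "my-token", None).await?;
let writer_a = builder.build_for_agent("telegraf", "org1", "bucket-a")?;
let writer_b = builder.build_for_agent("telegraf", "org2", "bucket-b")?;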
@@ -351,7 +340,12 @@ name = "cpu"

[[agents.measurements.fields]]
name = "val"
-i64_range = [3,3]"#;
+i64_range = [3,3]
+
+[[bucket_writers]]
+ratio = 1.0
+agents = [{name = "foo", sampling_interval = "1s"}]
+"#;

    let data_spec = DataSpec::from_str(toml).unwrap();
    let mut points_writer_builder = PointsWriterBuilder::new_vec();
@@ -360,6 +354,10 @@ i64_range = [3,3]"#;

    generate(
        &data_spec,
+        vec![OrgBucket {
+            org: "foo".to_string(),
+            bucket: "bar".to_string(),
+        }],
        &mut points_writer_builder,
        Some(now),
        Some(now),
@@ -389,14 +387,18 @@ name = "demo_schema"

[[agents]]
name = "foo"
sampling_interval = "1s" # seconds

[[agents.measurements]]
name = "cpu"

[[agents.measurements.fields]]
name = "val"
-i64_range = [2, 2]"#;
+i64_range = [2, 2]
+
+[[bucket_writers]]
+ratio = 1.0
+agents = [{name = "foo", sampling_interval = "1s"}]
+"#;

    let data_spec = DataSpec::from_str(toml).unwrap();
    let mut points_writer_builder = PointsWriterBuilder::new_vec();
@@ -405,6 +407,10 @@ i64_range = [2, 2]"#;

    generate(
        &data_spec,
+        vec![OrgBucket {
+            org: "foo".to_string(),
+            bucket: "bar".to_string(),
+        }],
        &mut points_writer_builder,
        Some(now - 1_000_000_000),
        Some(now),

@@ -2,8 +2,6 @@ name = "example"
[[agents]]
name = "foo"
-count = 3
-sampling_interval = "10s"

[[agents.measurements]]
name = "cpu"
@@ -15,3 +13,6 @@ tag_pairs = [
name = "usage_user"
f64_range = [0.0, 100.0]
+
+[[bucket_writers]]
+ratio = 1.0
+agents = [{name = "foo", count = 3, sampling_interval = "10s"}]