feat: add ability for data generator to write to many buckets

This adds the ability for the data generator to write to many databases. A new command line argument, `bucket_list`, is added which should be a file name. The file should contain a list of databsaes, one per line, with the structure of <org>_<bucket>. This is a little odd given the data generator expects org and bucket separately, but I expect the file that we'll be using will be database names, which have this format.

The configuration can specify what percentage of the list should get written to by which agents at what sampling interval. This should allow configurations where databases get different levels of ingest and different types (as specified via different agent specs). The structure is a little wonky, but I think it'll get the job done. The next step is to run some perf tests to see how the data generator performs if writing to 10k databases.
pull/24376/head
Paul Dix 2021-12-07 16:54:40 -05:00
parent 35edee6b4f
commit 31aa41e240
13 changed files with 451 additions and 147 deletions

View File

@ -1,10 +1,12 @@
use criterion::{criterion_group, criterion_main, Criterion, Throughput};
use iox_data_generator::agent::Agent;
use iox_data_generator::specification::{AgentAssignmentSpec, BucketWriterSpec, OrgBucket};
use iox_data_generator::{
specification::{AgentSpec, DataSpec, FieldSpec, FieldValueSpec, MeasurementSpec},
tag_set::GeneratedTagSets,
write::PointsWriterBuilder,
};
use std::time::Duration;
pub fn single_agent(c: &mut Criterion) {
let spec = DataSpec {
@ -13,8 +15,6 @@ pub fn single_agent(c: &mut Criterion) {
tag_sets: vec![],
agents: vec![AgentSpec {
name: "foo".to_string(),
count: None,
sampling_interval: Some("1s".to_string()),
measurements: vec![MeasurementSpec {
name: "measurement-1".into(),
count: None,
@ -29,6 +29,14 @@ pub fn single_agent(c: &mut Criterion) {
has_one: vec![],
tag_pairs: vec![],
}],
bucket_writers: vec![BucketWriterSpec {
percent: 1.0,
agents: vec![AgentAssignmentSpec {
name: "foo".to_string(),
count: None,
sampling_interval: "1s".to_string(),
}],
}],
};
let mut points_writer = PointsWriterBuilder::new_no_op(true);
@ -47,6 +55,10 @@ pub fn single_agent(c: &mut Criterion) {
b.iter(|| {
let r = block_on(iox_data_generator::generate(
&spec,
vec![OrgBucket {
org: "foo".to_string(),
bucket: "bar".to_string(),
}],
&mut points_writer,
start_datetime,
end_datetime,
@ -125,8 +137,6 @@ for_each = [
[[agents]]
name = "foo"
# create this many agents
count = 3
[[agents.measurements]]
name = "storage_usage_bucket_cardinality"
@ -142,6 +152,10 @@ tag_pairs = [
[[agents.measurements.fields]]
name = "gauge"
i64_range = [1, 8147240]
[[bucket_writers]]
percent = 1.0
agents = [{name = "foo", sampling_interval = "1s", count = 3}]
"#).unwrap();
let generated_tag_sets = GeneratedTagSets::from_spec(&spec).unwrap();
@ -155,6 +169,8 @@ i64_range = [1, 8147240]
let mut agents = Agent::from_spec(
&spec.agents[0],
3,
Duration::from_millis(10),
start_datetime,
end_datetime,
0,
@ -172,7 +188,7 @@ i64_range = [1, 8147240]
group.bench_function("single agent with basic configuration", |b| {
b.iter(|| {
agent.reset_current_date_time(0);
let points_writer = points_writer.build_for_agent("foo").unwrap();
let points_writer = points_writer.build_for_agent("foo", "foo", "foo").unwrap();
let r = block_on(agent.generate_all(points_writer, 1));
let n_points = r.expect("Could not generate data");
assert_eq!(n_points, expected_points as usize);

View File

@ -2,10 +2,12 @@
# https://github.com/influxdata/idpe/tree/e493a8e9b6b773e9374a8542ddcab7d8174d320d/performance/capacity/write
name = "cap_write"
[[bucket_writers]]
percent = 1.0
agents = [{name = "telegraf", count = 3, sampling_interval = "10s"}]
[[agents]]
name = "telegraf"
count = 3
sampling_interval = "10s"
tag_pairs = [
{key = "host", template = "host-{{agent.id}}"}
]

View File

@ -103,14 +103,10 @@ for_each = [
"foo_bar"
]
# Agent specs can be referenced later on by bucket writers, which specify how frequently
# data should be written and by how many different agents.
[[agents]]
name = "first_agent"
# Create this many agents. Agents are single threaded so the way to get more paralellism is to
# increase the number of agents.
count = 2
# each agent will generate a sampling of data every 10ms. If running from a historical point in
# time, the agents will push data as quickly as they can until they catch up to now.
sampling_interval = "10ms"
# if specifying tag_pairs at the agent level, every line that the agent generates will have these
# tag pairs added to it. Note that the template has the same helpers as those in value (except for id).
# In addition, it has an accessor for the agent id.
@ -172,3 +168,21 @@ count = 2
[[agents.measurements.fields]]
name = "f1"
bool = true
# bucket_writers specify how to split up the list of supplied buckets to write to. If
# only a single one is specified via the CLI flags, then you'd want only a single bucket_writer
# with a percent of 1.0.
#
# These make it possible to split up a large list of buckets to write to and send different
# amounts of write load as well as different schemas through specifying different agents.
[[bucket_writers]]
# 20% of the buckets specified in the --bucket_list file will have these agents writing to them
percent = 0.2
# for each of those buckets, have 3 of the first_agent writing every 10s, and 1 of the another_example writing every minute.
agents = [{name = "first_agent", count = 3, sampling_interval = "10s"}, {name = "another_example", sampling_interval = "1m"}]
[[bucket_writers]]
# the remaining 80% of the buckeets specified will write using these agents
percent = 0.8
# we'll only have a single agent of another_example for each database
agents = [{name = "another_example", sampling_interval = "1s"}]

View File

@ -58,11 +58,12 @@ for_each = [
"partition_id",
]
[[bucket_writers]]
percent = 1.0
agents = [{name = "sender", sampling_interval = "10s"}]
[[agents]]
name = "sender"
# create this many agents
count = 1
sampling_interval = "10s"
[[agents.measurements]]
name = "storage_usage_bucket_cardinality"

View File

@ -17,8 +17,6 @@ for_each = ["host", "host.service"]
[[agents]]
name = "tracing_agent"
count = 1
sampling_interval = "1s"
[[agents.measurements]]
name = "traces"
@ -31,3 +29,7 @@ tag_pairs = [
[[agents.measurements.fields]]
name = "timing"
f64_range = [0.0, 500.0]
[[bucket_writers]]
percent = 1.0
agents = [{name = "tracing_agent", sampling_interval = "1s"}]

View File

@ -8,7 +8,6 @@ use crate::{
};
use crate::tag_set::GeneratedTagSets;
use humantime::parse_duration;
use serde_json::json;
use snafu::{ResultExt, Snafu};
use std::time::{Duration, Instant};
@ -39,12 +38,6 @@ pub enum Error {
source: crate::write::Error,
},
#[snafu(display("Sampling interval must be valid string: {}", source))]
InvalidSamplingInterval {
/// Underlying `parse` error
source: humantime::DurationError,
},
#[snafu(display("Error creating agent tag pairs: {}", source))]
CouldNotCreateAgentTagPairs { source: crate::tag_pair::Error },
}
@ -77,17 +70,18 @@ pub struct Agent {
impl Agent {
/// Create agents that will generate data points according to these
/// specs.
#[allow(clippy::too_many_arguments)]
pub fn from_spec(
agent_spec: &specification::AgentSpec,
count: usize,
sampling_interval: Duration,
start_datetime: Option<i64>, // in nanoseconds since the epoch, defaults to now
end_datetime: Option<i64>, // also in nanoseconds since the epoch, defaults to now
execution_start_time: i64,
continue_on: bool, // If true, run in "continue" mode after historical data is generated
generated_tag_sets: &GeneratedTagSets,
) -> Result<Vec<Self>> {
let agent_count = agent_spec.count.unwrap_or(1);
let agents: Vec<_> = (1..agent_count + 1)
let agents: Vec<_> = (1..count + 1)
.into_iter()
.map(|agent_id| {
let data = json!({"agent": {"id": agent_id, "name": agent_spec.name}});
@ -114,17 +108,11 @@ impl Agent {
let current_datetime = start_datetime.unwrap_or_else(now_ns);
let end_datetime = end_datetime.unwrap_or_else(now_ns);
// Convert to nanoseconds
let sampling_interval = match &agent_spec.sampling_interval {
None => None,
Some(s) => Some(parse_duration(s).context(InvalidSamplingInterval)?),
};
Ok(Self {
id: agent_id,
name: agent_spec.name.to_string(),
measurement_generators,
sampling_interval,
sampling_interval: Some(sampling_interval),
current_datetime,
end_datetime,
continue_on,

View File

@ -13,7 +13,12 @@
use chrono::prelude::*;
use chrono_english::{parse_date_string, Dialect};
use clap::{crate_authors, crate_version, App, Arg};
use iox_data_generator::{specification::DataSpec, write::PointsWriterBuilder};
use iox_data_generator::{
specification::{DataSpec, OrgBucket},
write::PointsWriterBuilder,
};
use std::fs::File;
use std::io::{self, BufRead};
use tracing::info;
#[tokio::main]
@ -91,6 +96,12 @@ Logging:
.help("The bucket name to write to")
.takes_value(true),
)
.arg(
Arg::with_name("BUCKET_LIST")
.long("bucket_list")
.help("File name with a list of databases. 1 per line with <org>_<bucket> format")
.takes_value(true),
)
.arg(
Arg::with_name("TOKEN")
.long("token")
@ -176,21 +187,11 @@ Logging:
{
PointsWriterBuilder::new_file(line_protocol_filename)?
} else if let Some(host) = matches.value_of("HOST") {
let (host, org, bucket, token) = validate_api_arguments(
host,
matches.value_of("ORG"),
matches.value_of("BUCKET"),
matches.value_of("TOKEN"),
);
let token = matches
.value_of("TOKEN")
.expect("--token must be specified");
PointsWriterBuilder::new_api(
host,
org,
bucket,
token,
matches.value_of("jaeger_debug_header"),
)
.await?
PointsWriterBuilder::new_api(host, token, matches.value_of("jaeger_debug_header")).await?
} else if matches.is_present("PRINT") {
PointsWriterBuilder::new_std_out()
} else if matches.is_present("NOOP") {
@ -199,8 +200,42 @@ Logging:
panic!("One of --print or --output or --host must be provided.");
};
let buckets = match (
matches.value_of("ORG"),
matches.value_of("BUCKET"),
matches.value_of("BUCKET_LIST"),
) {
(Some(org), Some(bucket), None) => {
vec![OrgBucket {
org: org.to_string(),
bucket: bucket.to_string(),
}]
}
(None, None, Some(bucket_list)) => {
let f = File::open(bucket_list).expect("unable to open bucket_list file");
io::BufReader::new(f)
.lines()
.map(|l| {
let l = l.expect("unable to read line from bucket list");
let parts = l.split('_').collect::<Vec<_>>();
if parts.len() != 2 {
panic!("error parsing org and bucket from {}", l);
}
let org = parts[0].to_string();
let bucket = parts[1].to_string();
OrgBucket { org, bucket }
})
.collect::<Vec<_>>()
}
_ => panic!("must specify either --org AND --bucket OR --bucket_list"),
};
let result = iox_data_generator::generate(
&data_spec,
buckets,
&mut points_writer_builder,
start_datetime,
end_datetime,
@ -230,36 +265,6 @@ fn datetime_nanoseconds(arg: Option<&str>, now: DateTime<Local>) -> Option<i64>
})
}
fn validate_api_arguments<'a>(
host: &'a str,
org: Option<&'a str>,
bucket: Option<&'a str>,
token: Option<&'a str>,
) -> (&'a str, &'a str, &'a str, &'a str) {
let mut errors = vec![];
if org.is_none() {
errors.push("`--org` is missing");
}
if bucket.is_none() {
errors.push("`--bucket` is missing");
}
if token.is_none() {
errors.push("`--token` is missing");
}
if errors.is_empty() {
// These `unwrap`s are safe because otherwise errors wouldn't be empty
(host, org.unwrap(), bucket.unwrap(), token.unwrap())
} else {
panic!(
"When `--host` is specified, `--org`, `--bucket`, and `--token` are required, \
but {}",
errors.join(", ")
);
}
}
#[cfg(test)]
mod test {
use super::*;

View File

@ -89,6 +89,16 @@ pub enum Error {
/// Underlying `tag_set` module error
source: tag_set::Error,
},
/// Error splitting input buckets to agents that write to them
#[snafu(display(
"Error splitting input buckets into agents that write to them: {}",
source
))]
CouldNotAssignAgents {
/// Underlying `specification` module error
source: specification::Error,
},
}
type Result<T, E = Error> = std::result::Result<T, E>;
@ -102,6 +112,7 @@ type Result<T, E = Error> = std::result::Result<T, E>;
#[allow(clippy::too_many_arguments)]
pub async fn generate(
spec: &specification::DataSpec,
buckets: Vec<specification::OrgBucket>,
points_writer_builder: &mut write::PointsWriterBuilder,
start_datetime: Option<i64>,
end_datetime: Option<i64>,
@ -112,45 +123,60 @@ pub async fn generate(
) -> Result<usize> {
let mut handles = vec![];
let bucket_agents = spec
.bucket_split_to_agents(&buckets)
.context(CouldNotAssignAgents)?;
let generated_tag_sets = GeneratedTagSets::from_spec(spec).context(CouldNotGenerateTagSets)?;
let lock = Arc::new(tokio::sync::Mutex::new(()));
let start = std::time::Instant::now();
let agents = spec
.agents
.iter()
.map(|spec| {
Agent::from_spec(
spec,
for bucket_agents in &bucket_agents {
for agent_assignment in bucket_agents.agent_assignments.iter() {
let agents = Agent::from_spec(
agent_assignment.spec,
agent_assignment.count,
agent_assignment.sampling_interval,
start_datetime,
end_datetime,
execution_start_time,
continue_on,
&generated_tag_sets,
)
.context(CouldNotCreateAgent)
})
.collect::<Result<Vec<Vec<Agent>>>>()?;
let agents = agents.into_iter().flatten();
.context(CouldNotCreateAgent)?;
for mut agent in agents {
let agent_points_writer = points_writer_builder
.build_for_agent(&agent.name)
.context(CouldNotCreateAgentWriter)?;
info!(
"Configuring {} agents of \"{}\" to write data to org {} and bucket {}",
agent_assignment.count,
agent_assignment.spec.name,
bucket_agents.org_bucket.org,
bucket_agents.org_bucket.bucket
);
let lock_ref = Arc::clone(&lock);
for mut agent in agents.into_iter() {
let agent_points_writer = points_writer_builder
.build_for_agent(
&agent_assignment.spec.name,
&bucket_agents.org_bucket.org,
&bucket_agents.org_bucket.bucket,
)
.context(CouldNotCreateAgentWriter)?;
handles.push(tokio::task::spawn(async move {
// did this weird hack because otherwise the stdout outputs would be jumbled together garbage
if one_agent_at_a_time {
let _l = lock_ref.lock().await;
agent.generate_all(agent_points_writer, batch_size).await
} else {
agent.generate_all(agent_points_writer, batch_size).await
let lock_ref = Arc::clone(&lock);
handles.push(tokio::task::spawn(async move {
// did this weird hack because otherwise the stdout outputs would be jumbled together garbage
if one_agent_at_a_time {
let _l = lock_ref.lock().await;
agent.generate_all(agent_points_writer, batch_size).await
} else {
agent.generate_all(agent_points_writer, batch_size).await
}
}));
}
}));
}
}
let mut total_points = 0;
@ -190,6 +216,7 @@ mod test {
use crate::specification::*;
use influxdb2_client::models::WriteDataPoint;
use std::str::FromStr;
use std::time::Duration;
type Error = Box<dyn std::error::Error>;
type Result<T = (), E = Error> = std::result::Result<T, E>;
@ -201,14 +228,18 @@ name = "demo_schema"
[[agents]]
name = "foo"
sampling_interval = "10s" # seconds
[[agents.measurements]]
name = "cpu"
[[agents.measurements.fields]]
name = "val"
i64_range = [1, 1]"#;
i64_range = [1, 1]
[[bucket_writers]]
percent = 1.0
agents = [{name = "foo", sampling_interval = "10s"}]
"#;
let data_spec = DataSpec::from_str(toml).unwrap();
let agent_spec = &data_spec.agents[0];
@ -224,6 +255,8 @@ i64_range = [1, 1]"#;
let mut agent = agent::Agent::from_spec(
agent_spec,
1,
Duration::from_secs(10),
start_datetime,
end_datetime,
execution_start_time,

View File

@ -545,7 +545,11 @@ mod test {
[[agents.measurements.fields]]
name = "val"
i64_range = [3, 3]"#,
i64_range = [3, 3]
[[bucket_writers]]
percent = 1.0
agents = [{name = "foo", sampling_interval = "10s"}]"#,
)
.unwrap();
@ -604,7 +608,11 @@ mod test {
[[agents.measurements.fields]]
name = "val"
i64_range = [3, 3]"#,
i64_range = [3, 3]
[[bucket_writers]]
percent = 1.0
agents = [{name = "foo", sampling_interval = "10s"}]"#,
)
.unwrap();

View File

@ -1,11 +1,14 @@
//! Reading and interpreting data generation specifications.
use humantime::parse_duration;
use serde::Deserialize;
use snafu::{ResultExt, Snafu};
use std::{fs, ops::Range, str::FromStr};
use snafu::{OptionExt, ResultExt, Snafu};
use std::{fs, ops::Range, str::FromStr, sync::Arc, time::Duration};
use tracing::warn;
/// Errors that may happen while reading a TOML specification.
#[derive(Snafu, Debug)]
#[allow(missing_docs)]
pub enum Error {
/// File-related error that may happen while reading a specification
#[snafu(display(r#"Error reading data spec from TOML file: {}"#, source))]
@ -20,6 +23,15 @@ pub enum Error {
/// Underlying TOML error that caused this problem
source: toml::de::Error,
},
#[snafu(display("Sampling interval must be valid string: {}", source))]
InvalidSamplingInterval { source: humantime::DurationError },
#[snafu(display(
"Agent {} referenced in bucket_writers, but not present in spec",
agent
))]
AgentNotFound { agent: String },
}
type Result<T, E = Error> = std::result::Result<T, E>;
@ -43,8 +55,10 @@ pub struct DataSpec {
/// set of tags that appear.
#[serde(default)]
pub tag_sets: Vec<TagSetsSpec>,
/// The specification for the data-generating agents in this data set.
/// The specification for the agents that can be used to write data to databases.
pub agents: Vec<AgentSpec>,
/// The specification for writing to the provided list of buckets.
pub bucket_writers: Vec<BucketWriterSpec>,
}
impl DataSpec {
@ -53,6 +67,111 @@ impl DataSpec {
let spec_toml = fs::read_to_string(file_name).context(ReadFile)?;
Self::from_str(&spec_toml)
}
/// Given a collection of OrgBuckets, assign each a set of agents based on the spec
pub fn bucket_split_to_agents<'a>(
&'a self,
buckets: &'a [OrgBucket],
) -> Result<Vec<BucketAgents<'a>>> {
let mut bucket_agents = Vec::with_capacity(buckets.len());
let mut start = 0;
for w in &self.bucket_writers {
if start >= buckets.len() {
warn!(
"Bucket_writers percentages > 1.0. Writer {:?} and later skipped.",
w
);
break;
}
// validate the agents actually exist in the spec
for a in &w.agents {
let _ = self.agent_by_name(&a.name).unwrap_or_else(|| {
panic!(
"agent {} referenced in bucket writers, but isn't in the spec",
a.name
)
});
}
let agents: Vec<_> = w
.agents
.iter()
.map(|a| {
let count = a.count.unwrap_or(1);
let sampling_interval =
parse_duration(&a.sampling_interval).context(InvalidSamplingInterval)?;
let spec = self
.agent_by_name(&a.name)
.context(AgentNotFound { agent: &a.name })?;
Ok(AgentAssignment {
spec,
count,
sampling_interval,
})
})
.collect::<Result<Vec<_>>>()?;
let agents = Arc::new(agents);
let mut end = (buckets.len() as f64 * w.percent).ceil() as usize + start;
if end > buckets.len() {
end = buckets.len();
}
for org_bucket in &buckets[start..end] {
bucket_agents.push(BucketAgents {
org_bucket,
agent_assignments: Arc::clone(&agents),
})
}
start = end;
}
Ok(bucket_agents)
}
/// Get the agent spec by its name
pub fn agent_by_name(&self, name: &str) -> Option<&AgentSpec> {
for a in &self.agents {
if a.name == name {
return Some(a);
}
}
None
}
}
#[derive(Debug, Eq, PartialEq)]
/// Specifies an org and a bucket to write data to
pub struct OrgBucket {
/// The organization name
pub org: String,
/// The bucket name
pub bucket: String,
}
#[derive(Debug)]
/// Assignment info for an agent to a bucket
pub struct AgentAssignment<'a> {
/// The agent specification for writing to the assigned bucket
pub spec: &'a AgentSpec,
/// The number of these agents that should be writing to the bucket
pub count: usize,
/// The sampling interval agents will generate data on
pub sampling_interval: Duration,
}
#[derive(Debug)]
/// Agent assignments mapped to a bucket
pub struct BucketAgents<'a> {
/// The organization and bucket data will get written to
pub org_bucket: &'a OrgBucket,
/// The agents specifications that will be writing to the bucket
pub agent_assignments: Arc<Vec<AgentAssignment<'a>>>,
}
impl FromStr for DataSpec {
@ -110,6 +229,33 @@ pub struct TagSetsSpec {
pub for_each: Vec<String>,
}
/// The specification for what should be written to the list of provided org buckets.
/// Buckets will be written to by one or more agents with the given sampling interval and
/// agent count.
#[derive(Deserialize, Debug)]
#[serde(deny_unknown_fields)]
pub struct BucketWriterSpec {
/// The percentage of buckets from the provided list that should use these agents. The
/// percent should add up to 1.0 (100%).
pub percent: f64,
/// The agents that should be used to write to these databases.
pub agents: Vec<AgentAssignmentSpec>,
}
/// The specification for the specific configuration of how an agent should write to a database.
#[derive(Deserialize, Debug, Clone)]
#[serde(deny_unknown_fields)]
pub struct AgentAssignmentSpec {
/// The name of the `AgentSpec` to use
pub name: String,
/// The number of these agents that should write to the database
pub count: Option<usize>,
/// How frequently each agent will write to the database. This is applicable when using the
/// --continue flag. Otherwise, if doing historical backfill, timestamps of generated data
/// will be this far apart and data will be written in as quickly as possible.
pub sampling_interval: String,
}
/// The specification of the behavior of an agent, the entity responsible for
/// generating a number of data points according to its configuration.
#[derive(Deserialize, Debug)]
@ -118,12 +264,6 @@ pub struct TagSetsSpec {
pub struct AgentSpec {
/// The name of the agent, which can be referenced in templates with `agent.name`.
pub name: String,
/// Specifies the number of agents that should be created with this spec.
/// Default value is 1.
pub count: Option<usize>,
/// How often this agent should generate samples, in a duration string. If
/// not specified, this agent will only generate one sample.
pub sampling_interval: Option<String>,
/// The specifications for the measurements for the agent to generate.
pub measurements: Vec<MeasurementSpec>,
/// A collection of strings that reference other `Values` collections. Each agent will have one
@ -493,6 +633,10 @@ name = "cpu"
[[agents.measurements.fields]]
name = "host"
template = "server"
[[bucket_writers]]
percent = 1.0
agents = [{name = "foo", sampling_interval = "10s"}]
"#;
let spec = DataSpec::from_str(toml).unwrap();
@ -513,4 +657,79 @@ template = "server"
field_spec
);
}
#[test]
fn split_buckets_by_writer_spec() {
let toml = r#"
name = "demo_schema"
[[agents]]
name = "foo"
[[agents.measurements]]
name = "cpu"
[[agents.measurements.fields]]
name = "host"
template = "server"
[[agents]]
name = "bar"
[[agents.measurements]]
name = "whatevs"
[[agents.measurements.fields]]
name = "val"
i64_range = [0, 10]
[[bucket_writers]]
percent = 0.6
agents = [{name = "foo", sampling_interval = "10s"}]
[[bucket_writers]]
percent = 0.4
agents = [{name = "bar", sampling_interval = "1m", count = 3}]
"#;
let spec = DataSpec::from_str(toml).unwrap();
let buckets = vec![
OrgBucket {
org: "a".to_string(),
bucket: "1".to_string(),
},
OrgBucket {
org: "a".to_string(),
bucket: "2".to_string(),
},
OrgBucket {
org: "b".to_string(),
bucket: "1".to_string(),
},
];
let bucket_agents = spec.bucket_split_to_agents(&buckets).unwrap();
let b = &bucket_agents[0];
assert_eq!(b.org_bucket, &buckets[0]);
assert_eq!(
b.agent_assignments[0].sampling_interval,
Duration::from_secs(10)
);
assert_eq!(b.agent_assignments[0].count, 1);
assert_eq!(b.agent_assignments[0].spec.name, "foo");
let b = &bucket_agents[1];
assert_eq!(b.org_bucket, &buckets[1]);
assert_eq!(
b.agent_assignments[0].sampling_interval,
Duration::from_secs(10)
);
assert_eq!(b.agent_assignments[0].count, 1);
assert_eq!(b.agent_assignments[0].spec.name, "foo");
let b = &bucket_agents[2];
assert_eq!(b.org_bucket, &buckets[2]);
assert_eq!(
b.agent_assignments[0].sampling_interval,
Duration::from_secs(60)
);
assert_eq!(b.agent_assignments[0].count, 3);
assert_eq!(b.agent_assignments[0].spec.name, "bar");
}
}

View File

@ -492,7 +492,10 @@ name = "cpu"
[[agents.measurements.fields]]
name = "f1"
i64_range = [0, 23]
"#;
[[bucket_writers]]
percent = 1.0
agents = [{name = "foo", sampling_interval = "10s"}]"#;
let spec = DataSpec::from_str(toml).unwrap();
let tag_sets = GeneratedTagSets::from_spec(&spec).unwrap();
@ -537,7 +540,10 @@ name = "cpu"
[[agents.measurements.fields]]
name = "f1"
i64_range = [0, 23]
"#;
[[bucket_writers]]
percent = 1.0
agents = [{name = "foo", sampling_interval = "10s"}]"#;
let spec = DataSpec::from_str(toml).unwrap();
let tag_sets = GeneratedTagSets::from_spec(&spec).unwrap();
@ -603,7 +609,10 @@ name = "cpu"
[[agents.measurements.fields]]
name = "f1"
i64_range = [0, 23]
"#;
[[bucket_writers]]
percent = 1.0
agents = [{name = "foo", sampling_interval = "10s"}]"#;
let spec = DataSpec::from_str(toml).unwrap();
let tag_sets = GeneratedTagSets::from_spec(&spec).unwrap();

View File

@ -94,11 +94,7 @@ pub struct PointsWriterBuilder {
#[derive(Debug)]
enum PointsWriterConfig {
Api {
client: influxdb2_client::Client,
org: String,
bucket: String,
},
Api(influxdb2_client::Client),
Directory(PathBuf),
NoOp {
perform_write: bool,
@ -113,8 +109,6 @@ impl PointsWriterBuilder {
/// specified org and bucket.
pub async fn new_api(
host: impl Into<String> + Send,
org: impl Into<String> + Send,
bucket: impl Into<String> + Send,
token: impl Into<String> + Send,
jaeger_debug: Option<&str>,
) -> Result<Self> {
@ -133,15 +127,9 @@ impl PointsWriterBuilder {
if let Some(header) = jaeger_debug {
client = client.with_jaeger_debug(header.to_string());
}
let org = org.into();
let bucket = bucket.into();
Ok(Self {
config: PointsWriterConfig::Api {
client,
org,
bucket,
},
config: PointsWriterConfig::Api(client),
})
}
@ -172,16 +160,17 @@ impl PointsWriterBuilder {
/// Create a writer out of this writer's configuration for a particular
/// agent that runs in a separate thread/task.
pub fn build_for_agent(&mut self, name: impl Into<String>) -> Result<PointsWriter> {
pub fn build_for_agent(
&mut self,
name: impl Into<String>,
org: impl Into<String>,
bucket: impl Into<String>,
) -> Result<PointsWriter> {
let inner_writer = match &mut self.config {
PointsWriterConfig::Api {
client,
org,
bucket,
} => InnerPointsWriter::Api {
PointsWriterConfig::Api(client) => InnerPointsWriter::Api {
client: client.clone(),
org: org.clone(),
bucket: bucket.clone(),
org: org.into(),
bucket: bucket.into(),
},
PointsWriterConfig::Directory(dir_path) => {
let mut filename = dir_path.clone();
@ -351,7 +340,12 @@ name = "cpu"
[[agents.measurements.fields]]
name = "val"
i64_range = [3,3]"#;
i64_range = [3,3]
[[bucket_writers]]
percent = 1.0
agents = [{name = "foo", sampling_interval = "1s"}]
"#;
let data_spec = DataSpec::from_str(toml).unwrap();
let mut points_writer_builder = PointsWriterBuilder::new_vec();
@ -360,6 +354,10 @@ i64_range = [3,3]"#;
generate(
&data_spec,
vec![OrgBucket {
org: "foo".to_string(),
bucket: "bar".to_string(),
}],
&mut points_writer_builder,
Some(now),
Some(now),
@ -389,14 +387,18 @@ name = "demo_schema"
[[agents]]
name = "foo"
sampling_interval = "1s" # seconds
[[agents.measurements]]
name = "cpu"
[[agents.measurements.fields]]
name = "val"
i64_range = [2, 2]"#;
i64_range = [2, 2]
[[bucket_writers]]
percent = 1.0
agents = [{name = "foo", sampling_interval = "1s"}]
"#;
let data_spec = DataSpec::from_str(toml).unwrap();
let mut points_writer_builder = PointsWriterBuilder::new_vec();
@ -405,6 +407,10 @@ i64_range = [2, 2]"#;
generate(
&data_spec,
vec![OrgBucket {
org: "foo".to_string(),
bucket: "bar".to_string(),
}],
&mut points_writer_builder,
Some(now - 1_000_000_000),
Some(now),

View File

@ -2,8 +2,6 @@ name = "example"
[[agents]]
name = "foo"
count = 3
sampling_interval = "10s"
[[agents.measurements]]
name = "cpu"
@ -15,3 +13,6 @@ tag_pairs = [
name = "usage_user"
f64_range = [0.0, 100.0]
[[bucket_writers]]
percent = 1.0
agents = [{name = "foo", count = 3, sampling_interval = "10s"}]