feat: Write data to Parquet files from the data generator

pull/24376/head
Jake Goulding 2022-09-23 11:51:39 -04:00 committed by Carol (Nichols || Goulding)
parent 79afb6ef54
commit 7389fbe528
8 changed files with 212 additions and 41 deletions

Cargo.lock (generated)

@@ -2272,6 +2272,7 @@ dependencies = [
name = "iox_data_generator"
version = "0.1.0"
dependencies = [
"bytes",
"chrono",
"chrono-english",
"clap",
@@ -2281,8 +2282,12 @@ dependencies = [
"humantime",
"influxdb2_client",
"itertools",
"mutable_batch",
"mutable_batch_lp",
"parquet_file",
"rand",
"regex",
"schema",
"serde",
"serde_json",
"snafu",

iox_data_generator/Cargo.toml

@@ -6,6 +6,7 @@ edition = "2021"
default-run = "iox_data_generator"
[dependencies]
bytes = "1.2"
chrono = { version = "0.4", default-features = false }
chrono-english = "0.1.4"
clap = { version = "3", features = ["derive", "env", "cargo"] }
@@ -14,8 +15,12 @@ handlebars = "4.3.4"
humantime = "2.1.0"
influxdb2_client = { path = "../influxdb2_client" }
itertools = "0.10.5"
mutable_batch_lp = { path = "../mutable_batch_lp" }
mutable_batch = { path = "../mutable_batch" }
parquet_file = { path = "../parquet_file" }
rand = { version = "0.8.3", features = ["small_rng"] }
regex = "1.6"
schema = { path = "../schema" }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0.83"
snafu = "0.7"

iox_data_generator/benches/point_generation.rs

@@ -1,14 +1,17 @@
use criterion::{criterion_group, criterion_main, Criterion, Throughput};
use iox_data_generator::agent::Agent;
use iox_data_generator::specification::{AgentAssignmentSpec, DatabaseWriterSpec};
use iox_data_generator::{
specification::{AgentSpec, DataSpec, FieldSpec, FieldValueSpec, MeasurementSpec},
agent::Agent,
specification::{
AgentAssignmentSpec, AgentSpec, DataSpec, DatabaseWriterSpec, FieldSpec, FieldValueSpec,
MeasurementSpec,
},
tag_set::GeneratedTagSets,
write::PointsWriterBuilder,
};
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use std::time::Duration;
use std::{
sync::{atomic::AtomicU64, Arc},
time::Duration,
};
pub fn single_agent(c: &mut Criterion) {
let spec = DataSpec {
@@ -74,19 +77,22 @@ pub fn single_agent(c: &mut Criterion) {
}
pub fn agent_pre_generated(c: &mut Criterion) {
let spec: DataSpec = toml::from_str(r#"
let spec: DataSpec = toml::from_str(
r#"
name = "storage_cardinality_example"
# Values are automatically generated before the agents are initialized. They generate tag key/value pairs
# with the name of the value as the tag key and the evaluated template as the value. These pairs
# are Arc wrapped so they can be shared across tagsets and used in the agents as pre-generated data.
# Values are automatically generated before the agents are initialized. They generate tag key/value
# pairs with the name of the value as the tag key and the evaluated template as the value. These
# pairs are Arc wrapped so they can be shared across tagsets and used in the agents as
# pre-generated data.
[[values]]
# the name must not have a . in it, which is used to access children later. Otherwise it's open.
name = "role"
# the template can use a number of helpers to get an id, a random string and the name, see below for examples
# the template can use a number of helpers to get an id, a random string and the name, see below
# for examples
template = "storage"
# this number of tag pairs will be generated. If this is > 1, the id or a random character string should be
# used in the template to ensure that the tag key/value pairs are unique.
# this number of tag pairs will be generated. If this is > 1, the id or a random character string
# should be used in the template to ensure that the tag key/value pairs are unique.
cardinality = 1
[[values]]
@@ -108,10 +114,11 @@ cardinality = 10
[[values]]
name = "bucket_id"
# a bucket belongs to an org. With this, you would be able to access the org.id or org.value in the template
# a bucket belongs to an org. With this, you would be able to access the org.id or org.value in the
# template
belongs_to = "org_id"
# each bucket will have a unique id, which is used here to guarantee uniqueness even across orgs. We also
# have a random 15 character alphanumeric sequence to pad out the value length.
# each bucket will have a unique id, which is used here to guarantee uniqueness even across orgs.
# We also have a random 15 character alphanumeric sequence to pad out the value length.
template = "{{id}}_{{random 15}}"
# For each org, 3 buckets will be generated
cardinality = 3
@@ -121,9 +128,10 @@ name = "partition_id"
template = "{{id}}"
cardinality = 10
# makes a tagset so every bucket appears in every partition. The other tags are descriptive and don't
# increase the cardinality beyond count(bucket) * count(partition). Later this example will use the
# agent and measurement generation to take this base tagset and increase cardinality on a per-agent basis.
# makes a tagset so every bucket appears in every partition. The other tags are descriptive and
# don't increase the cardinality beyond count(bucket) * count(partition). Later this example will
# use the agent and measurement generation to take this base tagset and increase cardinality on a
# per-agent basis.
[[tag_sets]]
name = "bucket_set"
for_each = [
@@ -140,7 +148,8 @@ name = "foo"
[[agents.measurements]]
name = "storage_usage_bucket_cardinality"
# each sampling will have all the tag sets from this collection in addition to the tags and tag_pairs specified
# each sampling will have all the tag sets from this collection in addition to the tags and
# tag_pairs specified
tag_set = "bucket_set"
# for each agent, this specific measurement will be decorated with these additional tags.
tag_pairs = [
@@ -155,7 +164,9 @@ i64_range = [1, 8147240]
[[database_writers]]
agents = [{name = "foo", sampling_interval = "1s", count = 3}]
"#).unwrap();
"#,
)
.unwrap();
let generated_tag_sets = GeneratedTagSets::from_spec(&spec).unwrap();
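(Aside, not part of the commit: the tag-set arithmetic in this spec is multiplicative. With org_id at cardinality 10 per the elided hunk context above, three buckets per org, and ten partitions, `bucket_set` holds 10 × 3 × 10 = 300 pre-generated tag sets; the per-agent tag_pairs only decorate each line and leave that count unchanged.)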

iox_data_generator/src/main.rs

@@ -13,8 +13,10 @@
use chrono::prelude::*;
use chrono_english::{parse_date_string, Dialect};
use iox_data_generator::{specification::DataSpec, write::PointsWriterBuilder};
use std::fs::File;
use std::io::{self, BufRead};
use std::{
fs::File,
io::{self, BufRead},
};
use tracing::info;
#[derive(clap::Parser)]
@@ -57,14 +59,19 @@ struct Config {
#[clap(long, action)]
print: bool,
/// Runs the generation with agents writing to a sink. Useful for quick stress test to see how much resources the generator will take
/// Runs the generation with agents writing to a sink. Useful for quick stress test to see how
/// much resources the generator will take
#[clap(long, action)]
noop: bool,
/// The filename to write line protocol
/// The directory to write line protocol to
#[clap(long, short, action)]
output: Option<String>,
/// The directory to write Parquet files to
#[clap(long, short, action)]
parquet: Option<String>,
/// The host name part of the API endpoint to write to
#[clap(long, short, action)]
host: Option<String>,
@@ -105,7 +112,8 @@ struct Config {
#[clap(long = "continue", action)]
do_continue: bool,
/// Generate this many samplings to batch into a single API call. Good for sending a bunch of historical data in quickly if paired with a start time from long ago.
/// Generate this many samplings to batch into a single API call. Good for sending a bunch of
/// historical data in quickly if paired with a start time from long ago.
#[clap(long, action, default_value = "1")]
batch_size: usize,
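(Illustrative aside, not in the diff: pairing `--batch-size 3600` with a start time 24 hours in the past and a one-second sampling interval means each API call carries an hour of samplings, so the backlog drains in 24 calls rather than 86,400.)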
@@ -142,10 +150,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let data_spec = DataSpec::from_file(&config.specification)?;
// TODO: parquet output
let mut points_writer_builder = if let Some(line_protocol_filename) = config.output {
PointsWriterBuilder::new_file(line_protocol_filename)?
} else if let Some(parquet_directory) = config.parquet {
PointsWriterBuilder::new_parquet(parquet_directory)?
} else if let Some(ref host) = config.host {
let token = config.token.expect("--token must be specified");
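(Hypothetical invocation mirroring this dispatch: `iox_data_generator --specification spec.toml --parquet out/parquet` writes one Parquet file per agent/measurement pair into out/parquet; `--output` keeps the line protocol path, and `--host` plus `--token` keep the API path.)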

iox_data_generator/src/lib.rs

@@ -31,9 +31,9 @@
use crate::{agent::Agent, tag_set::GeneratedTagSets};
use snafu::{ResultExt, Snafu};
use std::sync::{atomic::AtomicU64, Arc};
use std::{
convert::TryFrom,
sync::{atomic::AtomicU64, Arc},
time::{SystemTime, UNIX_EPOCH},
};
use tracing::info;
@@ -154,7 +154,8 @@ pub async fn generate(
.context(CouldNotCreateAgentSnafu)?;
info!(
"Configuring {} agents of \"{}\" to write data to org {} and bucket {} (database {})",
"Configuring {} agents of \"{}\" to write data \
to org {} and bucket {} (database {})",
agent_assignment.count,
agent_assignment.spec.name,
org,
@@ -171,7 +172,8 @@ pub async fn generate(
let total_rows = Arc::clone(&total_rows);
handles.push(tokio::task::spawn(async move {
// did this weird hack because otherwise the stdout outputs would be jumbled together garbage
// did this weird hack because otherwise the stdout outputs would be jumbled
// together garbage
if one_agent_at_a_time {
let _l = lock_ref.lock().await;
agent
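
A standalone sketch of the locking pattern above (illustrative only, not part of the commit; it assumes tokio's async Mutex, which is what the `.lock().await` in the diff implies):

    use std::sync::Arc;
    use tokio::sync::Mutex;

    #[tokio::main]
    async fn main() {
        // One shared lock; each spawned "agent" task holds it while printing,
        // so concurrent stdout output is not interleaved.
        let lock = Arc::new(Mutex::new(()));
        let handles: Vec<_> = (0..3)
            .map(|i| {
                let lock_ref = Arc::clone(&lock);
                tokio::task::spawn(async move {
                    let _l = lock_ref.lock().await;
                    println!("agent {i} writing");
                })
            })
            .collect();
        for handle in handles {
            handle.await.unwrap();
        }
    }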

iox_data_generator/src/specification.rs

@@ -89,8 +89,8 @@ impl DataSpec {
let mut start = 0;
// either all database writers must use regex or none of them can. It's either ratio or regex
// for assignment
// either all database writers must use regex or none of them can. It's either ratio or
// regex for assignment
let use_ratio = self.database_writers[0].database_regex.is_none();
for b in &self.database_writers {
if use_ratio && b.database_regex.is_some() {
@@ -200,7 +200,8 @@ impl FromStr for DataSpec {
pub struct ValuesSpec {
/// The name of the collection of values
pub name: String,
/// If values not specified this handlebars template will be used to create each value in the collection
/// If values not specified this handlebars template will be used to create each value in the
/// collection
pub template: String,
/// How many of these values should be generated. If belongs_to is
/// specified, each parent will have this many of this value. So
@@ -297,8 +298,8 @@ pub struct AgentSpec {
pub has_one: Vec<String>,
/// Specification of tag key/value pairs that get generated once and reused for
/// every sampling. Every measurement (and thus line) will have these tag pairs added onto it.
/// The template can use `{{agent.id}}` to reference the agent's id and `{{guid}}` or `{{random N}}`
/// to generate random strings.
/// The template can use `{{agent.id}}` to reference the agent's id and `{{guid}}` or
/// `{{random N}}` to generate random strings.
#[serde(default)]
pub tag_pairs: Vec<TagPairSpec>,
}
@@ -675,7 +676,10 @@ agents = [{name = "foo", sampling_interval = "10s"}]
let field_spec = &a0m0f0.field_value_spec;
assert!(
matches!(field_spec, FieldValueSpec::String { replacements, .. } if replacements.is_empty()),
matches!(
field_spec,
FieldValueSpec::String { replacements, .. } if replacements.is_empty()
),
"expected a String field with empty replacements; was {:?}",
field_spec
);

iox_data_generator/src/write.rs

@@ -1,8 +1,12 @@
//! Writing generated points
use crate::measurement::LineToGenerate;
use bytes::Bytes;
use futures::stream;
use influxdb2_client::models::WriteDataPoint;
use mutable_batch_lp::lines_to_batches;
use parquet_file::{metadata::IoxMetadata, serialize};
use schema::selection::Selection;
use snafu::{ensure, ResultExt, Snafu};
#[cfg(test)]
use std::{
@@ -10,9 +14,8 @@ use std::{
sync::{Arc, Mutex},
};
use std::{
fs,
fs::{File, OpenOptions},
io::BufWriter,
fs::{self, File, OpenOptions},
io::{BufWriter, Write},
path::{Path, PathBuf},
};
@@ -20,7 +23,7 @@ use std::{
#[derive(Snafu, Debug)]
pub enum Error {
/// Error that may happen when writing line protocol to a file
#[snafu(display("Could open line protocol file {}: {}", filename.display(), source))]
#[snafu(display("Couldn't open line protocol file {}: {}", filename.display(), source))]
CantOpenLineProtocolFile {
/// The location of the file we tried to open
filename: PathBuf,
@@ -28,6 +31,15 @@ pub enum Error {
source: std::io::Error,
},
/// Error that may happen when writing Parquet to a file
#[snafu(display("Couldn't open Parquet file {}: {}", filename.display(), source))]
CantOpenParquetFile {
/// The location of the file we tried to open
filename: PathBuf,
/// Underlying IO error that caused this problem
source: std::io::Error,
},
/// Error that may happen when writing line protocol to a no-op sink
#[snafu(display("Could not generate line protocol: {}", source))]
CantWriteToNoOp {
@@ -42,6 +54,34 @@ pub enum Error {
source: std::io::Error,
},
/// Error that may happen when writing line protocol to a Vec of bytes
#[snafu(display("Could not write to vec: {}", source))]
WriteToVec {
/// Underlying IO error that caused this problem
source: std::io::Error,
},
/// Error that may happen when writing Parquet to a file
#[snafu(display("Could not write Parquet: {}", source))]
WriteToParquetFile {
/// Underlying IO error that caused this problem
source: std::io::Error,
},
/// Error that may happen when converting line protocol to a mutable batch
#[snafu(display("Could not convert to a mutable batch: {}", source))]
ConvertToMutableBatch {
/// Underlying mutable_batch_lp error that caused this problem
source: mutable_batch_lp::Error,
},
/// Error that may happen when converting a mutable batch to an Arrow RecordBatch
#[snafu(display("Could not convert to a record batch: {}", source))]
ConvertToArrow {
/// Underlying mutable_batch error that caused this problem
source: mutable_batch::Error,
},
/// Error that may happen when creating a directory to store files to write
/// to
#[snafu(display("Could not create directory: {}", source))]
@@ -81,6 +121,13 @@ pub enum Error {
/// specifying the org ID
#[snafu(display("Could not create a bucket without an `org_id`"))]
OrgIdRequiredToCreateBucket,
/// Error that may happen when serializing to Parquet
#[snafu(display("Could not serialize to Parquet"))]
ParquetSerialization {
/// Underlying `parquet_file` error that caused this problem
source: parquet_file::serialize::CodecError,
},
}
type Result<T, E = Error> = std::result::Result<T, E>;
@@ -96,6 +143,7 @@ pub struct PointsWriterBuilder {
enum PointsWriterConfig {
Api(influxdb2_client::Client),
Directory(PathBuf),
ParquetFile(PathBuf),
NoOp {
perform_write: bool,
},
@@ -144,6 +192,17 @@ impl PointsWriterBuilder {
})
}
/// Write points to a Parquet file in the directory specified.
pub fn new_parquet<P: AsRef<Path>>(path: P) -> Result<Self> {
fs::create_dir_all(&path).context(CantCreateDirectorySnafu)?;
let metadata = fs::metadata(&path).context(CantGetMetadataSnafu)?;
ensure!(metadata.is_dir(), MustBeDirectorySnafu);
Ok(Self {
config: PointsWriterConfig::ParquetFile(PathBuf::from(path.as_ref())),
})
}
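
An illustrative call site for the new constructor (not part of the commit):

    // Creates out/parquet if missing; errors if the path exists but is not a directory.
    let writer_builder = PointsWriterBuilder::new_parquet("out/parquet")?;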
/// Write points to stdout
pub fn new_std_out() -> Self {
Self {
@@ -187,6 +246,12 @@ impl PointsWriterBuilder {
InnerPointsWriter::File { file }
}
PointsWriterConfig::ParquetFile(dir_path) => InnerPointsWriter::ParquetFile {
dir_path: dir_path.clone(),
agent_name: name.into(),
},
PointsWriterConfig::NoOp { perform_write } => InnerPointsWriter::NoOp {
perform_write: *perform_write,
},
@@ -230,6 +295,10 @@ enum InnerPointsWriter {
File {
file: BufWriter<File>,
},
ParquetFile {
dir_path: PathBuf,
agent_name: String,
},
NoOp {
perform_write: bool,
},
@@ -261,6 +330,52 @@ impl InnerPointsWriter {
.context(CantWriteToLineProtocolFileSnafu)?;
}
}
Self::ParquetFile {
dir_path,
agent_name,
} => {
let mut raw_line_protocol = Vec::new();
for point in points {
point
.write_data_point_to(&mut raw_line_protocol)
.context(WriteToVecSnafu)?;
}
let line_protocol = String::from_utf8(raw_line_protocol)
.expect("Generator should be creating valid UTF-8");
let batches_by_measurement =
lines_to_batches(&line_protocol, 0).context(ConvertToMutableBatchSnafu)?;
for (measurement, batch) in batches_by_measurement {
let record_batch = batch
.to_arrow(Selection::All)
.context(ConvertToArrowSnafu)?;
let stream = futures::stream::iter([Ok(record_batch)]);
let meta = IoxMetadata::external(crate::now_ns(), &*measurement);
let (data, _parquet_file_meta) = serialize::to_parquet_bytes(stream, &meta)
.await
.context(ParquetSerializationSnafu)?;
let data = Bytes::from(data);
let mut filename = dir_path.clone();
filename.push(format!("{agent_name}_{measurement}"));
filename.set_extension("parquet");
let file = OpenOptions::new()
.create(true)
.write(true)
.open(&filename)
.context(CantOpenParquetFileSnafu { filename })?;
let mut file = BufWriter::new(file);
file.write_all(&data).context(WriteToParquetFileSnafu)?;
}
}
Self::NoOp { perform_write } => {
if *perform_write {
let mut sink = std::io::sink();
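
Condensed, the new ParquetFile arm boils down to this pipeline (a sketch assuming exactly the crate APIs used above, namely `lines_to_batches`, `MutableBatch::to_arrow`, and `serialize::to_parquet_bytes`, with error handling collapsed to `unwrap` for brevity):

    use mutable_batch_lp::lines_to_batches;
    use parquet_file::{metadata::IoxMetadata, serialize};
    use schema::selection::Selection;

    /// Line protocol in; one (measurement, Parquet bytes) pair out per table.
    async fn lp_to_parquet(lp: &str, now_ns: i64) -> Vec<(String, bytes::Bytes)> {
        let mut out = Vec::new();
        // Group the line protocol into one mutable batch per measurement.
        for (measurement, batch) in lines_to_batches(lp, 0).unwrap() {
            // Convert each batch to an Arrow RecordBatch over all columns.
            let record_batch = batch.to_arrow(Selection::All).unwrap();
            let stream = futures::stream::iter([Ok(record_batch)]);
            let meta = IoxMetadata::external(now_ns, &*measurement);
            // Serialize the stream of record batches to Parquet bytes.
            let (data, _file_meta) = serialize::to_parquet_bytes(stream, &meta).await.unwrap();
            out.push((measurement, bytes::Bytes::from(data)));
        }
        out
    }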

parquet_file/src/metadata.rs

@@ -394,6 +394,27 @@ impl IoxMetadata {
})
}
/// Generate metadata for a file generated from some process other than IOx ingesting.
///
/// This metadata will not have valid catalog values; inserting files with this metadata into
/// the catalog should get valid values out-of-band.
pub fn external(creation_timestamp_ns: i64, table_name: impl Into<Arc<str>>) -> Self {
Self {
object_store_id: Default::default(),
creation_timestamp: Time::from_timestamp_nanos(creation_timestamp_ns),
namespace_id: NamespaceId::new(1),
namespace_name: "external".into(),
shard_id: ShardId::new(1),
table_id: TableId::new(1),
table_name: table_name.into(),
partition_id: PartitionId::new(1),
partition_key: "unknown".into(),
max_sequence_number: SequenceNumber::new(1),
compaction_level: CompactionLevel::Initial,
sort_key: None,
}
}
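
A hypothetical call site, mirroring the data generator's use above (and assuming the struct's fields are public, as their direct construction here suggests):

    // The timestamp is arbitrary; only it and the table name vary per call,
    // while every catalog ID stays a fixed placeholder.
    let meta = IoxMetadata::external(1_663_949_499_000_000_000, "cpu");
    assert_eq!(meta.table_name.as_ref(), "cpu");
    assert_eq!(meta.namespace_name.as_ref(), "external");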
/// verify uuid
pub fn match_object_store_id(&self, uuid: Uuid) -> bool {
uuid == self.object_store_id