feat(influx_tools): Add support for describing schema via a TOML file

The `influx_tools generate` command has a new option, `-schema`, for
specifying a TOML file which describes a desired schema.
pull/12211/head
Stuart Carnie 2019-02-05 14:31:54 -07:00
parent 926fa7b079
commit 9b7ffd36ad
35 changed files with 3650 additions and 188 deletions

9
Gopkg.lock generated
View File

@ -812,6 +812,14 @@
revision = "c6db9435477f3cb658e2dd0fa93e02118c870251"
version = "v0.2.0"
[[projects]]
digest = "1:08d65904057412fc0270fc4812a1c90c594186819243160dc779a402d4b6d0bc"
name = "github.com/spf13/cast"
packages = ["."]
pruneopts = "UT"
revision = "8c9545af88b134710ab1cd196795e7f2388358d7"
version = "v1.3.0"
[[projects]]
digest = "1:eaa6698f44de8f2977e93c9b946e60a8af75f565058658aad2df8032b55c84e5"
name = "github.com/tinylib/msgp"
@ -1122,6 +1130,7 @@
"github.com/prometheus/client_golang/prometheus",
"github.com/prometheus/client_golang/prometheus/promhttp",
"github.com/retailnext/hllpp",
"github.com/spf13/cast",
"github.com/tinylib/msgp/msgp",
"github.com/xlab/treeprint",
"go.uber.org/zap",

View File

@ -75,3 +75,7 @@ required = ["github.com/influxdata/platform/pkg/data/gen"]
[[constraint]]
name = "github.com/influxdata/flux"
version = "0.12.0"
[[constraint]]
name = "github.com/spf13/cast"
version = "1.3.x"

View File

@ -6,15 +6,16 @@ import (
"flag"
"fmt"
"io"
"math"
"os"
"strings"
"text/template"
"time"
"github.com/influxdata/influxdb/cmd/influx_tools/generate"
"github.com/influxdata/influxdb/cmd/influx_tools/internal/profile"
"github.com/influxdata/influxdb/cmd/influx_tools/server"
"github.com/influxdata/influxdb/pkg/data/gen"
"github.com/influxdata/influxdb/services/meta"
"github.com/influxdata/platform/pkg/data/gen"
)
// Command represents the program execution for "store query".
@ -28,14 +29,17 @@ type Command struct {
configPath string
printOnly bool
example bool
noTSI bool
concurrency int
spec generate.Spec
schemaPath string
storageSpec generate.StorageSpec
schemaSpec generate.SchemaSpec
profile profile.Config
}
type SeriesGeneratorFilter func(sgi meta.ShardGroupInfo, g SeriesGenerator) SeriesGenerator
type SeriesGeneratorFilter func(sgi meta.ShardGroupInfo, g gen.SeriesGenerator) gen.SeriesGenerator
type Dependencies struct {
Server server.Interface
@ -62,73 +66,121 @@ func (cmd *Command) Run(args []string) (err error) {
return err
}
if cmd.example {
return cmd.printExample()
}
err = cmd.server.Open(cmd.configPath)
if err != nil {
return err
}
plan, err := cmd.spec.Plan(cmd.server)
storagePlan, err := cmd.storageSpec.Plan(cmd.server)
if err != nil {
return err
}
plan.PrintPlan(cmd.Stdout)
storagePlan.PrintPlan(cmd.Stdout)
var spec *gen.Spec
if cmd.schemaPath != "" {
var err error
spec, err = gen.NewSpecFromPath(cmd.schemaPath)
if err != nil {
return err
}
} else {
schemaPlan, err := cmd.schemaSpec.Plan(storagePlan)
if err != nil {
return err
}
schemaPlan.PrintPlan(cmd.Stdout)
spec = cmd.planToSpec(schemaPlan)
}
if cmd.printOnly {
return nil
}
if err = plan.InitFileSystem(cmd.server.MetaClient()); err != nil {
if err = storagePlan.InitFileSystem(cmd.server.MetaClient()); err != nil {
return err
}
return cmd.exec(plan)
return cmd.exec(storagePlan, spec)
}
func (cmd *Command) parseFlags(args []string) error {
fs := flag.NewFlagSet("gen-init", flag.ContinueOnError)
fs.StringVar(&cmd.configPath, "config", "", "Config file")
fs.StringVar(&cmd.schemaPath, "schema", "", "Schema TOML file")
fs.BoolVar(&cmd.printOnly, "print", false, "Print data spec only")
fs.BoolVar(&cmd.noTSI, "no-tsi", false, "Skip building TSI index")
fs.BoolVar(&cmd.example, "example", false, "Print an example toml schema to STDOUT")
fs.IntVar(&cmd.concurrency, "c", 1, "Number of shards to generate concurrently")
fs.StringVar(&cmd.profile.CPU, "cpuprofile", "", "Collect a CPU profile")
fs.StringVar(&cmd.profile.Memory, "memprofile", "", "Collect a memory profile")
cmd.spec.AddFlags(fs)
cmd.storageSpec.AddFlags(fs)
cmd.schemaSpec.AddFlags(fs)
if err := fs.Parse(args); err != nil {
return err
}
if cmd.spec.Database == "" {
if cmd.example {
return nil
}
if cmd.storageSpec.Database == "" {
return errors.New("database is required")
}
if cmd.spec.Retention == "" {
if cmd.storageSpec.Retention == "" {
return errors.New("retention policy is required")
}
return nil
}
func (cmd *Command) exec(p *generate.Plan) error {
groups := p.ShardGroups()
gens := make([]SeriesGenerator, len(groups))
var (
tomlSchema = template.Must(template.New("schema").Parse(`
title = "CLI schema"
[[measurements]]
name = "m0"
sample = 1.0
tags = [
{{- range $i, $e := .Tags }}
{ name = "tag{{$i}}", source = { type = "sequence", format = "value%s", start = 0, count = {{$e}} } },{{ end }}
]
fields = [
{ name = "v0", count = {{ .PointsPerSeriesPerShard }}, source = 1.0 },
]`))
)
func (cmd *Command) planToSpec(p *generate.SchemaPlan) *gen.Spec {
var sb strings.Builder
if err := tomlSchema.Execute(&sb, p); err != nil {
panic(err)
}
spec, err := gen.NewSpecFromToml(sb.String())
if err != nil {
panic(err)
}
return spec
}
func (cmd *Command) exec(storagePlan *generate.StoragePlan, spec *gen.Spec) error {
groups := storagePlan.ShardGroups()
gens := make([]gen.SeriesGenerator, len(groups))
for i := range gens {
var (
name []byte
keys []string
tv []gen.CountableSequence
)
name = []byte("m0")
tv = make([]gen.CountableSequence, len(p.Tags))
setTagVals(p.Tags, tv)
keys = make([]string, len(p.Tags))
setTagKeys("tag", keys)
sgi := groups[i]
vg := gen.NewIntegerConstantValuesSequence(p.PointsPerSeriesPerShard, sgi.StartTime, p.ShardDuration/time.Duration(p.PointsPerSeriesPerShard), 1)
gens[i] = NewSeriesGenerator(name, []byte("v0"), vg, gen.NewTagsValuesSequenceKeysValues(keys, tv))
tr := gen.TimeRange{
Start: sgi.StartTime,
End: sgi.EndTime,
}
gens[i] = gen.NewSeriesGeneratorFromSpec(spec, tr)
if cmd.filter != nil {
gens[i] = cmd.filter(sgi, gens[i])
}
@ -145,19 +197,136 @@ func (cmd *Command) exec(p *generate.Plan) error {
}()
g := Generator{Concurrency: cmd.concurrency, BuildTSI: !cmd.noTSI}
return g.Run(context.Background(), p.Database, p.ShardPath(), p.NodeShardGroups(), gens)
return g.Run(context.Background(), storagePlan.Database, storagePlan.ShardPath(), storagePlan.NodeShardGroups(), gens)
}
func setTagVals(tags []int, tv []gen.CountableSequence) {
for j := range tags {
tv[j] = gen.NewCounterByteSequenceCount(tags[j])
}
}
const exampleSchema = `title = "CLI schema"
func setTagKeys(prefix string, keys []string) {
tw := int(math.Ceil(math.Log10(float64(len(keys)))))
tf := fmt.Sprintf("%s%%0%dd", prefix, tw)
for i := range keys {
keys[i] = fmt.Sprintf(tf, i)
}
# limit the maximum number of series generated across all measurements
#
# series-limit: integer, optional (default: unlimited)
# multiple measurements are merged together
[[measurements]]
# name of measurement
name = "cpu"
# sample: float; where 0 < sample 1.0 (default: 0.5)
# sample a subset of the tag set
#
# sample 25% of the tags
#
# sample = 0.25
# Keys for defining a tag
#
# name: string, required
# Name of field
#
# source: array<string> or object
#
# A literal array of string values defines the tag values.
#
# An object defines more complex generators. The type key determines the
# type of generator.
#
# source types:
#
# type: "sequence"
# generate a sequence of tag values
#
# format: string
# a format string for the values (default: "value%s")
# start: int (default: 0)
# beginning value
# count: int, required
# ending value
#
# type: "file"
# generate a sequence of tag values from a file source.
# The data in the file is sorted, deduplicated and verified is valid UTF-8
#
# path: string
# absolute path or relative path to current toml file
tags = [
# example sequence tag source. The range of values are automatically prefixed with 0s
# to ensure correct sort behavior.
{ name = "host", source = { type = "sequence", format = "host-%s", start = 0, count = 5 } },
# tags can also be sourced from a file. The path is relative to the schema.toml.
# Each value must be on a new line. The file is also sorted, validated for UTF-8 and deduplicated.
# { name = "region", source = { type = "file", path = "files/regions.txt" } },
# Example string array source, which is also deduplicated and sorted
{ name = "region", source = ["us-west-01","us-west-02","us-east"] },
]
# Keys for defining a field
#
# name: string, required
# Name of field
#
# count: int, required
# Number of values to generate. When multiple fields have the same
# count, they will share timestamps.
#
# time-precision: string (default: ms)
# The precision for generated timestamps.
# One of ns, us, ms, s, m, h
#
# source: int, float, boolean, string, array or object
#
# A literal int, float, boolean or string will produce
# a constant value of the same data type.
#
# A literal array of homogeneous values will generate a repeating
# sequence.
#
# An object defines more complex generators. The type key determines the
# type of generator.
#
# source types:
#
# type: "rand<float>"
# generate random float values
# seed: seed to random number generator (default: 0)
# min: minimum value (default: 0.0)
# max: maximum value (default: 1.0)
#
# type: "zipf<integer>"
# generate random integer values using a Zipf distribution
# The generator generates values k [0, imax] such that P(k)
# is proportional to (v + k) ** (-s). Requirements: s > 1 and v 1.
# See https://golang.org/pkg/math/rand/#NewZipf for more information.
#
# seed: seed to random number generator (default: 0)
# s: float > 1 (required)
# v: float 1 (required)
# imax: integer (required)
#
fields = [
# Example constant float
{ name = "system", count = 5000, source = 2.5 },
# Example random floats
{ name = "user", count = 5000, source = { type = "rand<float>", seed = 10, min = 0.0, max = 1.0 } },
]
# Multiple measurements may be defined.
[[measurements]]
name = "mem"
tags = [
{ name = "host", source = { type = "sequence", format = "host-%s", start = 0, count = 5 } },
{ name = "region", source = ["us-west-01","us-west-02","us-east"] },
]
fields = [
# An example of a sequence of integer values
{ name = "free", count = 17, source = [10,15,20,25,30,35,30], time-precision = "ms" },
{ name = "low_mem", count = 17, source = [false,true,true], time-precision = "ms" },
]
`
func (cmd *Command) printExample() error {
fmt.Fprint(cmd.Stdout, exampleSchema)
return nil
}

View File

@ -11,6 +11,7 @@ import (
"github.com/influxdata/influxdb/cmd/influx_tools/internal/errlist"
"github.com/influxdata/influxdb/cmd/influx_tools/internal/shard"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/pkg/data/gen"
"github.com/influxdata/influxdb/services/meta"
"github.com/influxdata/influxdb/tsdb"
"github.com/influxdata/influxdb/tsdb/engine/tsm1"
@ -24,7 +25,7 @@ type Generator struct {
sfile *tsdb.SeriesFile
}
func (g *Generator) Run(ctx context.Context, database, shardPath string, groups []meta.ShardGroupInfo, gens []SeriesGenerator) (err error) {
func (g *Generator) Run(ctx context.Context, database, shardPath string, groups []meta.ShardGroupInfo, gens []gen.SeriesGenerator) (err error) {
limit := make(chan struct{}, g.Concurrency)
for i := 0; i < g.Concurrency; i++ {
limit <- struct{}{}
@ -127,7 +128,7 @@ func (g *Generator) Run(ctx context.Context, database, shardPath string, groups
// seriesBatchSize specifies the number of series keys passed to the index.
const seriesBatchSize = 1000
func (g *Generator) writeShard(idx seriesIndex, sg SeriesGenerator, id uint64, path string) error {
func (g *Generator) writeShard(idx seriesIndex, sg gen.SeriesGenerator, id uint64, path string) error {
sw := shard.NewWriter(id, path)
defer sw.Close()
@ -152,7 +153,7 @@ func (g *Generator) writeShard(idx seriesIndex, sg SeriesGenerator, id uint64, p
tags = tags[:0]
}
vg := sg.ValuesGenerator()
vg := sg.TimeValuesGenerator()
key := tsm1.SeriesFieldKeyBytes(string(seriesKey), string(sg.Field()))
for vg.Next() {

View File

@ -1,89 +0,0 @@
package exec
import (
"github.com/influxdata/influxdb/models"
"github.com/influxdata/platform/pkg/data/gen"
)
type SeriesGenerator interface {
// Next advances the series generator to the next series key.
Next() bool
// Key returns the series key.
// The returned value may be cached.
Key() []byte
// Name returns the name of the measurement.
// The returned value may be cached.
Name() []byte
// Tags returns the tag set.
// The returned value may be cached.
Tags() models.Tags
// Field returns the name of the field.
// The returned value may be cached.
Field() []byte
// ValuesGenerator returns a values sequence for the current series.
ValuesGenerator() gen.ValuesSequence
}
type cache struct {
key []byte
tags models.Tags
}
type seriesGenerator struct {
name []byte
tags gen.TagsSequence
field []byte
vg gen.ValuesSequence
c cache
}
func NewSeriesGenerator(name []byte, field []byte, vg gen.ValuesSequence, tags gen.TagsSequence) SeriesGenerator {
return &seriesGenerator{
name: name,
field: field,
vg: vg,
tags: tags,
}
}
func (g *seriesGenerator) Next() bool {
if g.tags.Next() {
g.c = cache{}
g.vg.Reset()
return true
}
return false
}
func (g *seriesGenerator) Key() []byte {
if len(g.c.key) == 0 {
g.c.key = models.MakeKey(g.name, g.tags.Value())
}
return g.c.key
}
func (g *seriesGenerator) Name() []byte {
return g.name
}
func (g *seriesGenerator) Tags() models.Tags {
if len(g.c.tags) == 0 {
g.c.tags = g.tags.Value().Clone()
}
return g.c.tags
}
func (g *seriesGenerator) Field() []byte {
return g.field
}
func (g *seriesGenerator) ValuesGenerator() gen.ValuesSequence {
return g.vg
}

View File

@ -19,7 +19,7 @@ type Command struct {
configPath string
printOnly bool
spec generate.Spec
spec generate.StorageSpec
}
// NewCommand returns a new instance of Command.

View File

@ -16,35 +16,28 @@ import (
"github.com/pkg/errors"
)
type Plan struct {
Database string
Retention string
ReplicaN int
StartTime time.Time
ShardCount int
ShardDuration time.Duration
Tags TagCardinalities
PointsPerSeriesPerShard int
DatabasePath string
type StoragePlan struct {
Database string
Retention string
ReplicaN int
StartTime time.Time
ShardCount int
ShardDuration time.Duration
DatabasePath string
info *meta.DatabaseInfo
groups []meta.ShardGroupInfo
}
func (p *Plan) String() string {
func (p *StoragePlan) String() string {
sb := new(strings.Builder)
p.PrintPlan(sb)
return sb.String()
}
func (p *Plan) PrintPlan(w io.Writer) {
func (p *StoragePlan) PrintPlan(w io.Writer) {
tw := tabwriter.NewWriter(w, 25, 4, 2, ' ', 0)
fmt.Fprintf(tw, "Data Path\t%s\n", p.ShardPath())
fmt.Fprintf(tw, "Tag cardinalities\t%s\n", p.Tags)
fmt.Fprintf(tw, "Points per series per shard\t%d\n", p.PointsPerSeriesPerShard)
fmt.Fprintf(tw, "Total points per shard\t%d\n", p.Tags.Cardinality()*p.PointsPerSeriesPerShard)
fmt.Fprintf(tw, "Total series\t%d\n", p.Tags.Cardinality())
fmt.Fprintf(tw, "Total points\t%d\n", p.Tags.Cardinality()*p.ShardCount*p.PointsPerSeriesPerShard)
fmt.Fprintf(tw, "Shard Count\t%d\n", p.ShardCount)
fmt.Fprintf(tw, "Database\t%s/%s (Shard duration: %s)\n", p.Database, p.Retention, p.ShardDuration)
fmt.Fprintf(tw, "Start time\t%s\n", p.StartTime)
@ -52,20 +45,20 @@ func (p *Plan) PrintPlan(w io.Writer) {
tw.Flush()
}
func (p *Plan) ShardPath() string {
func (p *StoragePlan) ShardPath() string {
return filepath.Join(p.DatabasePath, p.Retention)
}
// TimeSpan returns the total duration for which the data set.
func (p *Plan) TimeSpan() time.Duration {
func (p *StoragePlan) TimeSpan() time.Duration {
return p.ShardDuration * time.Duration(p.ShardCount)
}
func (p *Plan) EndTime() time.Time {
func (p *StoragePlan) EndTime() time.Time {
return p.StartTime.Add(p.TimeSpan())
}
func (p *Plan) InitMetadata(client server.MetaClient) (err error) {
func (p *StoragePlan) InitMetadata(client server.MetaClient) (err error) {
if err = client.DropDatabase(p.Database); err != nil {
return err
}
@ -85,7 +78,7 @@ func (p *Plan) InitMetadata(client server.MetaClient) (err error) {
// InitFileSystem initializes the file system structure, cleaning up
// existing files and re-creating the appropriate shard directories.
func (p *Plan) InitFileSystem(client server.MetaClient) error {
func (p *StoragePlan) InitFileSystem(client server.MetaClient) error {
var err error
if err = os.RemoveAll(p.DatabasePath); err != nil {
return err
@ -117,15 +110,15 @@ func (p *Plan) InitFileSystem(client server.MetaClient) error {
}
// NodeShardGroups returns ShardGroupInfo with Shards limited to the current node
func (p *Plan) NodeShardGroups() []meta.ShardGroupInfo {
func (p *StoragePlan) NodeShardGroups() []meta.ShardGroupInfo {
return p.groups
}
func (p *Plan) ShardGroups() []meta.ShardGroupInfo {
func (p *StoragePlan) ShardGroups() []meta.ShardGroupInfo {
return p.info.RetentionPolicy(p.info.DefaultRetentionPolicy).ShardGroups
}
func (p *Plan) createShardGroupMetadata(client server.MetaClient, rp string) error {
func (p *StoragePlan) createShardGroupMetadata(client server.MetaClient, rp string) error {
ts := p.StartTime.Truncate(p.ShardDuration).UTC()
var err error
@ -141,13 +134,13 @@ func (p *Plan) createShardGroupMetadata(client server.MetaClient, rp string) err
return nil
}
func (p *Plan) TimeRange() (start, end time.Time) {
func (p *StoragePlan) TimeRange() (start, end time.Time) {
start = p.StartTime.Truncate(p.ShardDuration).UTC()
end = start.Add(time.Duration(p.ShardDuration.Nanoseconds() * int64(p.ShardCount)))
return start, end
}
func (p *Plan) Validate() error {
func (p *StoragePlan) Validate() error {
// build default values
def := &planDefaults{}
WalkPlan(def, p)
@ -164,7 +157,7 @@ type Visitor interface {
type Node interface{ node() }
func (*Plan) node() {}
func (*StoragePlan) node() {}
func WalkPlan(v Visitor, node Node) {
if v = v.Visit(node); v == nil {
@ -172,7 +165,7 @@ func WalkPlan(v Visitor, node Node) {
}
switch n := node.(type) {
case *Plan:
case *StoragePlan:
default:
panic(fmt.Sprintf("WalkConfig: unexpected node type %T", n))
@ -185,7 +178,7 @@ type planValidator struct {
func (v *planValidator) Visit(node Node) Visitor {
switch n := node.(type) {
case *Plan:
case *StoragePlan:
if n.DatabasePath == "" {
v.errs.Add(errors.New("missing DataPath"))
}
@ -205,7 +198,7 @@ type planDefaults struct{}
func (v *planDefaults) Visit(node Node) Visitor {
switch n := node.(type) {
case *Plan:
case *StoragePlan:
if n.DatabasePath == "" {
n.DatabasePath = "${HOME}/.influxdb/data"
}
@ -228,3 +221,25 @@ func (v *planDefaults) Visit(node Node) Visitor {
return v
}
type SchemaPlan struct {
StoragePlan *StoragePlan
Tags TagCardinalities
PointsPerSeriesPerShard int
}
func (p *SchemaPlan) String() string {
sb := new(strings.Builder)
p.PrintPlan(sb)
return sb.String()
}
func (p *SchemaPlan) PrintPlan(w io.Writer) {
tw := tabwriter.NewWriter(w, 25, 4, 2, ' ', 0)
fmt.Fprintf(tw, "Tag cardinalities\t%s\n", p.Tags)
fmt.Fprintf(tw, "Points per series per shard\t%d\n", p.PointsPerSeriesPerShard)
fmt.Fprintf(tw, "Total points per shard\t%d\n", p.Tags.Cardinality()*p.PointsPerSeriesPerShard)
fmt.Fprintf(tw, "Total series\t%d\n", p.Tags.Cardinality())
fmt.Fprintf(tw, "Total points\t%d\n", p.Tags.Cardinality()*p.StoragePlan.ShardCount*p.PointsPerSeriesPerShard)
_ = tw.Flush()
}

View File

@ -41,39 +41,32 @@ func (t *TagCardinalities) Set(tags string) error {
return nil
}
type Spec struct {
StartTime string
Database string
Retention string
ReplicaN int
ShardCount int
ShardDuration time.Duration
Tags TagCardinalities
PointsPerSeriesPerShard int
type StorageSpec struct {
StartTime string
Database string
Retention string
ReplicaN int
ShardCount int
ShardDuration time.Duration
}
func (a *Spec) AddFlags(fs *flag.FlagSet) {
func (a *StorageSpec) AddFlags(fs *flag.FlagSet) {
fs.StringVar(&a.StartTime, "start-time", "", "Start time")
fs.StringVar(&a.Database, "db", "db", "Name of database to create")
fs.StringVar(&a.Retention, "rp", "rp", "Name of retention policy")
fs.IntVar(&a.ReplicaN, "rf", 1, "Replication factor")
fs.IntVar(&a.ShardCount, "shards", 1, "Number of shards to create")
fs.DurationVar(&a.ShardDuration, "shard-duration", 24*time.Hour, "Shard duration (default 24h)")
a.Tags = []int{10, 10, 10}
fs.Var(&a.Tags, "t", "Tag cardinality")
fs.IntVar(&a.PointsPerSeriesPerShard, "p", 100, "Points per series per shard")
}
func (a *Spec) Plan(server server.Interface) (*Plan, error) {
plan := &Plan{
Database: a.Database,
Retention: a.Retention,
ReplicaN: a.ReplicaN,
ShardCount: a.ShardCount,
ShardDuration: a.ShardDuration,
Tags: a.Tags,
PointsPerSeriesPerShard: a.PointsPerSeriesPerShard,
DatabasePath: filepath.Join(server.TSDBConfig().Dir, a.Database),
func (a *StorageSpec) Plan(server server.Interface) (*StoragePlan, error) {
plan := &StoragePlan{
Database: a.Database,
Retention: a.Retention,
ReplicaN: a.ReplicaN,
ShardCount: a.ShardCount,
ShardDuration: a.ShardDuration,
DatabasePath: filepath.Join(server.TSDBConfig().Dir, a.Database),
}
if a.StartTime != "" {
@ -90,3 +83,22 @@ func (a *Spec) Plan(server server.Interface) (*Plan, error) {
return plan, nil
}
type SchemaSpec struct {
Tags TagCardinalities
PointsPerSeriesPerShard int
}
func (s *SchemaSpec) AddFlags(fs *flag.FlagSet) {
s.Tags = []int{10, 10, 10}
fs.Var(&s.Tags, "t", "Tag cardinality")
fs.IntVar(&s.PointsPerSeriesPerShard, "p", 100, "Points per series per shard")
}
func (s *SchemaSpec) Plan(sp *StoragePlan) (*SchemaPlan, error) {
return &SchemaPlan{
StoragePlan: sp,
Tags: s.Tags,
PointsPerSeriesPerShard: s.PointsPerSeriesPerShard,
}, nil
}

View File

@ -7,8 +7,8 @@ import (
"strconv"
"github.com/influxdata/influxdb/cmd/influx_tools/internal/errlist"
"github.com/influxdata/influxdb/pkg/data/gen"
"github.com/influxdata/influxdb/tsdb/engine/tsm1"
"github.com/influxdata/platform/pkg/data/gen"
)
const (

View File

@ -0,0 +1,16 @@
// Code generated by "stringer -type=FieldType"; DO NOT EDIT.
package models
import "strconv"
const _FieldType_name = "IntegerFloatBooleanStringEmptyUnsigned"
var _FieldType_index = [...]uint8{0, 7, 12, 19, 25, 30, 38}
func (i FieldType) String() string {
if i < 0 || i >= FieldType(len(_FieldType_index)-1) {
return "FieldType(" + strconv.FormatInt(int64(i), 10) + ")"
}
return _FieldType_name[_FieldType_index[i]:_FieldType_index[i+1]]
}

3
models/gen.go Normal file
View File

@ -0,0 +1,3 @@
package models
//go:generate stringer -type=FieldType

View File

@ -0,0 +1,97 @@
// Generated by tmpl
// https://github.com/benbjohnson/tmpl
//
// DO NOT EDIT!
// Source: arrays.gen.go.tmpl
package gen
import (
"github.com/influxdata/influxdb/tsdb"
"github.com/influxdata/influxdb/tsdb/engine/tsm1"
)
type floatArray struct {
tsdb.FloatArray
}
func newFloatArrayLen(sz int) *floatArray {
return &floatArray{
FloatArray: tsdb.FloatArray{
Timestamps: make([]int64, sz),
Values: make([]float64, sz),
},
}
}
func (a *floatArray) Encode(b []byte) ([]byte, error) {
return tsm1.EncodeFloatArrayBlock(&a.FloatArray, b)
}
type integerArray struct {
tsdb.IntegerArray
}
func newIntegerArrayLen(sz int) *integerArray {
return &integerArray{
IntegerArray: tsdb.IntegerArray{
Timestamps: make([]int64, sz),
Values: make([]int64, sz),
},
}
}
func (a *integerArray) Encode(b []byte) ([]byte, error) {
return tsm1.EncodeIntegerArrayBlock(&a.IntegerArray, b)
}
type unsignedArray struct {
tsdb.UnsignedArray
}
func newUnsignedArrayLen(sz int) *unsignedArray {
return &unsignedArray{
UnsignedArray: tsdb.UnsignedArray{
Timestamps: make([]int64, sz),
Values: make([]uint64, sz),
},
}
}
func (a *unsignedArray) Encode(b []byte) ([]byte, error) {
return tsm1.EncodeUnsignedArrayBlock(&a.UnsignedArray, b)
}
type stringArray struct {
tsdb.StringArray
}
func newStringArrayLen(sz int) *stringArray {
return &stringArray{
StringArray: tsdb.StringArray{
Timestamps: make([]int64, sz),
Values: make([]string, sz),
},
}
}
func (a *stringArray) Encode(b []byte) ([]byte, error) {
return tsm1.EncodeStringArrayBlock(&a.StringArray, b)
}
type booleanArray struct {
tsdb.BooleanArray
}
func newBooleanArrayLen(sz int) *booleanArray {
return &booleanArray{
BooleanArray: tsdb.BooleanArray{
Timestamps: make([]int64, sz),
Values: make([]bool, sz),
},
}
}
func (a *booleanArray) Encode(b []byte) ([]byte, error) {
return tsm1.EncodeBooleanArrayBlock(&a.BooleanArray, b)
}

View File

@ -0,0 +1,27 @@
package gen
import (
"github.com/influxdata/influxdb/tsdb"
"github.com/influxdata/influxdb/tsdb/engine/tsm1"
)
{{range .}}
{{ $typename := print .name "Array" }}
{{ $tsdbname := print .Name "Array" }}
type {{$typename}} struct {
tsdb.{{$tsdbname}}
}
func new{{$tsdbname}}Len(sz int) *{{$typename}} {
return &{{$typename}}{
{{$tsdbname}}: tsdb.{{$tsdbname}}{
Timestamps: make([]int64, sz),
Values: make([]{{.Type}}, sz),
},
}
}
func (a *{{$typename}}) Encode(b []byte) ([]byte, error) {
return tsm1.Encode{{$tsdbname}}Block(&a.{{$tsdbname}}, b)
}
{{end}}

4
pkg/data/gen/gen.go Normal file
View File

@ -0,0 +1,4 @@
package gen
//go:generate tmpl -data=@types.tmpldata arrays.gen.go.tmpl values.gen.go.tmpl values_sequence.gen.go.tmpl
//go:generate stringer -type=precision -trimprefix=precision

View File

@ -0,0 +1,141 @@
package gen
import (
"container/heap"
"math"
"github.com/influxdata/influxdb/models"
)
type mergedSeriesGenerator struct {
heap seriesGeneratorHeap
last constSeries
err error
n int64
first bool
}
func NewMergedSeriesGenerator(s []SeriesGenerator) SeriesGenerator {
if len(s) == 0 {
return nil
} else if len(s) == 1 {
return s[0]
}
msg := &mergedSeriesGenerator{first: true, n: math.MaxInt64}
msg.heap.init(s)
return msg
}
func NewMergedSeriesGeneratorLimit(s []SeriesGenerator, n int64) SeriesGenerator {
if len(s) == 0 {
return nil
}
msg := &mergedSeriesGenerator{first: true, n: n}
msg.heap.init(s)
return msg
}
func (s *mergedSeriesGenerator) Next() bool {
if len(s.heap.items) == 0 {
return false
}
if s.n > 0 {
s.n--
if !s.first {
top := s.heap.items[0]
s.last.CopyFrom(top) // capture last key for duplicate checking
for {
if top.Next() {
if len(s.heap.items) > 1 {
heap.Fix(&s.heap, 0)
}
} else {
heap.Pop(&s.heap)
if len(s.heap.items) == 0 {
return false
}
}
top = s.heap.items[0]
if CompareSeries(&s.last, top) == 0 {
// duplicate key, get next
continue
}
return true
}
}
s.first = false
return true
}
return false
}
func (s *mergedSeriesGenerator) Key() []byte {
return s.heap.items[0].Key()
}
func (s *mergedSeriesGenerator) Name() []byte {
return s.heap.items[0].Name()
}
func (s *mergedSeriesGenerator) Tags() models.Tags {
return s.heap.items[0].Tags()
}
func (s *mergedSeriesGenerator) Field() []byte {
return s.heap.items[0].Field()
}
func (s *mergedSeriesGenerator) TimeValuesGenerator() TimeValuesSequence {
return s.heap.items[0].TimeValuesGenerator()
}
type seriesGeneratorHeap struct {
items []SeriesGenerator
}
func (h *seriesGeneratorHeap) init(results []SeriesGenerator) {
if cap(h.items) < len(results) {
h.items = make([]SeriesGenerator, 0, len(results))
} else {
h.items = h.items[:0]
}
for _, rs := range results {
if rs.Next() {
h.items = append(h.items, rs)
}
}
heap.Init(h)
}
func (h *seriesGeneratorHeap) Less(i, j int) bool {
return CompareSeries(h.items[i], h.items[j]) == -1
}
func (h *seriesGeneratorHeap) Len() int {
return len(h.items)
}
func (h *seriesGeneratorHeap) Swap(i, j int) {
h.items[i], h.items[j] = h.items[j], h.items[i]
}
func (h *seriesGeneratorHeap) Push(x interface{}) {
panic("not implemented")
}
func (h *seriesGeneratorHeap) Pop() interface{} {
old := h.items
n := len(old)
item := old[n-1]
old[n-1] = nil
h.items = old[0 : n-1]
return item
}

View File

@ -0,0 +1,213 @@
package gen
import (
"fmt"
"math"
"strings"
"testing"
"time"
"github.com/google/go-cmp/cmp"
)
func sg(name, prefix, field string, counts ...int) SeriesGenerator {
spec := TimeSequenceSpec{Count: 1, Start: time.Unix(0, 0), Delta: time.Second}
ts := NewTimestampSequenceFromSpec(spec)
vs := NewFloatConstantValuesSequence(1)
vg := NewTimeFloatValuesSequence(spec.Count, ts, vs)
return NewSeriesGenerator([]byte(name), []byte(field), vg, NewTagsValuesSequenceCounts(prefix, counts))
}
func tags(sb *strings.Builder, prefix string, vals []int) {
sb.WriteByte(',')
// max tag width
tw := int(math.Ceil(math.Log10(float64(len(vals)))))
tf := fmt.Sprintf("%s%%0%dd=value%%d", prefix, tw)
tvs := make([]string, len(vals))
for i := range vals {
tvs[i] = fmt.Sprintf(tf, i, vals[i])
}
sb.WriteString(strings.Join(tvs, ","))
}
func line(name, prefix, field string, vals ...int) string {
var sb strings.Builder
sb.WriteString(name)
tags(&sb, prefix, vals)
sb.WriteString("#!~#")
sb.WriteString(field)
return sb.String()
}
func seriesGeneratorString(sg SeriesGenerator) []string {
var lines []string
for sg.Next() {
lines = append(lines, fmt.Sprintf("%s#!~#%s", string(sg.Key()), string(sg.Field())))
}
return lines
}
func TestNewMergedSeriesGenerator(t *testing.T) {
tests := []struct {
n string
s []SeriesGenerator
exp []string
}{
{
n: "single",
s: []SeriesGenerator{
sg("cpu", "t", "f0", 2, 1),
},
exp: []string{
line("cpu", "t", "f0", 0, 0),
line("cpu", "t", "f0", 1, 0),
},
},
{
n: "multiple,interleaved",
s: []SeriesGenerator{
sg("cpu", "t", "f0", 2, 1),
sg("cpu", "t", "f1", 2, 1),
},
exp: []string{
line("cpu", "t", "f0", 0, 0),
line("cpu", "t", "f1", 0, 0),
line("cpu", "t", "f0", 1, 0),
line("cpu", "t", "f1", 1, 0),
},
},
{
n: "multiple,sequential",
s: []SeriesGenerator{
sg("cpu", "t", "f0", 2),
sg("cpu", "u", "f0", 2, 1),
},
exp: []string{
line("cpu", "t", "f0", 0),
line("cpu", "t", "f0", 1),
line("cpu", "u", "f0", 0, 0),
line("cpu", "u", "f0", 1, 0),
},
},
{
n: "multiple,sequential",
s: []SeriesGenerator{
sg("m1", "t", "f0", 2, 1),
sg("m0", "t", "f0", 2, 1),
},
exp: []string{
line("m0", "t", "f0", 0, 0),
line("m0", "t", "f0", 1, 0),
line("m1", "t", "f0", 0, 0),
line("m1", "t", "f0", 1, 0),
},
},
{
// ensure duplicates are removed
n: "duplicates",
s: []SeriesGenerator{
sg("cpu", "t", "f0", 2, 1),
sg("cpu", "t", "f0", 2, 1),
},
exp: []string{
line("cpu", "t", "f0", 0, 0),
line("cpu", "t", "f0", 1, 0),
},
},
{
// ensure duplicates are removed, but non-dupes from same SeriesGenerator
// are still included
n: "duplicates,multiple,interleaved",
s: []SeriesGenerator{
sg("cpu", "t", "f0", 2, 1),
sg("cpu", "t", "f1", 2, 1),
sg("cpu", "t", "f0", 2, 1),
sg("cpu", "t", "f1", 3, 1),
},
exp: []string{
line("cpu", "t", "f0", 0, 0),
line("cpu", "t", "f1", 0, 0),
line("cpu", "t", "f0", 1, 0),
line("cpu", "t", "f1", 1, 0),
line("cpu", "t", "f1", 2, 0),
},
},
}
for _, tt := range tests {
t.Run(tt.n, func(t *testing.T) {
sg := NewMergedSeriesGenerator(tt.s)
if got := seriesGeneratorString(sg); !cmp.Equal(got, tt.exp) {
t.Errorf("unpexected -got/+exp\n%s", cmp.Diff(got, tt.exp))
}
})
}
}
func TestNewMergedSeriesGeneratorLimit(t *testing.T) {
tests := []struct {
n string
s []SeriesGenerator
lim int64
exp []string
}{
{
n: "single",
s: []SeriesGenerator{
sg("cpu", "t", "f0", 4, 1),
},
lim: 2,
exp: []string{
line("cpu", "t", "f0", 0, 0),
line("cpu", "t", "f0", 1, 0),
},
},
{
n: "multiple,interleaved",
s: []SeriesGenerator{
sg("cpu", "t", "f0", 2, 1),
sg("cpu", "t", "f1", 2, 1),
},
lim: 3,
exp: []string{
line("cpu", "t", "f0", 0, 0),
line("cpu", "t", "f1", 0, 0),
line("cpu", "t", "f0", 1, 0),
},
},
{
n: "multiple,sequential",
s: []SeriesGenerator{
sg("cpu", "t", "f0", 2),
sg("cpu", "u", "f0", 2, 1),
},
lim: 2,
exp: []string{
line("cpu", "t", "f0", 0),
line("cpu", "t", "f0", 1),
},
},
{
n: "multiple,sequential",
s: []SeriesGenerator{
sg("m1", "t", "f0", 2, 1),
sg("m0", "t", "f0", 2, 1),
},
lim: 4,
exp: []string{
line("m0", "t", "f0", 0, 0),
line("m0", "t", "f0", 1, 0),
line("m1", "t", "f0", 0, 0),
line("m1", "t", "f0", 1, 0),
},
},
}
for _, tt := range tests {
t.Run(tt.n, func(t *testing.T) {
sg := NewMergedSeriesGeneratorLimit(tt.s, tt.lim)
if got := seriesGeneratorString(sg); !cmp.Equal(got, tt.exp) {
t.Errorf("unpexected -got/+exp\n%s", cmp.Diff(got, tt.exp))
}
})
}
}

View File

@ -0,0 +1,16 @@
// Code generated by "stringer -type=precision -trimprefix=precision"; DO NOT EDIT.
package gen
import "strconv"
const _precision_name = "MillisecondNanosecondMicrosecondSecondMinuteHour"
var _precision_index = [...]uint8{0, 11, 21, 32, 38, 44, 48}
func (i precision) String() string {
if i >= precision(len(_precision_index)-1) {
return "precision(" + strconv.FormatInt(int64(i), 10) + ")"
}
return _precision_name[_precision_index[i]:_precision_index[i+1]]
}

238
pkg/data/gen/schema.go Normal file
View File

@ -0,0 +1,238 @@
package gen
import (
"fmt"
)
type Visitor interface {
Visit(node SchemaNode) (w Visitor)
}
type SchemaNode interface {
node()
}
type Schema struct {
Title string
Version string
SeriesLimit *SeriesLimit `toml:"series-limit"`
Measurements Measurements
}
func (*Schema) node() {}
type Measurements []Measurement
func (Measurements) node() {}
type Tags []Tag
func (Tags) node() {}
type Fields []Field
func (Fields) node() {}
type Measurement struct {
Name string
SeriesLimit *SeriesLimit `toml:"series-limit"`
Sample *sample
Tags Tags
Fields Fields
}
func (*Measurement) node() {}
type TagSource interface {
fmt.Stringer
SchemaNode
tagsource()
}
type Tag struct {
Name string
Source TagSource
}
func (*Tag) node() {}
type TagArraySource struct {
Values []string
}
func (*TagArraySource) node() {}
func (*TagArraySource) tagsource() {}
func (s *TagArraySource) String() string {
return fmt.Sprintf("array, source=%#v", s.Values)
}
type TagSequenceSource struct {
Format string
Start int64
Count int64
}
func (*TagSequenceSource) node() {}
func (*TagSequenceSource) tagsource() {}
func (t *TagSequenceSource) String() string {
return fmt.Sprintf("sequence, prefix=%q, range=[%d,%d)", t.Format, t.Start, t.Start+t.Count)
}
type TagFileSource struct {
Path string
}
func (*TagFileSource) node() {}
func (*TagFileSource) tagsource() {}
func (s *TagFileSource) String() string {
return fmt.Sprintf("file, path=%s", s.Path)
}
type FieldSource interface {
fmt.Stringer
SchemaNode
fieldsource()
}
type Field struct {
Name string
Count int64
TimePrecision precision `toml:"time-precision"` // TimePrecision determines the precision for generated timestamp values
Source FieldSource
}
func (*Field) node() {}
type FieldConstantValue struct {
Value interface{}
}
func (*FieldConstantValue) node() {}
func (*FieldConstantValue) fieldsource() {}
func (f *FieldConstantValue) String() string {
return fmt.Sprintf("constant, source=%#v", f.Value)
}
type FieldArraySource struct {
Value interface{}
}
func (*FieldArraySource) node() {}
func (*FieldArraySource) fieldsource() {}
func (f *FieldArraySource) String() string {
return fmt.Sprintf("array, source=%#v", f.Value)
}
type FieldFloatRandomSource struct {
Seed int64
Min, Max float64
}
func (*FieldFloatRandomSource) node() {}
func (*FieldFloatRandomSource) fieldsource() {}
func (f *FieldFloatRandomSource) String() string {
return fmt.Sprintf("rand<float>, seed=%d, min=%f, max=%f", f.Seed, f.Max, f.Max)
}
type FieldIntegerZipfSource struct {
Seed int64
S, V float64
IMAX uint64
}
func (*FieldIntegerZipfSource) node() {}
func (*FieldIntegerZipfSource) fieldsource() {}
func (f *FieldIntegerZipfSource) String() string {
return fmt.Sprintf("rand<float>, seed=%d, s=%f, v=%f, imax=%d", f.Seed, f.S, f.V, f.IMAX)
}
type VisitorFn func(node SchemaNode) bool
func (fn VisitorFn) Visit(node SchemaNode) (w Visitor) {
if fn(node) {
return fn
}
return nil
}
// WalkDown performs a pre-order, depth-first traversal of the graph, calling v for each node.
// Pre-order starts by calling the visitor for the root and each child as it traverses down
// the graph to the leaves.
func WalkDown(v Visitor, node SchemaNode) {
walk(v, node, false)
}
// WalkUp performs a post-order, depth-first traversal of the graph, calling v for each node.
// Post-order starts by calling the visitor for the leaves then each parent as it traverses up
// the graph to the root.
func WalkUp(v Visitor, node SchemaNode) {
walk(v, node, true)
}
func walk(v Visitor, node SchemaNode, up bool) Visitor {
if v == nil {
return nil
}
if !up {
if v = v.Visit(node); v == nil {
return nil
}
}
switch n := node.(type) {
case *Schema:
walk(v, n.Measurements, up)
case Measurements:
v := v
for i := range n {
v = walk(v, &n[i], up)
}
case *Measurement:
v := v
v = walk(v, n.Tags, up)
v = walk(v, n.Fields, up)
case Fields:
v := v
for i := 0; i < len(n); i++ {
v = walk(v, &n[i], up)
}
case Tags:
v := v
for i := 0; i < len(n); i++ {
v = walk(v, &n[i], up)
}
case *Tag:
walk(v, n.Source, up)
case *TagArraySource, *TagSequenceSource, *TagFileSource:
// nothing to do
case *Field:
walk(v, n.Source, up)
case *FieldConstantValue, *FieldArraySource, *FieldFloatRandomSource, *FieldIntegerZipfSource:
// nothing to do
default:
panic(fmt.Sprintf("schema.Walk: unexpected node type %T", n))
}
if up && v != nil {
v = v.Visit(node)
}
return v
}

84
pkg/data/gen/sequence.go Normal file
View File

@ -0,0 +1,84 @@
package gen
import (
"fmt"
"math"
)
type Sequence interface {
Next() bool
Value() string
}
type CountableSequence interface {
Sequence
Count() int
}
type CounterByteSequence struct {
format string
nfmt string
val string
s int
i int
end int
}
func NewCounterByteSequenceCount(n int) *CounterByteSequence {
return NewCounterByteSequence("value%s", 0, n)
}
func NewCounterByteSequence(format string, start, end int) *CounterByteSequence {
s := &CounterByteSequence{
format: format,
nfmt: fmt.Sprintf("%%0%dd", int(math.Ceil(math.Log10(float64(end))))),
s: start,
i: start,
end: end,
}
s.update()
return s
}
func (s *CounterByteSequence) Next() bool {
s.i++
if s.i >= s.end {
s.i = s.s
}
s.update()
return true
}
func (s *CounterByteSequence) update() {
s.val = fmt.Sprintf(s.format, fmt.Sprintf(s.nfmt, s.i))
}
func (s *CounterByteSequence) Value() string { return s.val }
func (s *CounterByteSequence) Count() int { return s.end - s.s }
type StringArraySequence struct {
vals []string
c int
i int
}
func NewStringArraySequence(vals []string) *StringArraySequence {
return &StringArraySequence{vals: sortDedupStrings(vals)}
}
func (s *StringArraySequence) Next() bool {
s.i++
if s.i == len(s.vals) {
s.i = 0
}
s.c = s.i
return true
}
func (s *StringArraySequence) Value() string {
return s.vals[s.c]
}
func (s *StringArraySequence) Count() int {
return len(s.vals)
}

63
pkg/data/gen/series.go Normal file
View File

@ -0,0 +1,63 @@
package gen
import (
"bytes"
)
type Series interface {
// Key returns the series key.
// The returned value may be cached.
Key() []byte
// Field returns the name of the field.
// The returned value may be modified by a subsequent call to Next.
Field() []byte
}
type constSeries struct {
key []byte
field []byte
}
func (s *constSeries) Key() []byte { return s.key }
func (s *constSeries) Field() []byte { return s.field }
var nilSeries Series = &constSeries{}
// Compare returns an integer comparing two SeriesGenerator instances
// lexicographically.
// The result will be 0 if a==b, -1 if a < b, and +1 if a > b.
// A nil argument is equivalent to an empty SeriesGenerator.
func CompareSeries(a, b Series) int {
if a == nil {
a = nilSeries
}
if b == nil {
b = nilSeries
}
switch res := bytes.Compare(a.Key(), b.Key()); res {
case 0:
return bytes.Compare(a.Field(), b.Field())
default:
return res
}
}
func (s *constSeries) CopyFrom(a Series) {
key := a.Key()
if cap(s.key) < len(key) {
s.key = make([]byte, len(key))
} else {
s.key = s.key[:len(key)]
}
copy(s.key, key)
field := a.Field()
if cap(s.field) < len(field) {
s.field = make([]byte, len(field))
} else {
s.field = s.field[:len(field)]
}
copy(s.field, field)
}

View File

@ -0,0 +1,74 @@
package gen
import (
"testing"
"github.com/google/go-cmp/cmp"
)
func TestCompareSeries(t *testing.T) {
mk := func(k, f string) Series {
return &constSeries{key: []byte(k), field: []byte(f)}
}
tests := []struct {
name string
a Series
b Series
exp int
}{
{
name: "nil a,b",
exp: 0,
},
{
name: "a(nil) < b",
a: nil,
b: mk("cpu,t0=v0", "f0"),
exp: -1,
},
{
name: "a > b(nil)",
a: mk("cpu,t0=v0", "f0"),
b: nil,
exp: 1,
},
{
name: "a = b",
a: mk("cpu,t0=v0", "f0"),
b: mk("cpu,t0=v0", "f0"),
exp: 0,
},
{
name: "a(f0) < b(f1)",
a: mk("cpu,t0=v0", "f0"),
b: mk("cpu,t0=v0", "f1"),
exp: -1,
},
{
name: "a(v0) < b(v1)",
a: mk("cpu,t0=v0", "f0"),
b: mk("cpu,t0=v1", "f0"),
exp: -1,
},
{
name: "a(f1) > b(f0)",
a: mk("cpu,t0=v0", "f1"),
b: mk("cpu,t0=v0", "f0"),
exp: 1,
},
{
name: "a(v1) > b(v0)",
a: mk("cpu,t0=v1", "f0"),
b: mk("cpu,t0=v0", "f0"),
exp: 1,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := CompareSeries(tt.a, tt.b); got != tt.exp {
t.Errorf("unexpected value -got/+exp\n%s", cmp.Diff(got, tt.exp))
}
})
}
}

View File

@ -0,0 +1,133 @@
package gen
import (
"math"
"time"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/platform/pkg/data/gen"
)
type SeriesGenerator interface {
// Next advances the series generator to the next series key.
Next() bool
// Key returns the series key.
// The returned value may be cached.
Key() []byte
// Name returns the name of the measurement.
// The returned value may be modified by a subsequent call to Next.
Name() []byte
// Tags returns the tag set.
// The returned value may be modified by a subsequent call to Next.
Tags() models.Tags
// Field returns the name of the field.
// The returned value may be modified by a subsequent call to Next.
Field() []byte
// TimeValuesGenerator returns a values sequence for the current series.
TimeValuesGenerator() TimeValuesSequence
}
type TimeSequenceSpec struct {
// Count specifies the number of values to generate.
Count int
// Start specifies the starting time for the values.
Start time.Time
// Delta specifies the interval between time stamps.
Delta time.Duration
// Precision specifies the precision of timestamp intervals
Precision time.Duration
}
type TimeRange struct {
Start time.Time
End time.Time
}
type TimeValuesSequence interface {
Reset()
Next() bool
Values() Values
}
type Values interface {
MinTime() int64
MaxTime() int64
Encode([]byte) ([]byte, error)
}
type cache struct {
key []byte
tags models.Tags
}
type seriesGenerator struct {
name []byte
tags gen.TagsSequence
field []byte
vg TimeValuesSequence
n int64
c cache
}
func NewSeriesGenerator(name []byte, field []byte, vg TimeValuesSequence, tags TagsSequence) SeriesGenerator {
return NewSeriesGeneratorLimit(name, field, vg, tags, math.MaxInt64)
}
func NewSeriesGeneratorLimit(name []byte, field []byte, vg TimeValuesSequence, tags TagsSequence, n int64) SeriesGenerator {
return &seriesGenerator{
name: name,
field: field,
tags: tags,
vg: vg,
n: n,
}
}
func (g *seriesGenerator) Next() bool {
if g.n > 0 {
g.n--
if g.tags.Next() {
g.c = cache{}
g.vg.Reset()
return true
}
g.n = 0
}
return false
}
func (g *seriesGenerator) Key() []byte {
if len(g.c.key) == 0 {
g.c.key = models.MakeKey(g.name, g.tags.Value())
}
return g.c.key
}
func (g *seriesGenerator) Name() []byte {
return g.name
}
func (g *seriesGenerator) Tags() models.Tags {
if len(g.c.tags) == 0 {
g.c.tags = g.tags.Value().Clone()
}
return g.c.tags
}
func (g *seriesGenerator) Field() []byte {
return g.field
}
func (g *seriesGenerator) TimeValuesGenerator() TimeValuesSequence {
return g.vg
}

569
pkg/data/gen/specs.go Normal file
View File

@ -0,0 +1,569 @@
package gen
import (
"bufio"
"fmt"
"math/rand"
"os"
"path"
"path/filepath"
"sort"
"time"
"unicode/utf8"
"github.com/BurntSushi/toml"
"github.com/influxdata/influxdb/models"
)
type Spec struct {
SeriesLimit *int64
Measurements []MeasurementSpec
}
func NewSeriesGeneratorFromSpec(s *Spec, tr TimeRange) SeriesGenerator {
sg := make([]SeriesGenerator, len(s.Measurements))
for i := range s.Measurements {
sg[i] = newSeriesGeneratorFromMeasurementSpec(&s.Measurements[i], tr)
}
if s.SeriesLimit == nil {
return NewMergedSeriesGenerator(sg)
}
return NewMergedSeriesGeneratorLimit(sg, *s.SeriesLimit)
}
type MeasurementSpec struct {
Name string
SeriesLimit *SeriesLimit
TagsSpec *TagsSpec
FieldValuesSpec *FieldValuesSpec
}
func newSeriesGeneratorFromMeasurementSpec(ms *MeasurementSpec, tr TimeRange) SeriesGenerator {
if ms.SeriesLimit == nil {
return NewSeriesGenerator(
[]byte(ms.Name),
[]byte(ms.FieldValuesSpec.Name),
newTimeValuesSequenceFromFieldValuesSpec(ms.FieldValuesSpec, tr),
newTagsSequenceFromTagsSpec(ms.TagsSpec))
}
return NewSeriesGeneratorLimit(
[]byte(ms.Name),
[]byte(ms.FieldValuesSpec.Name),
newTimeValuesSequenceFromFieldValuesSpec(ms.FieldValuesSpec, tr),
newTagsSequenceFromTagsSpec(ms.TagsSpec),
int64(*ms.SeriesLimit))
}
// NewTimeValuesSequenceFn returns a TimeValuesSequence that will generate a
// sequence of values based on the spec.
type NewTimeValuesSequenceFn func(spec TimeSequenceSpec) TimeValuesSequence
type NewTagsValuesSequenceFn func() TagsSequence
type NewCountableSequenceFn func() CountableSequence
type TagsSpec struct {
Tags []*TagValuesSpec
Sample *sample
}
func newTagsSequenceFromTagsSpec(ts *TagsSpec) TagsSequence {
var keys []string
var vals []CountableSequence
for _, spec := range ts.Tags {
keys = append(keys, spec.TagKey)
vals = append(vals, spec.Values())
}
var opts []tagsValuesOption
if ts.Sample != nil && *ts.Sample != 1.0 {
opts = append(opts, TagValuesSampleOption(float64(*ts.Sample)))
}
return NewTagsValuesSequenceKeysValues(keys, vals, opts...)
}
type TagValuesSpec struct {
TagKey string
Values NewCountableSequenceFn
}
type FieldValuesSpec struct {
TimeSequenceSpec
Name string
DataType models.FieldType
Values NewTimeValuesSequenceFn
}
func newTimeValuesSequenceFromFieldValuesSpec(fs *FieldValuesSpec, tr TimeRange) TimeValuesSequence {
ts := fs.TimeSequenceSpec
ts.Start = tr.Start
ts.Delta = tr.End.Sub(tr.Start) / time.Duration(ts.Count)
ts.Delta = ts.Delta.Round(ts.Precision)
return fs.Values(ts)
}
func NewSpecFromToml(s string) (*Spec, error) {
var out Schema
if _, err := toml.Decode(s, &out); err != nil {
return nil, err
}
return NewSpecFromSchema(&out)
}
func NewSpecFromPath(p string) (*Spec, error) {
var err error
p, err = filepath.Abs(p)
if err != nil {
return nil, err
}
var out Schema
if _, err := toml.DecodeFile(p, &out); err != nil {
return nil, err
}
return newSpecFromSchema(&out, schemaDir(path.Dir(p)))
}
func NewSchemaFromPath(path string) (*Schema, error) {
var out Schema
if _, err := toml.DecodeFile(path, &out); err != nil {
return nil, err
}
return &out, nil
}
type schemaToSpecState int
const (
stateOk schemaToSpecState = iota
stateErr
)
type schemaToSpec struct {
schemaDir string
stack []interface{}
state schemaToSpecState
spec *Spec
err error
}
func (s *schemaToSpec) push(v interface{}) {
s.stack = append(s.stack, v)
}
func (s *schemaToSpec) pop() interface{} {
tail := len(s.stack) - 1
v := s.stack[tail]
s.stack[tail] = nil
s.stack = s.stack[:tail]
return v
}
func (s *schemaToSpec) peek() interface{} {
if len(s.stack) == 0 {
return nil
}
return s.stack[len(s.stack)-1]
}
func (s *schemaToSpec) Visit(node SchemaNode) (w Visitor) {
switch s.state {
case stateOk:
if s.visit(node) {
return s
}
s.state = stateErr
case stateErr:
s.visitErr(node)
}
return nil
}
func (s *schemaToSpec) visit(node SchemaNode) bool {
switch n := node.(type) {
case *Schema:
s.spec.Measurements = s.pop().([]MeasurementSpec)
if n.SeriesLimit != nil {
sl := int64(*n.SeriesLimit)
s.spec.SeriesLimit = &sl
}
case Measurements:
// flatten measurements
var mss []MeasurementSpec
for {
if specs, ok := s.peek().([]MeasurementSpec); ok {
s.pop()
mss = append(mss, specs...)
continue
}
break
}
sort.Slice(mss, func(i, j int) bool {
return mss[i].Name < mss[j].Name
})
// validate field types are homogeneous for a single measurement
mg := make(map[string]models.FieldType)
for i := range mss {
spec := &mss[i]
key := spec.Name + "." + spec.FieldValuesSpec.Name
ft := spec.FieldValuesSpec.DataType
if dt, ok := mg[key]; !ok {
mg[key] = ft
} else if dt != ft {
s.err = fmt.Errorf("field %q data-type conflict, found %s and %s",
key,
dt,
ft)
return false
}
}
s.push(mss)
case *Measurement:
var ms []MeasurementSpec
fields := s.pop().([]*FieldValuesSpec)
tagsSpec := s.pop().(*TagsSpec)
tagsSpec.Sample = n.Sample
// default: sample 50%
if n.Sample == nil {
s := sample(0.5)
tagsSpec.Sample = &s
}
for _, spec := range fields {
ms = append(ms, MeasurementSpec{
Name: n.Name,
SeriesLimit: n.SeriesLimit,
TagsSpec: tagsSpec,
FieldValuesSpec: spec,
})
}
// NOTE: sort each measurement name + field name to ensure series are produced
// in correct order
sort.Slice(ms, func(i, j int) bool {
return ms[i].FieldValuesSpec.Name < ms[j].FieldValuesSpec.Name
})
s.push(ms)
case Tags:
var ts TagsSpec
for {
if spec, ok := s.peek().(*TagValuesSpec); ok {
s.pop()
ts.Tags = append(ts.Tags, spec)
continue
}
break
}
// Tag keys must be sorted to produce a valid series key sequence
sort.Slice(ts.Tags, func(i, j int) bool {
return ts.Tags[i].TagKey < ts.Tags[j].TagKey
})
for i := 1; i < len(ts.Tags); i++ {
if ts.Tags[i-1].TagKey == ts.Tags[i].TagKey {
s.err = fmt.Errorf("duplicate tag keys %q", ts.Tags[i].TagKey)
return false
}
}
s.push(&ts)
case Fields:
// combine fields
var fs []*FieldValuesSpec
for {
if spec, ok := s.peek().(*FieldValuesSpec); ok {
s.pop()
fs = append(fs, spec)
continue
}
break
}
sort.Slice(fs, func(i, j int) bool {
return fs[i].Name < fs[j].Name
})
for i := 1; i < len(fs); i++ {
if fs[i-1].Name == fs[i].Name {
s.err = fmt.Errorf("duplicate field names %q", fs[i].Name)
return false
}
}
s.push(fs)
case *Field:
fs, ok := s.peek().(*FieldValuesSpec)
if !ok {
panic(fmt.Sprintf("unexpected type %T", fs))
}
fs.TimeSequenceSpec = TimeSequenceSpec{
Count: int(n.Count),
Precision: n.TimePrecision.ToDuration(),
}
fs.Name = n.Name
case *FieldConstantValue:
var fs FieldValuesSpec
switch v := n.Value.(type) {
case float64:
fs.DataType = models.Float
fs.Values = func(spec TimeSequenceSpec) TimeValuesSequence {
return NewTimeFloatValuesSequence(
spec.Count,
NewTimestampSequenceFromSpec(spec),
NewFloatConstantValuesSequence(v),
)
}
case int64:
fs.DataType = models.Integer
fs.Values = func(spec TimeSequenceSpec) TimeValuesSequence {
return NewTimeIntegerValuesSequence(
spec.Count,
NewTimestampSequenceFromSpec(spec),
NewIntegerConstantValuesSequence(v),
)
}
case string:
fs.DataType = models.String
fs.Values = func(spec TimeSequenceSpec) TimeValuesSequence {
return NewTimeStringValuesSequence(
spec.Count,
NewTimestampSequenceFromSpec(spec),
NewStringConstantValuesSequence(v),
)
}
case bool:
fs.DataType = models.Boolean
fs.Values = func(spec TimeSequenceSpec) TimeValuesSequence {
return NewTimeBooleanValuesSequence(
spec.Count,
NewTimestampSequenceFromSpec(spec),
NewBooleanConstantValuesSequence(v),
)
}
default:
panic(fmt.Sprintf("unexpected type %T", v))
}
s.push(&fs)
case *FieldArraySource:
var fs FieldValuesSpec
switch v := n.Value.(type) {
case []float64:
fs.DataType = models.Float
fs.Values = func(spec TimeSequenceSpec) TimeValuesSequence {
return NewTimeFloatValuesSequence(
spec.Count,
NewTimestampSequenceFromSpec(spec),
NewFloatArrayValuesSequence(v),
)
}
case []int64:
fs.DataType = models.Integer
fs.Values = func(spec TimeSequenceSpec) TimeValuesSequence {
return NewTimeIntegerValuesSequence(
spec.Count,
NewTimestampSequenceFromSpec(spec),
NewIntegerArrayValuesSequence(v),
)
}
case []string:
fs.DataType = models.String
fs.Values = func(spec TimeSequenceSpec) TimeValuesSequence {
return NewTimeStringValuesSequence(
spec.Count,
NewTimestampSequenceFromSpec(spec),
NewStringArrayValuesSequence(v),
)
}
case []bool:
fs.DataType = models.Boolean
fs.Values = func(spec TimeSequenceSpec) TimeValuesSequence {
return NewTimeBooleanValuesSequence(
spec.Count,
NewTimestampSequenceFromSpec(spec),
NewBooleanArrayValuesSequence(v),
)
}
default:
panic(fmt.Sprintf("unexpected type %T", v))
}
s.push(&fs)
case *FieldFloatRandomSource:
var fs FieldValuesSpec
fs.DataType = models.Float
fs.Values = NewTimeValuesSequenceFn(func(spec TimeSequenceSpec) TimeValuesSequence {
return NewTimeFloatValuesSequence(
spec.Count,
NewTimestampSequenceFromSpec(spec),
NewFloatRandomValuesSequence(n.Min, n.Max, rand.New(rand.NewSource(n.Seed))),
)
})
s.push(&fs)
case *FieldIntegerZipfSource:
var fs FieldValuesSpec
fs.DataType = models.Integer
fs.Values = NewTimeValuesSequenceFn(func(spec TimeSequenceSpec) TimeValuesSequence {
return NewTimeIntegerValuesSequence(
spec.Count,
NewTimestampSequenceFromSpec(spec),
NewIntegerZipfValuesSequence(n),
)
})
s.push(&fs)
case *Tag:
s.push(&TagValuesSpec{
TagKey: n.Name,
Values: s.pop().(NewCountableSequenceFn),
})
case *TagSequenceSource:
s.push(NewCountableSequenceFn(func() CountableSequence {
return NewCounterByteSequence(n.Format, int(n.Start), int(n.Start+n.Count))
}))
case *TagFileSource:
p, err := s.resolvePath(n.Path)
if err != nil {
s.err = err
return false
}
lines, err := s.readLines(p)
if err != nil {
s.err = err
return false
}
s.push(NewCountableSequenceFn(func() CountableSequence {
return NewStringArraySequence(lines)
}))
case *TagArraySource:
s.push(NewCountableSequenceFn(func() CountableSequence {
return NewStringArraySequence(n.Values)
}))
case nil:
default:
panic(fmt.Sprintf("unexpected type %T", node))
}
return true
}
func (s *schemaToSpec) visitErr(node SchemaNode) {
switch n := node.(type) {
case *Schema:
s.err = fmt.Errorf("error processing schema: %v", s.err)
case *Measurement:
s.err = fmt.Errorf("measurement %q: %v", n.Name, s.err)
case *Tag:
s.err = fmt.Errorf("tag %q: %v", n.Name, s.err)
case *Field:
s.err = fmt.Errorf("field %q: %v", n.Name, s.err)
}
}
func (s *schemaToSpec) resolvePath(p string) (string, error) {
fullPath := os.ExpandEnv(p)
if !filepath.IsAbs(fullPath) {
fullPath = filepath.Join(s.schemaDir, fullPath)
}
fi, err := os.Stat(fullPath)
if err != nil {
return "", fmt.Errorf("error resolving path %q: %v", p, err)
}
if fi.IsDir() {
return "", fmt.Errorf("path %q is not a file: resolved to %s", p, fullPath)
}
return fullPath, nil
}
func (s *schemaToSpec) readLines(p string) ([]string, error) {
fp, err := s.resolvePath(p)
if err != nil {
return nil, err
}
f, err := os.Open(fp)
if err != nil {
return nil, fmt.Errorf("path error: %v", err)
}
defer f.Close()
scan := bufio.NewScanner(f)
scan.Split(bufio.ScanLines)
n := 0
var lines []string
for scan.Scan() {
if len(scan.Bytes()) == 0 {
// skip empty lines
continue
}
if !utf8.Valid(scan.Bytes()) {
return nil, fmt.Errorf("path %q, invalid UTF-8 on line %d", p, n)
}
lines = append(lines, scan.Text())
}
if scan.Err() != nil {
return nil, scan.Err()
}
return lines, nil
}
type option func(s *schemaToSpec)
func schemaDir(p string) option {
return func(s *schemaToSpec) {
s.schemaDir = p
}
}
func NewSpecFromSchema(root *Schema) (*Spec, error) {
return newSpecFromSchema(root)
}
func newSpecFromSchema(root *Schema, opts ...option) (*Spec, error) {
var spec Spec
vis := &schemaToSpec{spec: &spec}
for _, o := range opts {
o(vis)
}
WalkUp(vis, root)
if vis.err != nil {
return nil, vis.err
}
return &spec, nil
}

View File

@ -0,0 +1,78 @@
package gen
import (
"testing"
"github.com/BurntSushi/toml"
)
func TestSpecFromSchema(t *testing.T) {
in := `
title = "example schema"
[[measurements]]
name = "m0"
tags = [
{ name = "tag0", source = [ "host1", "host2" ] },
{ name = "tag1", source = [ "process1", "process2" ] },
{ name = "tag2", source = { type = "sequence", format = "value%s", start = 0, count = 100 } }
]
fields = [
{ name = "f0", count = 5000, source = 0.5 },
{ name = "f1", count = 5000, source = 0.5 },
]
[[measurements]]
name = "m1"
tags = [
{ name = "tag0", source = [ "host1", "host2" ] },
]
fields = [
{ name = "f0", count = 5000, source = 0.5 },
]
`
var out Schema
if _, err := toml.Decode(in, &out); err != nil {
t.Fatalf("unxpected error: %v", err)
}
spec, err := NewSpecFromSchema(&out)
if err != nil {
t.Error(err)
}
t.Log(spec)
}
func TestSpecFromSchemaError(t *testing.T) {
in := `
title = "example schema"
[[measurements]]
name = "m0"
tags = [
{ name = "tag0", source = [ "host1", "host2" ] },
{ name = "tag1", source = { type = "sequence", format = "value%s", start = 0, count = 100 } },
]
fields = [
{ name = "f0", count = 5000, source = 0.5 },
]
[[measurements]]
name = "m1"
tags = [
{ name = "tag0", source = [ "host1", "host2" ] },
]
fields = [
{ name = "f0", count = 5000, source = 0.5 },
]
`
var out Schema
if _, err := toml.Decode(in, &out); err != nil {
t.Fatalf("unxpected error: %v", err)
}
spec, err := NewSpecFromSchema(&out)
if err != nil {
t.Error(err)
}
t.Log(spec)
}

View File

@ -0,0 +1,175 @@
package gen
import (
"fmt"
"math"
"math/rand"
"sort"
"github.com/influxdata/influxdb/models"
)
type TagsSequence interface {
Next() bool
Value() models.Tags
Count() int
}
type tagsValuesSequence struct {
tags models.Tags
vals []CountableSequence
n int
count int
sample float64
src rand.Source
nextFn func(*tagsValuesSequence) bool
}
type tagsValuesOption func(s *tagsValuesSequence)
func TagValuesLimitOption(n int) tagsValuesOption {
return func(s *tagsValuesSequence) {
if n >= s.count {
return
}
s.src = rand.NewSource(20040409)
s.sample = float64(n) / float64(s.count)
}
}
func TagValuesSampleOption(n float64) tagsValuesOption {
return func(s *tagsValuesSequence) {
if n <= 0.0 || n > 1.0 {
panic("expect: 0.0 < n ≤ 1.0")
}
s.src = rand.NewSource(int64(float64(math.MaxInt64>>1) * n))
s.sample = n
s.nextFn = (*tagsValuesSequence).nextSample
}
}
func NewTagsValuesSequenceKeysValues(keys []string, vals []CountableSequence, opts ...tagsValuesOption) TagsSequence {
tm := make(map[string]string, len(keys))
for _, k := range keys {
tm[k] = ""
}
count := 1
for i := range vals {
count *= vals[i].Count()
}
// models.Tags are ordered, so ensure vals are ordered with respect to keys
sort.Sort(keyValues{keys, vals})
s := &tagsValuesSequence{
tags: models.NewTags(tm),
vals: vals,
count: count,
nextFn: (*tagsValuesSequence).next,
}
for _, o := range opts {
o(s)
}
return s
}
func NewTagsValuesSequenceValues(prefix string, vals []CountableSequence) TagsSequence {
keys := make([]string, len(vals))
// max tag width
tw := int(math.Ceil(math.Log10(float64(len(vals)))))
tf := fmt.Sprintf("%s%%0%dd", prefix, tw)
for i := range vals {
keys[i] = fmt.Sprintf(tf, i)
}
return NewTagsValuesSequenceKeysValues(keys, vals)
}
func NewTagsValuesSequenceCounts(prefix string, counts []int) TagsSequence {
tv := make([]CountableSequence, len(counts))
for i := range counts {
tv[i] = NewCounterByteSequenceCount(counts[i])
}
return NewTagsValuesSequenceValues(prefix, tv)
}
func (s *tagsValuesSequence) next() bool {
if s.n >= s.count {
return false
}
for i := range s.vals {
s.tags[i].Value = []byte(s.vals[i].Value())
}
s.n++
i := s.n
for j := len(s.vals) - 1; j >= 0; j-- {
v := s.vals[j]
v.Next()
c := v.Count()
if r := i % c; r != 0 {
break
}
i /= c
}
return true
}
func (s *tagsValuesSequence) skip() bool {
return (float64(s.src.Int63()>>10))*(1.0/9007199254740992.0) > s.sample
}
func (s *tagsValuesSequence) nextSample() bool {
if s.n >= s.count {
return false
}
for i := range s.vals {
s.tags[i].Value = []byte(s.vals[i].Value())
}
for {
s.n++
i := s.n
for j := len(s.vals) - 1; j >= 0; j-- {
v := s.vals[j]
v.Next()
c := v.Count()
if r := i % c; r != 0 {
break
}
i /= c
}
if !s.skip() {
break
}
}
return true
}
func (s *tagsValuesSequence) Next() bool {
return s.nextFn(s)
}
func (s *tagsValuesSequence) Value() models.Tags { return s.tags }
func (s *tagsValuesSequence) Count() int { return s.count }
type keyValues struct {
keys []string
vals []CountableSequence
}
func (k keyValues) Len() int { return len(k.keys) }
func (k keyValues) Less(i, j int) bool { return k.keys[i] < k.keys[j] }
func (k keyValues) Swap(i, j int) {
k.keys[i], k.keys[j] = k.keys[j], k.keys[i]
k.vals[i], k.vals[j] = k.vals[j], k.vals[i]
}

View File

@ -0,0 +1,36 @@
package gen
type TimestampSequence interface {
Reset()
Write(ts []int64)
}
type timestampSequence struct {
t int64
start int64
delta int64
}
func NewTimestampSequenceFromSpec(spec TimeSequenceSpec) TimestampSequence {
return &timestampSequence{
t: spec.Start.UnixNano(),
start: spec.Start.UnixNano(),
delta: int64(spec.Delta),
}
}
func (g *timestampSequence) Reset() {
g.t = g.start
}
func (g *timestampSequence) Write(ts []int64) {
var (
t = g.t
d = g.delta
)
for i := 0; i < len(ts); i++ {
ts[i] = t
t += d
}
g.t = t
}

409
pkg/data/gen/toml.go Normal file
View File

@ -0,0 +1,409 @@
package gen
import (
"errors"
"fmt"
"strings"
"time"
"github.com/spf13/cast"
)
type SeriesLimit int64
func (s *SeriesLimit) UnmarshalTOML(data interface{}) error {
v, ok := data.(int64)
if !ok {
return errors.New("series-limit: invalid value")
}
if v < 0 {
return errors.New("series-limit: must be ≥ 0")
}
*s = SeriesLimit(v)
return nil
}
type sample float64
func (s *sample) UnmarshalTOML(data interface{}) error {
v, ok := data.(float64)
if !ok {
return errors.New("sample: must be a float")
}
if v <= 0 || v > 1.0 {
return errors.New("sample: must be 0 < sample ≤ 1.0")
}
*s = sample(v)
return nil
}
type precision byte
const (
precisionMillisecond precision = iota // default
precisionNanosecond
precisionMicrosecond
precisionSecond
precisionMinute
precisionHour
)
var precisionToDuration = [...]time.Duration{
time.Millisecond,
time.Nanosecond,
time.Microsecond,
time.Second,
time.Minute,
time.Minute * 60,
time.Nanosecond,
time.Nanosecond,
}
func (p *precision) ToDuration() time.Duration {
return precisionToDuration[*p&0x7]
}
func (p *precision) UnmarshalTOML(data interface{}) error {
d, ok := data.(string)
if !ok {
return fmt.Errorf("invalid precision, expect one of (ns, us, ms, s, m, h): %T", data)
}
d = strings.ToLower(d)
switch d {
case "ns", "nanosecond":
*p = precisionNanosecond
case "us", "microsecond", "µs":
*p = precisionMicrosecond
case "ms", "millisecond":
*p = precisionMillisecond
case "s", "second":
*p = precisionSecond
case "m", "minute":
*p = precisionMinute
case "h", "hour":
*p = precisionHour
default:
return fmt.Errorf("invalid precision, expect one of (ns, ms, s, m, h): %s", d)
}
return nil
}
func (t *Tag) UnmarshalTOML(data interface{}) error {
d, ok := data.(map[string]interface{})
if !ok {
return nil
}
if n, ok := d["name"].(string); !ok || n == "" {
return errors.New("tag: missing or invalid value for name")
} else {
t.Name = n
}
// infer source
if _, ok := d["source"]; !ok {
return fmt.Errorf("missing source for tag %q", t.Name)
}
switch v := d["source"].(type) {
case int64, string, float64, bool:
if src, err := decodeTagConstantSource(v); err != nil {
return err
} else {
t.Source = src
}
case []interface{}:
if src, err := decodeTagArraySource(v); err != nil {
return err
} else {
t.Source = src
}
case map[string]interface{}:
if src, err := decodeTagSource(v); err != nil {
return err
} else {
t.Source = src
}
default:
return fmt.Errorf("invalid source for tag %q: %T", t.Name, v)
}
return nil
}
func decodeTagConstantSource(data interface{}) (TagSource, error) {
switch data.(type) {
case int64, string, float64, bool:
if src, err := cast.ToStringE(data); err != nil {
return nil, err
} else {
return &TagArraySource{Values: []string{src}}, nil
}
}
return nil, errors.New("invalid constant tag source")
}
func decodeTagArraySource(data []interface{}) (TagSource, error) {
if len(data) == 0 {
return nil, errors.New("empty array source")
}
if src, err := cast.ToStringSliceE(data); err != nil {
return nil, err
} else {
return &TagArraySource{Values: src}, nil
}
}
func decodeTagSource(data map[string]interface{}) (TagSource, error) {
typ, ok := data["type"].(string)
if !ok {
return nil, errors.New("missing type field")
}
switch typ {
case "sequence":
return decodeTagSequenceSource(data)
case "file":
return decodeTagFileSource(data)
default:
return nil, fmt.Errorf("invalid type field %q", typ)
}
}
func decodeTagFileSource(data map[string]interface{}) (TagSource, error) {
var s TagFileSource
if v, ok := data["path"].(string); ok {
s.Path = v
} else {
return nil, errors.New("file: missing path")
}
return &s, nil
}
func decodeTagSequenceSource(data map[string]interface{}) (TagSource, error) {
var s TagSequenceSource
if v, ok := data["format"].(string); ok {
// TODO(sgc): validate format string
s.Format = v
} else {
s.Format = "value%s"
}
if v, ok := data["start"]; ok {
if v, err := cast.ToInt64E(v); err != nil {
return nil, fmt.Errorf("tag.sequence: invalid start, %v", err)
} else if v < 0 {
return nil, fmt.Errorf("tag.sequence: start must be ≥ 0")
} else {
s.Start = v
}
}
if v, ok := data["count"]; ok {
if v, err := cast.ToInt64E(v); err != nil {
return nil, fmt.Errorf("tag.sequence: invalid count, %v", err)
} else if v < 0 {
return nil, fmt.Errorf("tag.sequence: count must be > 0")
} else {
s.Count = v
}
} else {
return nil, fmt.Errorf("tag.sequence: missing count")
}
return &s, nil
}
func (t *Field) UnmarshalTOML(data interface{}) error {
d, ok := data.(map[string]interface{})
if !ok {
return nil
}
if n, ok := d["name"].(string); !ok || n == "" {
return errors.New("field: missing or invalid value for name")
} else {
t.Name = n
}
if n, ok := d["count"]; !ok {
return errors.New("field: missing value for count")
} else if count, err := cast.ToInt64E(n); err != nil {
return fmt.Errorf("field: invalid count, %v", err)
} else if count <= 0 {
return errors.New("field: count must be > 0")
} else {
t.Count = count
}
if n, ok := d["time-precision"]; ok {
if err := t.TimePrecision.UnmarshalTOML(n); err != nil {
return err
}
}
// infer source
if _, ok := d["source"]; !ok {
return fmt.Errorf("missing source for field %q", t.Name)
}
switch v := d["source"].(type) {
case int64, string, float64, bool:
t.Source = &FieldConstantValue{v}
case []interface{}:
if src, err := decodeFieldArraySource(v); err != nil {
return err
} else {
t.Source = src
}
case map[string]interface{}:
if src, err := decodeFieldSource(v); err != nil {
return err
} else {
t.Source = src
}
default:
// unknown
return fmt.Errorf("invalid source for tag %q: %T", t.Name, v)
}
return nil
}
func decodeFieldArraySource(data []interface{}) (FieldSource, error) {
if len(data) == 0 {
return nil, errors.New("empty array")
}
var (
src interface{}
err error
)
// use first value to determine slice type
switch data[0].(type) {
case int64:
src, err = toInt64SliceE(data)
case float64:
src, err = toFloat64SliceE(data)
case string:
src, err = cast.ToStringSliceE(data)
case bool:
src, err = cast.ToBoolSliceE(data)
default:
err = fmt.Errorf("unsupported field source data type: %T", data[0])
}
if err != nil {
return nil, err
}
return &FieldArraySource{Value: src}, nil
}
func decodeFieldSource(data map[string]interface{}) (FieldSource, error) {
typ, ok := data["type"].(string)
if !ok {
return nil, errors.New("missing type field")
}
switch typ {
case "rand<float>":
return decodeFloatRandomSource(data)
case "zipf<integer>":
return decodeIntegerZipfSource(data)
default:
return nil, fmt.Errorf("invalid type field %q", typ)
}
}
func decodeFloatRandomSource(data map[string]interface{}) (FieldSource, error) {
var s FieldFloatRandomSource
if v, ok := data["seed"]; ok {
if v, err := cast.ToInt64E(v); err != nil {
return nil, fmt.Errorf("rand<float>: invalid seed, %v", err)
} else {
s.Seed = v
}
}
if v, ok := data["min"]; ok {
if v, err := cast.ToFloat64E(v); err != nil {
return nil, fmt.Errorf("rand<float>: invalid min, %v", err)
} else {
s.Min = v
}
}
if v, ok := data["max"]; ok {
if v, err := cast.ToFloat64E(v); err != nil {
return nil, fmt.Errorf("rand<float>: invalid max, %v", err)
} else {
s.Max = v
}
} else {
s.Max = 1.0
}
if !(s.Min <= s.Max) {
return nil, errors.New("rand<float>: min ≤ max")
}
return &s, nil
}
func decodeIntegerZipfSource(data map[string]interface{}) (FieldSource, error) {
var s FieldIntegerZipfSource
if v, ok := data["seed"]; ok {
if v, err := cast.ToInt64E(v); err != nil {
return nil, fmt.Errorf("zipf<integer>: invalid seed, %v", err)
} else {
s.Seed = v
}
}
if v, ok := data["s"]; ok {
if v, err := cast.ToFloat64E(v); err != nil || v <= 1.0 {
return nil, fmt.Errorf("zipf<integer>: invalid value for s (s > 1), %v", err)
} else {
s.S = v
}
} else {
return nil, fmt.Errorf("zipf<integer>: missing value for s")
}
if v, ok := data["v"]; ok {
if v, err := cast.ToFloat64E(v); err != nil || v < 1.0 {
return nil, fmt.Errorf("zipf<integer>: invalid value for v (v ≥ 1), %v", err)
} else {
s.V = v
}
} else {
return nil, fmt.Errorf("zipf<integer>: missing value for v")
}
if v, ok := data["imax"]; ok {
if v, err := cast.ToUint64E(v); err != nil {
return nil, fmt.Errorf("zipf<integer>: invalid value for imax, %v", err)
} else {
s.IMAX = v
}
} else {
return nil, fmt.Errorf("zipf<integer>: missing value for imax")
}
return &s, nil
}

164
pkg/data/gen/toml_test.go Normal file
View File

@ -0,0 +1,164 @@
package gen
import (
"fmt"
"strings"
"testing"
"github.com/BurntSushi/toml"
"github.com/google/go-cmp/cmp"
)
func visit(root *Schema) string {
w := &strings.Builder{}
walkFn := func(node SchemaNode) bool {
switch n := node.(type) {
case *Schema:
case Measurements:
fmt.Fprintln(w, "Measurements: ")
case *Measurement:
fmt.Fprintln(w)
fmt.Fprintf(w, " Name: %s\n", n.Name)
case Tags:
fmt.Fprintln(w, " Tags:")
case Fields:
fmt.Fprintln(w, " Fields:")
case *Field:
fmt.Fprintf(w, " %s: %s, count=%d, time-precision=%s\n", n.Name, n.Source, n.Count, n.TimePrecision)
case *Tag:
fmt.Fprintf(w, " %s: %s\n", n.Name, n.Source)
}
return true
}
WalkDown(VisitorFn(walkFn), root)
return w.String()
}
func TestSchema(t *testing.T) {
in := `
title = "example schema"
series-limit = 10
[[measurements]]
name = "constant"
series-limit = 5
[[measurements.tags]]
name = "tag0"
source = [ "host1", "host2" ]
[[measurements.tags]]
name = "tag1"
source = { type = "file", path = "foo.txt" }
[[measurements.fields]]
name = "floatC"
count = 5000
source = 0.5
time-precision = "us"
[[measurements.fields]]
name = "integerC"
count = 5000
source = 3
time-precision = "hour"
[[measurements.fields]]
name = "stringC"
count = 5000
source = "hello"
[[measurements.fields]]
name = "stringA"
count = 5000
source = ["hello", "world"]
[[measurements.fields]]
name = "boolf"
count = 5000
source = false
[[measurements]]
name = "random"
[[measurements.tags]]
name = "tagSeq"
source = { type = "sequence", format = "value%s", start = 0, count = 100 }
[[measurements.fields]]
name = "floatR"
count = 5000
source = { type = "rand<float>", min = 0.5, max = 50.1, seed = 10 }
time-precision = "us"
[[measurements]]
name = "array"
[[measurements.tags]]
name = "tagSeq"
source = { type = "sequence", format = "value%s", start = 0, count = 100 }
[[measurements.tags]]
name = "tagFile"
source = { type = "file", path = "foo.txt" }
[[measurements.fields]]
name = "stringA"
count = 1000
source = ["this", "that"]
time-precision = "us"
[[measurements.fields]]
name = "integerA"
count = 1000
source = [5, 6, 7]
time-precision = "us"
`
var out Schema
_, err := toml.Decode(in, &out)
if err != nil {
t.Fatalf("unxpected error: %v", err)
}
exp := `Measurements:
Name: constant
Tags:
tag0: array, source=[]string{"host1", "host2"}
tag1: file, path=foo.txt
Fields:
floatC: constant, source=0.5, count=5000, time-precision=Microsecond
integerC: constant, source=3, count=5000, time-precision=Hour
stringC: constant, source="hello", count=5000, time-precision=Millisecond
stringA: array, source=[]string{"hello", "world"}, count=5000, time-precision=Millisecond
boolf: constant, source=false, count=5000, time-precision=Millisecond
Name: random
Tags:
tagSeq: sequence, prefix="value%s", range=[0,100)
Fields:
floatR: rand<float>, seed=10, min=50.100000, max=50.100000, count=5000, time-precision=Microsecond
Name: array
Tags:
tagSeq: sequence, prefix="value%s", range=[0,100)
tagFile: file, path=foo.txt
Fields:
stringA: array, source=[]string{"this", "that"}, count=1000, time-precision=Microsecond
integerA: array, source=[]int64{5, 6, 7}, count=1000, time-precision=Microsecond
`
if got := visit(&out); !cmp.Equal(got, exp) {
t.Errorf("unexpected value, -got/+exp\n%s", cmp.Diff(got, exp))
}
}

View File

@ -0,0 +1,30 @@
[
{
"Name":"Float",
"name":"float",
"Type":"float64",
"Rand":"Float64"
},
{
"Name":"Integer",
"name":"integer",
"Type":"int64",
"Rand":"Int64"
},
{
"Name":"Unsigned",
"name":"unsigned",
"Type":"uint64",
"Rand":"Uint64"
},
{
"Name":"String",
"name":"string",
"Type":"string"
},
{
"Name":"Boolean",
"name":"boolean",
"Type":"bool"
}
]

113
pkg/data/gen/util.go Normal file
View File

@ -0,0 +1,113 @@
package gen
import (
"fmt"
"reflect"
"sort"
"github.com/spf13/cast"
)
func min(a, b int) int {
if a < b {
return a
}
return b
}
func sortDedupStrings(in []string) []string {
sort.Strings(in)
j := 0
for i := 1; i < len(in); i++ {
if in[j] == in[i] {
continue
}
j++
in[j] = in[i]
}
return in[:j+1]
}
func sortDedupInts(in []int) []int {
sort.Ints(in)
j := 0
for i := 1; i < len(in); i++ {
if in[j] == in[i] {
continue
}
j++
in[j] = in[i]
}
return in[:j+1]
}
func sortDedupFloats(in []float64) []float64 {
sort.Float64s(in)
j := 0
for i := 1; i < len(in); i++ {
if in[j] == in[i] {
continue
}
j++
in[j] = in[i]
}
return in[:j+1]
}
// ToInt64SliceE casts an interface to a []int64 type.
func toInt64SliceE(i interface{}) ([]int64, error) {
if i == nil {
return []int64{}, fmt.Errorf("unable to cast %#v of type %T to []int64", i, i)
}
switch v := i.(type) {
case []int64:
return v, nil
}
kind := reflect.TypeOf(i).Kind()
switch kind {
case reflect.Slice, reflect.Array:
s := reflect.ValueOf(i)
a := make([]int64, s.Len())
for j := 0; j < s.Len(); j++ {
val, err := cast.ToInt64E(s.Index(j).Interface())
if err != nil {
return []int64{}, fmt.Errorf("unable to cast %#v of type %T to []int64", i, i)
}
a[j] = val
}
return a, nil
default:
return []int64{}, fmt.Errorf("unable to cast %#v of type %T to []int64", i, i)
}
}
// ToFloat64SliceE casts an interface to a []float64 type.
func toFloat64SliceE(i interface{}) ([]float64, error) {
if i == nil {
return []float64{}, fmt.Errorf("unable to cast %#v of type %T to []float64", i, i)
}
switch v := i.(type) {
case []float64:
return v, nil
}
kind := reflect.TypeOf(i).Kind()
switch kind {
case reflect.Slice, reflect.Array:
s := reflect.ValueOf(i)
a := make([]float64, s.Len())
for j := 0; j < s.Len(); j++ {
val, err := cast.ToFloat64E(s.Index(j).Interface())
if err != nil {
return []float64{}, fmt.Errorf("unable to cast %#v of type %T to []float64", i, i)
}
a[j] = val
}
return a, nil
default:
return []float64{}, fmt.Errorf("unable to cast %#v of type %T to []float64", i, i)
}
}

252
pkg/data/gen/values.gen.go Normal file
View File

@ -0,0 +1,252 @@
// Generated by tmpl
// https://github.com/benbjohnson/tmpl
//
// DO NOT EDIT!
// Source: values.gen.go.tmpl
package gen
type floatConstantValuesSequence struct {
v float64
}
func NewFloatConstantValuesSequence(v float64) FloatValuesSequence {
return &floatConstantValuesSequence{
v: v,
}
}
func (g *floatConstantValuesSequence) Reset() {
}
func (g *floatConstantValuesSequence) Write(vs []float64) {
for i := 0; i < len(vs); i++ {
vs[i] = g.v
}
}
type integerConstantValuesSequence struct {
v int64
}
func NewIntegerConstantValuesSequence(v int64) IntegerValuesSequence {
return &integerConstantValuesSequence{
v: v,
}
}
func (g *integerConstantValuesSequence) Reset() {
}
func (g *integerConstantValuesSequence) Write(vs []int64) {
for i := 0; i < len(vs); i++ {
vs[i] = g.v
}
}
type unsignedConstantValuesSequence struct {
v uint64
}
func NewUnsignedConstantValuesSequence(v uint64) UnsignedValuesSequence {
return &unsignedConstantValuesSequence{
v: v,
}
}
func (g *unsignedConstantValuesSequence) Reset() {
}
func (g *unsignedConstantValuesSequence) Write(vs []uint64) {
for i := 0; i < len(vs); i++ {
vs[i] = g.v
}
}
type stringConstantValuesSequence struct {
v string
}
func NewStringConstantValuesSequence(v string) StringValuesSequence {
return &stringConstantValuesSequence{
v: v,
}
}
func (g *stringConstantValuesSequence) Reset() {
}
func (g *stringConstantValuesSequence) Write(vs []string) {
for i := 0; i < len(vs); i++ {
vs[i] = g.v
}
}
type booleanConstantValuesSequence struct {
v bool
}
func NewBooleanConstantValuesSequence(v bool) BooleanValuesSequence {
return &booleanConstantValuesSequence{
v: v,
}
}
func (g *booleanConstantValuesSequence) Reset() {
}
func (g *booleanConstantValuesSequence) Write(vs []bool) {
for i := 0; i < len(vs); i++ {
vs[i] = g.v
}
}
type floatArrayValuesSequence struct {
v []float64
vi int
}
func NewFloatArrayValuesSequence(v []float64) FloatValuesSequence {
return &floatArrayValuesSequence{
v: v,
}
}
func (g *floatArrayValuesSequence) Reset() {
g.vi = 0
}
func (g *floatArrayValuesSequence) Write(vs []float64) {
var (
v = g.v
vi = g.vi
)
for i := 0; i < len(vs); i++ {
if vi >= len(v) {
vi = 0
}
vs[i] = v[vi]
vi += 1
}
g.vi = vi
}
type integerArrayValuesSequence struct {
v []int64
vi int
}
func NewIntegerArrayValuesSequence(v []int64) IntegerValuesSequence {
return &integerArrayValuesSequence{
v: v,
}
}
func (g *integerArrayValuesSequence) Reset() {
g.vi = 0
}
func (g *integerArrayValuesSequence) Write(vs []int64) {
var (
v = g.v
vi = g.vi
)
for i := 0; i < len(vs); i++ {
if vi >= len(v) {
vi = 0
}
vs[i] = v[vi]
vi += 1
}
g.vi = vi
}
type unsignedArrayValuesSequence struct {
v []uint64
vi int
}
func NewUnsignedArrayValuesSequence(v []uint64) UnsignedValuesSequence {
return &unsignedArrayValuesSequence{
v: v,
}
}
func (g *unsignedArrayValuesSequence) Reset() {
g.vi = 0
}
func (g *unsignedArrayValuesSequence) Write(vs []uint64) {
var (
v = g.v
vi = g.vi
)
for i := 0; i < len(vs); i++ {
if vi >= len(v) {
vi = 0
}
vs[i] = v[vi]
vi += 1
}
g.vi = vi
}
type stringArrayValuesSequence struct {
v []string
vi int
}
func NewStringArrayValuesSequence(v []string) StringValuesSequence {
return &stringArrayValuesSequence{
v: v,
}
}
func (g *stringArrayValuesSequence) Reset() {
g.vi = 0
}
func (g *stringArrayValuesSequence) Write(vs []string) {
var (
v = g.v
vi = g.vi
)
for i := 0; i < len(vs); i++ {
if vi >= len(v) {
vi = 0
}
vs[i] = v[vi]
vi += 1
}
g.vi = vi
}
type booleanArrayValuesSequence struct {
v []bool
vi int
}
func NewBooleanArrayValuesSequence(v []bool) BooleanValuesSequence {
return &booleanArrayValuesSequence{
v: v,
}
}
func (g *booleanArrayValuesSequence) Reset() {
g.vi = 0
}
func (g *booleanArrayValuesSequence) Write(vs []bool) {
var (
v = g.v
vi = g.vi
)
for i := 0; i < len(vs); i++ {
if vi >= len(v) {
vi = 0
}
vs[i] = v[vi]
vi += 1
}
g.vi = vi
}

View File

@ -0,0 +1,54 @@
package gen
{{range .}}
type {{.name}}ConstantValuesSequence struct {
v {{.Type}}
}
func New{{.Name}}ConstantValuesSequence(v {{.Type}}) {{.Name}}ValuesSequence {
return &{{.name}}ConstantValuesSequence{
v: v,
}
}
func (g *{{.name}}ConstantValuesSequence) Reset() {
}
func (g *{{.name}}ConstantValuesSequence) Write(vs []{{.Type}}) {
for i := 0; i < len(vs); i++ {
vs[i] = g.v
}
}
{{end}}
{{range .}}
type {{.name}}ArrayValuesSequence struct {
v []{{.Type}}
vi int
}
func New{{.Name}}ArrayValuesSequence(v []{{.Type}}) {{.Name}}ValuesSequence {
return &{{.name}}ArrayValuesSequence{
v: v,
}
}
func (g *{{.name}}ArrayValuesSequence) Reset() {
g.vi = 0
}
func (g *{{.name}}ArrayValuesSequence) Write(vs []{{.Type}}) {
var (
v = g.v
vi = g.vi
)
for i := 0; i < len(vs); i++ {
if vi >= len(v) {
vi = 0
}
vs[i] = v[vi]
vi += 1
}
g.vi = vi
}
{{end}}

46
pkg/data/gen/values.go Normal file
View File

@ -0,0 +1,46 @@
package gen
import (
"math/rand"
)
type floatRandomValuesSequence struct {
r *rand.Rand
a float64
b float64
}
func NewFloatRandomValuesSequence(min, max float64, r *rand.Rand) FloatValuesSequence {
return &floatRandomValuesSequence{r: r, a: max - min, b: min}
}
func (g *floatRandomValuesSequence) Reset() {}
func (g *floatRandomValuesSequence) Write(vs []float64) {
var (
a = g.a
b = g.b
)
for i := 0; i < len(vs); i++ {
vs[i] = a*g.r.Float64() + b // ax + b
}
}
type integerRandomValuesSequence struct {
r *rand.Zipf
}
// NewIntegerZipfValuesSequence produces int64 values using a Zipfian distribution
// described by s.
func NewIntegerZipfValuesSequence(s *FieldIntegerZipfSource) IntegerValuesSequence {
r := rand.New(rand.NewSource(s.Seed))
return &integerRandomValuesSequence{r: rand.NewZipf(r, s.S, s.V, s.IMAX)}
}
func (g *integerRandomValuesSequence) Reset() {}
func (g *integerRandomValuesSequence) Write(vs []int64) {
for i := 0; i < len(vs); i++ {
vs[i] = int64(g.r.Uint64())
}
}

View File

@ -0,0 +1,251 @@
// Generated by tmpl
// https://github.com/benbjohnson/tmpl
//
// DO NOT EDIT!
// Source: values_sequence.gen.go.tmpl
package gen
import (
"github.com/influxdata/influxdb/tsdb"
)
type FloatValuesSequence interface {
Reset()
Write(v []float64)
}
type timeFloatValuesSequence struct {
vals floatArray
ts TimestampSequence
vs FloatValuesSequence
count int
n int
}
func NewTimeFloatValuesSequence(count int, ts TimestampSequence, vs FloatValuesSequence) TimeValuesSequence {
return &timeFloatValuesSequence{
vals: *newFloatArrayLen(tsdb.DefaultMaxPointsPerBlock),
ts: ts,
vs: vs,
count: count,
n: count,
}
}
func (s *timeFloatValuesSequence) Reset() {
s.ts.Reset()
s.vs.Reset()
s.n = s.count
}
func (s *timeFloatValuesSequence) Next() bool {
if s.n > 0 {
c := min(s.n, tsdb.DefaultMaxPointsPerBlock)
s.n -= c
s.vals.Timestamps = s.vals.Timestamps[:c]
s.vals.Values = s.vals.Values[:c]
s.ts.Write(s.vals.Timestamps)
s.vs.Write(s.vals.Values)
return true
}
return false
}
func (s *timeFloatValuesSequence) Values() Values {
return &s.vals
}
type IntegerValuesSequence interface {
Reset()
Write(v []int64)
}
type timeIntegerValuesSequence struct {
vals integerArray
ts TimestampSequence
vs IntegerValuesSequence
count int
n int
}
func NewTimeIntegerValuesSequence(count int, ts TimestampSequence, vs IntegerValuesSequence) TimeValuesSequence {
return &timeIntegerValuesSequence{
vals: *newIntegerArrayLen(tsdb.DefaultMaxPointsPerBlock),
ts: ts,
vs: vs,
count: count,
n: count,
}
}
func (s *timeIntegerValuesSequence) Reset() {
s.ts.Reset()
s.vs.Reset()
s.n = s.count
}
func (s *timeIntegerValuesSequence) Next() bool {
if s.n > 0 {
c := min(s.n, tsdb.DefaultMaxPointsPerBlock)
s.n -= c
s.vals.Timestamps = s.vals.Timestamps[:c]
s.vals.Values = s.vals.Values[:c]
s.ts.Write(s.vals.Timestamps)
s.vs.Write(s.vals.Values)
return true
}
return false
}
func (s *timeIntegerValuesSequence) Values() Values {
return &s.vals
}
type UnsignedValuesSequence interface {
Reset()
Write(v []uint64)
}
type timeUnsignedValuesSequence struct {
vals unsignedArray
ts TimestampSequence
vs UnsignedValuesSequence
count int
n int
}
func NewTimeUnsignedValuesSequence(count int, ts TimestampSequence, vs UnsignedValuesSequence) TimeValuesSequence {
return &timeUnsignedValuesSequence{
vals: *newUnsignedArrayLen(tsdb.DefaultMaxPointsPerBlock),
ts: ts,
vs: vs,
count: count,
n: count,
}
}
func (s *timeUnsignedValuesSequence) Reset() {
s.ts.Reset()
s.vs.Reset()
s.n = s.count
}
func (s *timeUnsignedValuesSequence) Next() bool {
if s.n > 0 {
c := min(s.n, tsdb.DefaultMaxPointsPerBlock)
s.n -= c
s.vals.Timestamps = s.vals.Timestamps[:c]
s.vals.Values = s.vals.Values[:c]
s.ts.Write(s.vals.Timestamps)
s.vs.Write(s.vals.Values)
return true
}
return false
}
func (s *timeUnsignedValuesSequence) Values() Values {
return &s.vals
}
type StringValuesSequence interface {
Reset()
Write(v []string)
}
type timeStringValuesSequence struct {
vals stringArray
ts TimestampSequence
vs StringValuesSequence
count int
n int
}
func NewTimeStringValuesSequence(count int, ts TimestampSequence, vs StringValuesSequence) TimeValuesSequence {
return &timeStringValuesSequence{
vals: *newStringArrayLen(tsdb.DefaultMaxPointsPerBlock),
ts: ts,
vs: vs,
count: count,
n: count,
}
}
func (s *timeStringValuesSequence) Reset() {
s.ts.Reset()
s.vs.Reset()
s.n = s.count
}
func (s *timeStringValuesSequence) Next() bool {
if s.n > 0 {
c := min(s.n, tsdb.DefaultMaxPointsPerBlock)
s.n -= c
s.vals.Timestamps = s.vals.Timestamps[:c]
s.vals.Values = s.vals.Values[:c]
s.ts.Write(s.vals.Timestamps)
s.vs.Write(s.vals.Values)
return true
}
return false
}
func (s *timeStringValuesSequence) Values() Values {
return &s.vals
}
type BooleanValuesSequence interface {
Reset()
Write(v []bool)
}
type timeBooleanValuesSequence struct {
vals booleanArray
ts TimestampSequence
vs BooleanValuesSequence
count int
n int
}
func NewTimeBooleanValuesSequence(count int, ts TimestampSequence, vs BooleanValuesSequence) TimeValuesSequence {
return &timeBooleanValuesSequence{
vals: *newBooleanArrayLen(tsdb.DefaultMaxPointsPerBlock),
ts: ts,
vs: vs,
count: count,
n: count,
}
}
func (s *timeBooleanValuesSequence) Reset() {
s.ts.Reset()
s.vs.Reset()
s.n = s.count
}
func (s *timeBooleanValuesSequence) Next() bool {
if s.n > 0 {
c := min(s.n, tsdb.DefaultMaxPointsPerBlock)
s.n -= c
s.vals.Timestamps = s.vals.Timestamps[:c]
s.vals.Values = s.vals.Values[:c]
s.ts.Write(s.vals.Timestamps)
s.vs.Write(s.vals.Values)
return true
}
return false
}
func (s *timeBooleanValuesSequence) Values() Values {
return &s.vals
}

View File

@ -0,0 +1,55 @@
package gen
import (
"github.com/influxdata/influxdb/tsdb"
)
{{range .}}
type {{.Name}}ValuesSequence interface {
Reset()
Write(v []{{.Type}})
}
type time{{.Name}}ValuesSequence struct {
vals {{.name}}Array
ts TimestampSequence
vs {{.Name}}ValuesSequence
count int
n int
}
func NewTime{{.Name}}ValuesSequence(count int, ts TimestampSequence, vs {{.Name}}ValuesSequence) TimeValuesSequence {
return &time{{.Name}}ValuesSequence{
vals: *new{{.Name}}ArrayLen(tsdb.DefaultMaxPointsPerBlock),
ts: ts,
vs: vs,
count: count,
n: count,
}
}
func (s *time{{.Name}}ValuesSequence) Reset() {
s.ts.Reset()
s.vs.Reset()
s.n = s.count
}
func (s *time{{.Name}}ValuesSequence) Next() bool {
if s.n > 0 {
c := min(s.n, tsdb.DefaultMaxPointsPerBlock)
s.n -= c
s.vals.Timestamps = s.vals.Timestamps[:c]
s.vals.Values = s.vals.Values[:c]
s.ts.Write(s.vals.Timestamps)
s.vs.Write(s.vals.Values)
return true
}
return false
}
func (s *time{{.Name}}ValuesSequence) Values() Values {
return &s.vals
}
{{end}}