diff --git a/Gopkg.lock b/Gopkg.lock index 251b5aff4e..d862169707 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -812,6 +812,14 @@ revision = "c6db9435477f3cb658e2dd0fa93e02118c870251" version = "v0.2.0" +[[projects]] + digest = "1:08d65904057412fc0270fc4812a1c90c594186819243160dc779a402d4b6d0bc" + name = "github.com/spf13/cast" + packages = ["."] + pruneopts = "UT" + revision = "8c9545af88b134710ab1cd196795e7f2388358d7" + version = "v1.3.0" + [[projects]] digest = "1:eaa6698f44de8f2977e93c9b946e60a8af75f565058658aad2df8032b55c84e5" name = "github.com/tinylib/msgp" @@ -1122,6 +1130,7 @@ "github.com/prometheus/client_golang/prometheus", "github.com/prometheus/client_golang/prometheus/promhttp", "github.com/retailnext/hllpp", + "github.com/spf13/cast", "github.com/tinylib/msgp/msgp", "github.com/xlab/treeprint", "go.uber.org/zap", diff --git a/Gopkg.toml b/Gopkg.toml index 338bdf9413..6c6b429b6b 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -75,3 +75,7 @@ required = ["github.com/influxdata/platform/pkg/data/gen"] [[constraint]] name = "github.com/influxdata/flux" version = "0.12.0" + +[[constraint]] + name = "github.com/spf13/cast" + version = "1.3.x" \ No newline at end of file diff --git a/cmd/influx_tools/generate/exec/command.go b/cmd/influx_tools/generate/exec/command.go index 6219a1a744..9ffb9a305a 100644 --- a/cmd/influx_tools/generate/exec/command.go +++ b/cmd/influx_tools/generate/exec/command.go @@ -6,15 +6,16 @@ import ( "flag" "fmt" "io" - "math" "os" + "strings" + "text/template" "time" "github.com/influxdata/influxdb/cmd/influx_tools/generate" "github.com/influxdata/influxdb/cmd/influx_tools/internal/profile" "github.com/influxdata/influxdb/cmd/influx_tools/server" + "github.com/influxdata/influxdb/pkg/data/gen" "github.com/influxdata/influxdb/services/meta" - "github.com/influxdata/platform/pkg/data/gen" ) // Command represents the program execution for "store query". @@ -28,14 +29,17 @@ type Command struct { configPath string printOnly bool + example bool noTSI bool concurrency int - spec generate.Spec + schemaPath string + storageSpec generate.StorageSpec + schemaSpec generate.SchemaSpec profile profile.Config } -type SeriesGeneratorFilter func(sgi meta.ShardGroupInfo, g SeriesGenerator) SeriesGenerator +type SeriesGeneratorFilter func(sgi meta.ShardGroupInfo, g gen.SeriesGenerator) gen.SeriesGenerator type Dependencies struct { Server server.Interface @@ -62,73 +66,121 @@ func (cmd *Command) Run(args []string) (err error) { return err } + if cmd.example { + return cmd.printExample() + } + err = cmd.server.Open(cmd.configPath) if err != nil { return err } - plan, err := cmd.spec.Plan(cmd.server) + storagePlan, err := cmd.storageSpec.Plan(cmd.server) if err != nil { return err } - plan.PrintPlan(cmd.Stdout) + storagePlan.PrintPlan(cmd.Stdout) + + var spec *gen.Spec + if cmd.schemaPath != "" { + var err error + spec, err = gen.NewSpecFromPath(cmd.schemaPath) + if err != nil { + return err + } + } else { + schemaPlan, err := cmd.schemaSpec.Plan(storagePlan) + if err != nil { + return err + } + + schemaPlan.PrintPlan(cmd.Stdout) + spec = cmd.planToSpec(schemaPlan) + } if cmd.printOnly { return nil } - if err = plan.InitFileSystem(cmd.server.MetaClient()); err != nil { + if err = storagePlan.InitFileSystem(cmd.server.MetaClient()); err != nil { return err } - return cmd.exec(plan) + return cmd.exec(storagePlan, spec) } func (cmd *Command) parseFlags(args []string) error { fs := flag.NewFlagSet("gen-init", flag.ContinueOnError) fs.StringVar(&cmd.configPath, "config", "", "Config file") + fs.StringVar(&cmd.schemaPath, "schema", "", "Schema TOML file") fs.BoolVar(&cmd.printOnly, "print", false, "Print data spec only") fs.BoolVar(&cmd.noTSI, "no-tsi", false, "Skip building TSI index") + fs.BoolVar(&cmd.example, "example", false, "Print an example toml schema to STDOUT") fs.IntVar(&cmd.concurrency, "c", 1, "Number of shards to generate concurrently") fs.StringVar(&cmd.profile.CPU, "cpuprofile", "", "Collect a CPU profile") fs.StringVar(&cmd.profile.Memory, "memprofile", "", "Collect a memory profile") - cmd.spec.AddFlags(fs) + cmd.storageSpec.AddFlags(fs) + cmd.schemaSpec.AddFlags(fs) if err := fs.Parse(args); err != nil { return err } - if cmd.spec.Database == "" { + if cmd.example { + return nil + } + + if cmd.storageSpec.Database == "" { return errors.New("database is required") } - if cmd.spec.Retention == "" { + if cmd.storageSpec.Retention == "" { return errors.New("retention policy is required") } return nil } -func (cmd *Command) exec(p *generate.Plan) error { - groups := p.ShardGroups() - gens := make([]SeriesGenerator, len(groups)) +var ( + tomlSchema = template.Must(template.New("schema").Parse(` +title = "CLI schema" + +[[measurements]] +name = "m0" +sample = 1.0 +tags = [ +{{- range $i, $e := .Tags }} + { name = "tag{{$i}}", source = { type = "sequence", format = "value%s", start = 0, count = {{$e}} } },{{ end }} +] +fields = [ + { name = "v0", count = {{ .PointsPerSeriesPerShard }}, source = 1.0 }, +]`)) +) + +func (cmd *Command) planToSpec(p *generate.SchemaPlan) *gen.Spec { + var sb strings.Builder + if err := tomlSchema.Execute(&sb, p); err != nil { + panic(err) + } + + spec, err := gen.NewSpecFromToml(sb.String()) + if err != nil { + panic(err) + } + return spec +} + +func (cmd *Command) exec(storagePlan *generate.StoragePlan, spec *gen.Spec) error { + groups := storagePlan.ShardGroups() + gens := make([]gen.SeriesGenerator, len(groups)) for i := range gens { - var ( - name []byte - keys []string - tv []gen.CountableSequence - ) - - name = []byte("m0") - tv = make([]gen.CountableSequence, len(p.Tags)) - setTagVals(p.Tags, tv) - keys = make([]string, len(p.Tags)) - setTagKeys("tag", keys) - sgi := groups[i] - vg := gen.NewIntegerConstantValuesSequence(p.PointsPerSeriesPerShard, sgi.StartTime, p.ShardDuration/time.Duration(p.PointsPerSeriesPerShard), 1) - gens[i] = NewSeriesGenerator(name, []byte("v0"), vg, gen.NewTagsValuesSequenceKeysValues(keys, tv)) + tr := gen.TimeRange{ + Start: sgi.StartTime, + End: sgi.EndTime, + } + gens[i] = gen.NewSeriesGeneratorFromSpec(spec, tr) if cmd.filter != nil { gens[i] = cmd.filter(sgi, gens[i]) } @@ -145,19 +197,136 @@ func (cmd *Command) exec(p *generate.Plan) error { }() g := Generator{Concurrency: cmd.concurrency, BuildTSI: !cmd.noTSI} - return g.Run(context.Background(), p.Database, p.ShardPath(), p.NodeShardGroups(), gens) + return g.Run(context.Background(), storagePlan.Database, storagePlan.ShardPath(), storagePlan.NodeShardGroups(), gens) } -func setTagVals(tags []int, tv []gen.CountableSequence) { - for j := range tags { - tv[j] = gen.NewCounterByteSequenceCount(tags[j]) - } -} +const exampleSchema = `title = "CLI schema" -func setTagKeys(prefix string, keys []string) { - tw := int(math.Ceil(math.Log10(float64(len(keys))))) - tf := fmt.Sprintf("%s%%0%dd", prefix, tw) - for i := range keys { - keys[i] = fmt.Sprintf(tf, i) - } +# limit the maximum number of series generated across all measurements +# +# series-limit: integer, optional (default: unlimited) +# multiple measurements are merged together +[[measurements]] +# name of measurement + +name = "cpu" + +# sample: float; where 0 < sample ≤ 1.0 (default: 0.5) +# sample a subset of the tag set +# +# sample 25% of the tags +# +# sample = 0.25 + +# Keys for defining a tag +# +# name: string, required +# Name of field +# +# source: array or object +# +# A literal array of string values defines the tag values. +# +# An object defines more complex generators. The type key determines the +# type of generator. +# +# source types: +# +# type: "sequence" +# generate a sequence of tag values +# +# format: string +# a format string for the values (default: "value%s") +# start: int (default: 0) +# beginning value +# count: int, required +# ending value +# +# type: "file" +# generate a sequence of tag values from a file source. +# The data in the file is sorted, deduplicated and verified is valid UTF-8 +# +# path: string +# absolute path or relative path to current toml file +tags = [ + # example sequence tag source. The range of values are automatically prefixed with 0s + # to ensure correct sort behavior. + { name = "host", source = { type = "sequence", format = "host-%s", start = 0, count = 5 } }, + + # tags can also be sourced from a file. The path is relative to the schema.toml. + # Each value must be on a new line. The file is also sorted, validated for UTF-8 and deduplicated. + # { name = "region", source = { type = "file", path = "files/regions.txt" } }, + + # Example string array source, which is also deduplicated and sorted + { name = "region", source = ["us-west-01","us-west-02","us-east"] }, +] + +# Keys for defining a field +# +# name: string, required +# Name of field +# +# count: int, required +# Number of values to generate. When multiple fields have the same +# count, they will share timestamps. +# +# time-precision: string (default: ms) +# The precision for generated timestamps. +# One of ns, us, ms, s, m, h +# +# source: int, float, boolean, string, array or object +# +# A literal int, float, boolean or string will produce +# a constant value of the same data type. +# +# A literal array of homogeneous values will generate a repeating +# sequence. +# +# An object defines more complex generators. The type key determines the +# type of generator. +# +# source types: +# +# type: "rand" +# generate random float values +# seed: seed to random number generator (default: 0) +# min: minimum value (default: 0.0) +# max: maximum value (default: 1.0) +# +# type: "zipf" +# generate random integer values using a Zipf distribution +# The generator generates values k ∈ [0, imax] such that P(k) +# is proportional to (v + k) ** (-s). Requirements: s > 1 and v ≥ 1. +# See https://golang.org/pkg/math/rand/#NewZipf for more information. +# +# seed: seed to random number generator (default: 0) +# s: float > 1 (required) +# v: float ≥ 1 (required) +# imax: integer (required) +# +fields = [ + # Example constant float + { name = "system", count = 5000, source = 2.5 }, + + # Example random floats + { name = "user", count = 5000, source = { type = "rand", seed = 10, min = 0.0, max = 1.0 } }, +] + +# Multiple measurements may be defined. +[[measurements]] +name = "mem" +tags = [ + { name = "host", source = { type = "sequence", format = "host-%s", start = 0, count = 5 } }, + { name = "region", source = ["us-west-01","us-west-02","us-east"] }, +] +fields = [ + # An example of a sequence of integer values + { name = "free", count = 17, source = [10,15,20,25,30,35,30], time-precision = "ms" }, + { name = "low_mem", count = 17, source = [false,true,true], time-precision = "ms" }, +] +` + +func (cmd *Command) printExample() error { + fmt.Fprint(cmd.Stdout, exampleSchema) + return nil } diff --git a/cmd/influx_tools/generate/exec/generator.go b/cmd/influx_tools/generate/exec/generator.go index c70bb7ff59..afc3d55381 100644 --- a/cmd/influx_tools/generate/exec/generator.go +++ b/cmd/influx_tools/generate/exec/generator.go @@ -11,6 +11,7 @@ import ( "github.com/influxdata/influxdb/cmd/influx_tools/internal/errlist" "github.com/influxdata/influxdb/cmd/influx_tools/internal/shard" "github.com/influxdata/influxdb/models" + "github.com/influxdata/influxdb/pkg/data/gen" "github.com/influxdata/influxdb/services/meta" "github.com/influxdata/influxdb/tsdb" "github.com/influxdata/influxdb/tsdb/engine/tsm1" @@ -24,7 +25,7 @@ type Generator struct { sfile *tsdb.SeriesFile } -func (g *Generator) Run(ctx context.Context, database, shardPath string, groups []meta.ShardGroupInfo, gens []SeriesGenerator) (err error) { +func (g *Generator) Run(ctx context.Context, database, shardPath string, groups []meta.ShardGroupInfo, gens []gen.SeriesGenerator) (err error) { limit := make(chan struct{}, g.Concurrency) for i := 0; i < g.Concurrency; i++ { limit <- struct{}{} @@ -127,7 +128,7 @@ func (g *Generator) Run(ctx context.Context, database, shardPath string, groups // seriesBatchSize specifies the number of series keys passed to the index. const seriesBatchSize = 1000 -func (g *Generator) writeShard(idx seriesIndex, sg SeriesGenerator, id uint64, path string) error { +func (g *Generator) writeShard(idx seriesIndex, sg gen.SeriesGenerator, id uint64, path string) error { sw := shard.NewWriter(id, path) defer sw.Close() @@ -152,7 +153,7 @@ func (g *Generator) writeShard(idx seriesIndex, sg SeriesGenerator, id uint64, p tags = tags[:0] } - vg := sg.ValuesGenerator() + vg := sg.TimeValuesGenerator() key := tsm1.SeriesFieldKeyBytes(string(seriesKey), string(sg.Field())) for vg.Next() { diff --git a/cmd/influx_tools/generate/exec/series_generator.go b/cmd/influx_tools/generate/exec/series_generator.go deleted file mode 100644 index 7aed0629cd..0000000000 --- a/cmd/influx_tools/generate/exec/series_generator.go +++ /dev/null @@ -1,89 +0,0 @@ -package exec - -import ( - "github.com/influxdata/influxdb/models" - "github.com/influxdata/platform/pkg/data/gen" -) - -type SeriesGenerator interface { - // Next advances the series generator to the next series key. - Next() bool - - // Key returns the series key. - // The returned value may be cached. - Key() []byte - - // Name returns the name of the measurement. - // The returned value may be cached. - Name() []byte - - // Tags returns the tag set. - // The returned value may be cached. - Tags() models.Tags - - // Field returns the name of the field. - // The returned value may be cached. - Field() []byte - - // ValuesGenerator returns a values sequence for the current series. - ValuesGenerator() gen.ValuesSequence -} - -type cache struct { - key []byte - tags models.Tags -} - -type seriesGenerator struct { - name []byte - tags gen.TagsSequence - field []byte - vg gen.ValuesSequence - - c cache -} - -func NewSeriesGenerator(name []byte, field []byte, vg gen.ValuesSequence, tags gen.TagsSequence) SeriesGenerator { - return &seriesGenerator{ - name: name, - field: field, - vg: vg, - tags: tags, - } -} - -func (g *seriesGenerator) Next() bool { - if g.tags.Next() { - g.c = cache{} - g.vg.Reset() - return true - } - - return false -} - -func (g *seriesGenerator) Key() []byte { - if len(g.c.key) == 0 { - g.c.key = models.MakeKey(g.name, g.tags.Value()) - } - return g.c.key -} - -func (g *seriesGenerator) Name() []byte { - return g.name -} - -func (g *seriesGenerator) Tags() models.Tags { - if len(g.c.tags) == 0 { - g.c.tags = g.tags.Value().Clone() - } - return g.c.tags -} - -func (g *seriesGenerator) Field() []byte { - return g.field -} - -func (g *seriesGenerator) ValuesGenerator() gen.ValuesSequence { - return g.vg -} diff --git a/cmd/influx_tools/generate/init/command.go b/cmd/influx_tools/generate/init/command.go index 134b943842..2b82e5183b 100644 --- a/cmd/influx_tools/generate/init/command.go +++ b/cmd/influx_tools/generate/init/command.go @@ -19,7 +19,7 @@ type Command struct { configPath string printOnly bool - spec generate.Spec + spec generate.StorageSpec } // NewCommand returns a new instance of Command. diff --git a/cmd/influx_tools/generate/plan.go b/cmd/influx_tools/generate/plan.go index 26a36b0cd8..de9acb9c40 100644 --- a/cmd/influx_tools/generate/plan.go +++ b/cmd/influx_tools/generate/plan.go @@ -16,35 +16,28 @@ import ( "github.com/pkg/errors" ) -type Plan struct { - Database string - Retention string - ReplicaN int - StartTime time.Time - ShardCount int - ShardDuration time.Duration - Tags TagCardinalities - PointsPerSeriesPerShard int - DatabasePath string +type StoragePlan struct { + Database string + Retention string + ReplicaN int + StartTime time.Time + ShardCount int + ShardDuration time.Duration + DatabasePath string info *meta.DatabaseInfo groups []meta.ShardGroupInfo } -func (p *Plan) String() string { +func (p *StoragePlan) String() string { sb := new(strings.Builder) p.PrintPlan(sb) return sb.String() } -func (p *Plan) PrintPlan(w io.Writer) { +func (p *StoragePlan) PrintPlan(w io.Writer) { tw := tabwriter.NewWriter(w, 25, 4, 2, ' ', 0) fmt.Fprintf(tw, "Data Path\t%s\n", p.ShardPath()) - fmt.Fprintf(tw, "Tag cardinalities\t%s\n", p.Tags) - fmt.Fprintf(tw, "Points per series per shard\t%d\n", p.PointsPerSeriesPerShard) - fmt.Fprintf(tw, "Total points per shard\t%d\n", p.Tags.Cardinality()*p.PointsPerSeriesPerShard) - fmt.Fprintf(tw, "Total series\t%d\n", p.Tags.Cardinality()) - fmt.Fprintf(tw, "Total points\t%d\n", p.Tags.Cardinality()*p.ShardCount*p.PointsPerSeriesPerShard) fmt.Fprintf(tw, "Shard Count\t%d\n", p.ShardCount) fmt.Fprintf(tw, "Database\t%s/%s (Shard duration: %s)\n", p.Database, p.Retention, p.ShardDuration) fmt.Fprintf(tw, "Start time\t%s\n", p.StartTime) @@ -52,20 +45,20 @@ func (p *Plan) PrintPlan(w io.Writer) { tw.Flush() } -func (p *Plan) ShardPath() string { +func (p *StoragePlan) ShardPath() string { return filepath.Join(p.DatabasePath, p.Retention) } // TimeSpan returns the total duration for which the data set. -func (p *Plan) TimeSpan() time.Duration { +func (p *StoragePlan) TimeSpan() time.Duration { return p.ShardDuration * time.Duration(p.ShardCount) } -func (p *Plan) EndTime() time.Time { +func (p *StoragePlan) EndTime() time.Time { return p.StartTime.Add(p.TimeSpan()) } -func (p *Plan) InitMetadata(client server.MetaClient) (err error) { +func (p *StoragePlan) InitMetadata(client server.MetaClient) (err error) { if err = client.DropDatabase(p.Database); err != nil { return err } @@ -85,7 +78,7 @@ func (p *Plan) InitMetadata(client server.MetaClient) (err error) { // InitFileSystem initializes the file system structure, cleaning up // existing files and re-creating the appropriate shard directories. -func (p *Plan) InitFileSystem(client server.MetaClient) error { +func (p *StoragePlan) InitFileSystem(client server.MetaClient) error { var err error if err = os.RemoveAll(p.DatabasePath); err != nil { return err @@ -117,15 +110,15 @@ func (p *Plan) InitFileSystem(client server.MetaClient) error { } // NodeShardGroups returns ShardGroupInfo with Shards limited to the current node -func (p *Plan) NodeShardGroups() []meta.ShardGroupInfo { +func (p *StoragePlan) NodeShardGroups() []meta.ShardGroupInfo { return p.groups } -func (p *Plan) ShardGroups() []meta.ShardGroupInfo { +func (p *StoragePlan) ShardGroups() []meta.ShardGroupInfo { return p.info.RetentionPolicy(p.info.DefaultRetentionPolicy).ShardGroups } -func (p *Plan) createShardGroupMetadata(client server.MetaClient, rp string) error { +func (p *StoragePlan) createShardGroupMetadata(client server.MetaClient, rp string) error { ts := p.StartTime.Truncate(p.ShardDuration).UTC() var err error @@ -141,13 +134,13 @@ func (p *Plan) createShardGroupMetadata(client server.MetaClient, rp string) err return nil } -func (p *Plan) TimeRange() (start, end time.Time) { +func (p *StoragePlan) TimeRange() (start, end time.Time) { start = p.StartTime.Truncate(p.ShardDuration).UTC() end = start.Add(time.Duration(p.ShardDuration.Nanoseconds() * int64(p.ShardCount))) return start, end } -func (p *Plan) Validate() error { +func (p *StoragePlan) Validate() error { // build default values def := &planDefaults{} WalkPlan(def, p) @@ -164,7 +157,7 @@ type Visitor interface { type Node interface{ node() } -func (*Plan) node() {} +func (*StoragePlan) node() {} func WalkPlan(v Visitor, node Node) { if v = v.Visit(node); v == nil { @@ -172,7 +165,7 @@ func WalkPlan(v Visitor, node Node) { } switch n := node.(type) { - case *Plan: + case *StoragePlan: default: panic(fmt.Sprintf("WalkConfig: unexpected node type %T", n)) @@ -185,7 +178,7 @@ type planValidator struct { func (v *planValidator) Visit(node Node) Visitor { switch n := node.(type) { - case *Plan: + case *StoragePlan: if n.DatabasePath == "" { v.errs.Add(errors.New("missing DataPath")) } @@ -205,7 +198,7 @@ type planDefaults struct{} func (v *planDefaults) Visit(node Node) Visitor { switch n := node.(type) { - case *Plan: + case *StoragePlan: if n.DatabasePath == "" { n.DatabasePath = "${HOME}/.influxdb/data" } @@ -228,3 +221,25 @@ func (v *planDefaults) Visit(node Node) Visitor { return v } + +type SchemaPlan struct { + StoragePlan *StoragePlan + Tags TagCardinalities + PointsPerSeriesPerShard int +} + +func (p *SchemaPlan) String() string { + sb := new(strings.Builder) + p.PrintPlan(sb) + return sb.String() +} + +func (p *SchemaPlan) PrintPlan(w io.Writer) { + tw := tabwriter.NewWriter(w, 25, 4, 2, ' ', 0) + fmt.Fprintf(tw, "Tag cardinalities\t%s\n", p.Tags) + fmt.Fprintf(tw, "Points per series per shard\t%d\n", p.PointsPerSeriesPerShard) + fmt.Fprintf(tw, "Total points per shard\t%d\n", p.Tags.Cardinality()*p.PointsPerSeriesPerShard) + fmt.Fprintf(tw, "Total series\t%d\n", p.Tags.Cardinality()) + fmt.Fprintf(tw, "Total points\t%d\n", p.Tags.Cardinality()*p.StoragePlan.ShardCount*p.PointsPerSeriesPerShard) + _ = tw.Flush() +} diff --git a/cmd/influx_tools/generate/spec.go b/cmd/influx_tools/generate/spec.go index 975cca6211..f21e1e7a38 100644 --- a/cmd/influx_tools/generate/spec.go +++ b/cmd/influx_tools/generate/spec.go @@ -41,39 +41,32 @@ func (t *TagCardinalities) Set(tags string) error { return nil } -type Spec struct { - StartTime string - Database string - Retention string - ReplicaN int - ShardCount int - ShardDuration time.Duration - Tags TagCardinalities - PointsPerSeriesPerShard int +type StorageSpec struct { + StartTime string + Database string + Retention string + ReplicaN int + ShardCount int + ShardDuration time.Duration } -func (a *Spec) AddFlags(fs *flag.FlagSet) { +func (a *StorageSpec) AddFlags(fs *flag.FlagSet) { fs.StringVar(&a.StartTime, "start-time", "", "Start time") fs.StringVar(&a.Database, "db", "db", "Name of database to create") fs.StringVar(&a.Retention, "rp", "rp", "Name of retention policy") fs.IntVar(&a.ReplicaN, "rf", 1, "Replication factor") fs.IntVar(&a.ShardCount, "shards", 1, "Number of shards to create") fs.DurationVar(&a.ShardDuration, "shard-duration", 24*time.Hour, "Shard duration (default 24h)") - a.Tags = []int{10, 10, 10} - fs.Var(&a.Tags, "t", "Tag cardinality") - fs.IntVar(&a.PointsPerSeriesPerShard, "p", 100, "Points per series per shard") } -func (a *Spec) Plan(server server.Interface) (*Plan, error) { - plan := &Plan{ - Database: a.Database, - Retention: a.Retention, - ReplicaN: a.ReplicaN, - ShardCount: a.ShardCount, - ShardDuration: a.ShardDuration, - Tags: a.Tags, - PointsPerSeriesPerShard: a.PointsPerSeriesPerShard, - DatabasePath: filepath.Join(server.TSDBConfig().Dir, a.Database), +func (a *StorageSpec) Plan(server server.Interface) (*StoragePlan, error) { + plan := &StoragePlan{ + Database: a.Database, + Retention: a.Retention, + ReplicaN: a.ReplicaN, + ShardCount: a.ShardCount, + ShardDuration: a.ShardDuration, + DatabasePath: filepath.Join(server.TSDBConfig().Dir, a.Database), } if a.StartTime != "" { @@ -90,3 +83,22 @@ func (a *Spec) Plan(server server.Interface) (*Plan, error) { return plan, nil } + +type SchemaSpec struct { + Tags TagCardinalities + PointsPerSeriesPerShard int +} + +func (s *SchemaSpec) AddFlags(fs *flag.FlagSet) { + s.Tags = []int{10, 10, 10} + fs.Var(&s.Tags, "t", "Tag cardinality") + fs.IntVar(&s.PointsPerSeriesPerShard, "p", 100, "Points per series per shard") +} + +func (s *SchemaSpec) Plan(sp *StoragePlan) (*SchemaPlan, error) { + return &SchemaPlan{ + StoragePlan: sp, + Tags: s.Tags, + PointsPerSeriesPerShard: s.PointsPerSeriesPerShard, + }, nil +} diff --git a/cmd/influx_tools/internal/shard/writer.go b/cmd/influx_tools/internal/shard/writer.go index 8fafd20d59..1212b4baeb 100644 --- a/cmd/influx_tools/internal/shard/writer.go +++ b/cmd/influx_tools/internal/shard/writer.go @@ -7,8 +7,8 @@ import ( "strconv" "github.com/influxdata/influxdb/cmd/influx_tools/internal/errlist" + "github.com/influxdata/influxdb/pkg/data/gen" "github.com/influxdata/influxdb/tsdb/engine/tsm1" - "github.com/influxdata/platform/pkg/data/gen" ) const ( diff --git a/models/fieldtype_string.go b/models/fieldtype_string.go new file mode 100644 index 0000000000..3d181aa991 --- /dev/null +++ b/models/fieldtype_string.go @@ -0,0 +1,16 @@ +// Code generated by "stringer -type=FieldType"; DO NOT EDIT. + +package models + +import "strconv" + +const _FieldType_name = "IntegerFloatBooleanStringEmptyUnsigned" + +var _FieldType_index = [...]uint8{0, 7, 12, 19, 25, 30, 38} + +func (i FieldType) String() string { + if i < 0 || i >= FieldType(len(_FieldType_index)-1) { + return "FieldType(" + strconv.FormatInt(int64(i), 10) + ")" + } + return _FieldType_name[_FieldType_index[i]:_FieldType_index[i+1]] +} diff --git a/models/gen.go b/models/gen.go new file mode 100644 index 0000000000..0aaa43f203 --- /dev/null +++ b/models/gen.go @@ -0,0 +1,3 @@ +package models + +//go:generate stringer -type=FieldType diff --git a/pkg/data/gen/arrays.gen.go b/pkg/data/gen/arrays.gen.go new file mode 100644 index 0000000000..7564157982 --- /dev/null +++ b/pkg/data/gen/arrays.gen.go @@ -0,0 +1,97 @@ +// Generated by tmpl +// https://github.com/benbjohnson/tmpl +// +// DO NOT EDIT! +// Source: arrays.gen.go.tmpl + +package gen + +import ( + "github.com/influxdata/influxdb/tsdb" + "github.com/influxdata/influxdb/tsdb/engine/tsm1" +) + +type floatArray struct { + tsdb.FloatArray +} + +func newFloatArrayLen(sz int) *floatArray { + return &floatArray{ + FloatArray: tsdb.FloatArray{ + Timestamps: make([]int64, sz), + Values: make([]float64, sz), + }, + } +} + +func (a *floatArray) Encode(b []byte) ([]byte, error) { + return tsm1.EncodeFloatArrayBlock(&a.FloatArray, b) +} + +type integerArray struct { + tsdb.IntegerArray +} + +func newIntegerArrayLen(sz int) *integerArray { + return &integerArray{ + IntegerArray: tsdb.IntegerArray{ + Timestamps: make([]int64, sz), + Values: make([]int64, sz), + }, + } +} + +func (a *integerArray) Encode(b []byte) ([]byte, error) { + return tsm1.EncodeIntegerArrayBlock(&a.IntegerArray, b) +} + +type unsignedArray struct { + tsdb.UnsignedArray +} + +func newUnsignedArrayLen(sz int) *unsignedArray { + return &unsignedArray{ + UnsignedArray: tsdb.UnsignedArray{ + Timestamps: make([]int64, sz), + Values: make([]uint64, sz), + }, + } +} + +func (a *unsignedArray) Encode(b []byte) ([]byte, error) { + return tsm1.EncodeUnsignedArrayBlock(&a.UnsignedArray, b) +} + +type stringArray struct { + tsdb.StringArray +} + +func newStringArrayLen(sz int) *stringArray { + return &stringArray{ + StringArray: tsdb.StringArray{ + Timestamps: make([]int64, sz), + Values: make([]string, sz), + }, + } +} + +func (a *stringArray) Encode(b []byte) ([]byte, error) { + return tsm1.EncodeStringArrayBlock(&a.StringArray, b) +} + +type booleanArray struct { + tsdb.BooleanArray +} + +func newBooleanArrayLen(sz int) *booleanArray { + return &booleanArray{ + BooleanArray: tsdb.BooleanArray{ + Timestamps: make([]int64, sz), + Values: make([]bool, sz), + }, + } +} + +func (a *booleanArray) Encode(b []byte) ([]byte, error) { + return tsm1.EncodeBooleanArrayBlock(&a.BooleanArray, b) +} diff --git a/pkg/data/gen/arrays.gen.go.tmpl b/pkg/data/gen/arrays.gen.go.tmpl new file mode 100644 index 0000000000..3570c4b25a --- /dev/null +++ b/pkg/data/gen/arrays.gen.go.tmpl @@ -0,0 +1,27 @@ +package gen + +import ( + "github.com/influxdata/influxdb/tsdb" + "github.com/influxdata/influxdb/tsdb/engine/tsm1" +) + +{{range .}} +{{ $typename := print .name "Array" }} +{{ $tsdbname := print .Name "Array" }} +type {{$typename}} struct { + tsdb.{{$tsdbname}} +} + +func new{{$tsdbname}}Len(sz int) *{{$typename}} { + return &{{$typename}}{ + {{$tsdbname}}: tsdb.{{$tsdbname}}{ + Timestamps: make([]int64, sz), + Values: make([]{{.Type}}, sz), + }, + } +} + +func (a *{{$typename}}) Encode(b []byte) ([]byte, error) { + return tsm1.Encode{{$tsdbname}}Block(&a.{{$tsdbname}}, b) +} +{{end}} \ No newline at end of file diff --git a/pkg/data/gen/gen.go b/pkg/data/gen/gen.go new file mode 100644 index 0000000000..7af8a05d80 --- /dev/null +++ b/pkg/data/gen/gen.go @@ -0,0 +1,4 @@ +package gen + +//go:generate tmpl -data=@types.tmpldata arrays.gen.go.tmpl values.gen.go.tmpl values_sequence.gen.go.tmpl +//go:generate stringer -type=precision -trimprefix=precision diff --git a/pkg/data/gen/mergedseriesgenerator.go b/pkg/data/gen/mergedseriesgenerator.go new file mode 100644 index 0000000000..e995e7798e --- /dev/null +++ b/pkg/data/gen/mergedseriesgenerator.go @@ -0,0 +1,141 @@ +package gen + +import ( + "container/heap" + "math" + + "github.com/influxdata/influxdb/models" +) + +type mergedSeriesGenerator struct { + heap seriesGeneratorHeap + last constSeries + err error + n int64 + first bool +} + +func NewMergedSeriesGenerator(s []SeriesGenerator) SeriesGenerator { + if len(s) == 0 { + return nil + } else if len(s) == 1 { + return s[0] + } + + msg := &mergedSeriesGenerator{first: true, n: math.MaxInt64} + msg.heap.init(s) + return msg +} + +func NewMergedSeriesGeneratorLimit(s []SeriesGenerator, n int64) SeriesGenerator { + if len(s) == 0 { + return nil + } + + msg := &mergedSeriesGenerator{first: true, n: n} + msg.heap.init(s) + return msg +} + +func (s *mergedSeriesGenerator) Next() bool { + if len(s.heap.items) == 0 { + return false + } + + if s.n > 0 { + s.n-- + if !s.first { + top := s.heap.items[0] + s.last.CopyFrom(top) // capture last key for duplicate checking + + for { + if top.Next() { + if len(s.heap.items) > 1 { + heap.Fix(&s.heap, 0) + } + } else { + heap.Pop(&s.heap) + if len(s.heap.items) == 0 { + return false + } + } + + top = s.heap.items[0] + if CompareSeries(&s.last, top) == 0 { + // duplicate key, get next + continue + } + return true + } + } + + s.first = false + return true + } + + return false +} + +func (s *mergedSeriesGenerator) Key() []byte { + return s.heap.items[0].Key() +} + +func (s *mergedSeriesGenerator) Name() []byte { + return s.heap.items[0].Name() +} + +func (s *mergedSeriesGenerator) Tags() models.Tags { + return s.heap.items[0].Tags() +} + +func (s *mergedSeriesGenerator) Field() []byte { + return s.heap.items[0].Field() +} + +func (s *mergedSeriesGenerator) TimeValuesGenerator() TimeValuesSequence { + return s.heap.items[0].TimeValuesGenerator() +} + +type seriesGeneratorHeap struct { + items []SeriesGenerator +} + +func (h *seriesGeneratorHeap) init(results []SeriesGenerator) { + if cap(h.items) < len(results) { + h.items = make([]SeriesGenerator, 0, len(results)) + } else { + h.items = h.items[:0] + } + + for _, rs := range results { + if rs.Next() { + h.items = append(h.items, rs) + } + } + heap.Init(h) +} + +func (h *seriesGeneratorHeap) Less(i, j int) bool { + return CompareSeries(h.items[i], h.items[j]) == -1 +} + +func (h *seriesGeneratorHeap) Len() int { + return len(h.items) +} + +func (h *seriesGeneratorHeap) Swap(i, j int) { + h.items[i], h.items[j] = h.items[j], h.items[i] +} + +func (h *seriesGeneratorHeap) Push(x interface{}) { + panic("not implemented") +} + +func (h *seriesGeneratorHeap) Pop() interface{} { + old := h.items + n := len(old) + item := old[n-1] + old[n-1] = nil + h.items = old[0 : n-1] + return item +} diff --git a/pkg/data/gen/mergedseriesgenerator_test.go b/pkg/data/gen/mergedseriesgenerator_test.go new file mode 100644 index 0000000000..0d22907b4f --- /dev/null +++ b/pkg/data/gen/mergedseriesgenerator_test.go @@ -0,0 +1,213 @@ +package gen + +import ( + "fmt" + "math" + "strings" + "testing" + "time" + + "github.com/google/go-cmp/cmp" +) + +func sg(name, prefix, field string, counts ...int) SeriesGenerator { + spec := TimeSequenceSpec{Count: 1, Start: time.Unix(0, 0), Delta: time.Second} + ts := NewTimestampSequenceFromSpec(spec) + vs := NewFloatConstantValuesSequence(1) + vg := NewTimeFloatValuesSequence(spec.Count, ts, vs) + return NewSeriesGenerator([]byte(name), []byte(field), vg, NewTagsValuesSequenceCounts(prefix, counts)) +} + +func tags(sb *strings.Builder, prefix string, vals []int) { + sb.WriteByte(',') + + // max tag width + tw := int(math.Ceil(math.Log10(float64(len(vals))))) + tf := fmt.Sprintf("%s%%0%dd=value%%d", prefix, tw) + tvs := make([]string, len(vals)) + for i := range vals { + tvs[i] = fmt.Sprintf(tf, i, vals[i]) + } + sb.WriteString(strings.Join(tvs, ",")) +} + +func line(name, prefix, field string, vals ...int) string { + var sb strings.Builder + sb.WriteString(name) + tags(&sb, prefix, vals) + sb.WriteString("#!~#") + sb.WriteString(field) + return sb.String() +} + +func seriesGeneratorString(sg SeriesGenerator) []string { + var lines []string + for sg.Next() { + lines = append(lines, fmt.Sprintf("%s#!~#%s", string(sg.Key()), string(sg.Field()))) + } + return lines +} + +func TestNewMergedSeriesGenerator(t *testing.T) { + tests := []struct { + n string + s []SeriesGenerator + exp []string + }{ + { + n: "single", + s: []SeriesGenerator{ + sg("cpu", "t", "f0", 2, 1), + }, + exp: []string{ + line("cpu", "t", "f0", 0, 0), + line("cpu", "t", "f0", 1, 0), + }, + }, + { + n: "multiple,interleaved", + s: []SeriesGenerator{ + sg("cpu", "t", "f0", 2, 1), + sg("cpu", "t", "f1", 2, 1), + }, + exp: []string{ + line("cpu", "t", "f0", 0, 0), + line("cpu", "t", "f1", 0, 0), + line("cpu", "t", "f0", 1, 0), + line("cpu", "t", "f1", 1, 0), + }, + }, + { + n: "multiple,sequential", + s: []SeriesGenerator{ + sg("cpu", "t", "f0", 2), + sg("cpu", "u", "f0", 2, 1), + }, + exp: []string{ + line("cpu", "t", "f0", 0), + line("cpu", "t", "f0", 1), + line("cpu", "u", "f0", 0, 0), + line("cpu", "u", "f0", 1, 0), + }, + }, + { + n: "multiple,sequential", + s: []SeriesGenerator{ + sg("m1", "t", "f0", 2, 1), + sg("m0", "t", "f0", 2, 1), + }, + exp: []string{ + line("m0", "t", "f0", 0, 0), + line("m0", "t", "f0", 1, 0), + line("m1", "t", "f0", 0, 0), + line("m1", "t", "f0", 1, 0), + }, + }, + { + // ensure duplicates are removed + n: "duplicates", + s: []SeriesGenerator{ + sg("cpu", "t", "f0", 2, 1), + sg("cpu", "t", "f0", 2, 1), + }, + exp: []string{ + line("cpu", "t", "f0", 0, 0), + line("cpu", "t", "f0", 1, 0), + }, + }, + { + // ensure duplicates are removed, but non-dupes from same SeriesGenerator + // are still included + n: "duplicates,multiple,interleaved", + s: []SeriesGenerator{ + sg("cpu", "t", "f0", 2, 1), + sg("cpu", "t", "f1", 2, 1), + sg("cpu", "t", "f0", 2, 1), + sg("cpu", "t", "f1", 3, 1), + }, + exp: []string{ + line("cpu", "t", "f0", 0, 0), + line("cpu", "t", "f1", 0, 0), + line("cpu", "t", "f0", 1, 0), + line("cpu", "t", "f1", 1, 0), + line("cpu", "t", "f1", 2, 0), + }, + }, + } + for _, tt := range tests { + t.Run(tt.n, func(t *testing.T) { + sg := NewMergedSeriesGenerator(tt.s) + if got := seriesGeneratorString(sg); !cmp.Equal(got, tt.exp) { + t.Errorf("unpexected -got/+exp\n%s", cmp.Diff(got, tt.exp)) + } + }) + } +} + +func TestNewMergedSeriesGeneratorLimit(t *testing.T) { + tests := []struct { + n string + s []SeriesGenerator + lim int64 + exp []string + }{ + { + n: "single", + s: []SeriesGenerator{ + sg("cpu", "t", "f0", 4, 1), + }, + lim: 2, + exp: []string{ + line("cpu", "t", "f0", 0, 0), + line("cpu", "t", "f0", 1, 0), + }, + }, + { + n: "multiple,interleaved", + s: []SeriesGenerator{ + sg("cpu", "t", "f0", 2, 1), + sg("cpu", "t", "f1", 2, 1), + }, + lim: 3, + exp: []string{ + line("cpu", "t", "f0", 0, 0), + line("cpu", "t", "f1", 0, 0), + line("cpu", "t", "f0", 1, 0), + }, + }, + { + n: "multiple,sequential", + s: []SeriesGenerator{ + sg("cpu", "t", "f0", 2), + sg("cpu", "u", "f0", 2, 1), + }, + lim: 2, + exp: []string{ + line("cpu", "t", "f0", 0), + line("cpu", "t", "f0", 1), + }, + }, + { + n: "multiple,sequential", + s: []SeriesGenerator{ + sg("m1", "t", "f0", 2, 1), + sg("m0", "t", "f0", 2, 1), + }, + lim: 4, + exp: []string{ + line("m0", "t", "f0", 0, 0), + line("m0", "t", "f0", 1, 0), + line("m1", "t", "f0", 0, 0), + line("m1", "t", "f0", 1, 0), + }, + }, + } + for _, tt := range tests { + t.Run(tt.n, func(t *testing.T) { + sg := NewMergedSeriesGeneratorLimit(tt.s, tt.lim) + if got := seriesGeneratorString(sg); !cmp.Equal(got, tt.exp) { + t.Errorf("unpexected -got/+exp\n%s", cmp.Diff(got, tt.exp)) + } + }) + } +} diff --git a/pkg/data/gen/precision_string.go b/pkg/data/gen/precision_string.go new file mode 100644 index 0000000000..8c78f3fc97 --- /dev/null +++ b/pkg/data/gen/precision_string.go @@ -0,0 +1,16 @@ +// Code generated by "stringer -type=precision -trimprefix=precision"; DO NOT EDIT. + +package gen + +import "strconv" + +const _precision_name = "MillisecondNanosecondMicrosecondSecondMinuteHour" + +var _precision_index = [...]uint8{0, 11, 21, 32, 38, 44, 48} + +func (i precision) String() string { + if i >= precision(len(_precision_index)-1) { + return "precision(" + strconv.FormatInt(int64(i), 10) + ")" + } + return _precision_name[_precision_index[i]:_precision_index[i+1]] +} diff --git a/pkg/data/gen/schema.go b/pkg/data/gen/schema.go new file mode 100644 index 0000000000..6334e2cb1b --- /dev/null +++ b/pkg/data/gen/schema.go @@ -0,0 +1,238 @@ +package gen + +import ( + "fmt" +) + +type Visitor interface { + Visit(node SchemaNode) (w Visitor) +} + +type SchemaNode interface { + node() +} + +type Schema struct { + Title string + Version string + SeriesLimit *SeriesLimit `toml:"series-limit"` + Measurements Measurements +} + +func (*Schema) node() {} + +type Measurements []Measurement + +func (Measurements) node() {} + +type Tags []Tag + +func (Tags) node() {} + +type Fields []Field + +func (Fields) node() {} + +type Measurement struct { + Name string + SeriesLimit *SeriesLimit `toml:"series-limit"` + Sample *sample + Tags Tags + Fields Fields +} + +func (*Measurement) node() {} + +type TagSource interface { + fmt.Stringer + SchemaNode + tagsource() +} + +type Tag struct { + Name string + Source TagSource +} + +func (*Tag) node() {} + +type TagArraySource struct { + Values []string +} + +func (*TagArraySource) node() {} +func (*TagArraySource) tagsource() {} + +func (s *TagArraySource) String() string { + return fmt.Sprintf("array, source=%#v", s.Values) +} + +type TagSequenceSource struct { + Format string + Start int64 + Count int64 +} + +func (*TagSequenceSource) node() {} +func (*TagSequenceSource) tagsource() {} + +func (t *TagSequenceSource) String() string { + return fmt.Sprintf("sequence, prefix=%q, range=[%d,%d)", t.Format, t.Start, t.Start+t.Count) +} + +type TagFileSource struct { + Path string +} + +func (*TagFileSource) node() {} +func (*TagFileSource) tagsource() {} + +func (s *TagFileSource) String() string { + return fmt.Sprintf("file, path=%s", s.Path) +} + +type FieldSource interface { + fmt.Stringer + SchemaNode + fieldsource() +} + +type Field struct { + Name string + Count int64 + TimePrecision precision `toml:"time-precision"` // TimePrecision determines the precision for generated timestamp values + Source FieldSource +} + +func (*Field) node() {} + +type FieldConstantValue struct { + Value interface{} +} + +func (*FieldConstantValue) node() {} +func (*FieldConstantValue) fieldsource() {} + +func (f *FieldConstantValue) String() string { + return fmt.Sprintf("constant, source=%#v", f.Value) +} + +type FieldArraySource struct { + Value interface{} +} + +func (*FieldArraySource) node() {} +func (*FieldArraySource) fieldsource() {} + +func (f *FieldArraySource) String() string { + return fmt.Sprintf("array, source=%#v", f.Value) +} + +type FieldFloatRandomSource struct { + Seed int64 + Min, Max float64 +} + +func (*FieldFloatRandomSource) node() {} +func (*FieldFloatRandomSource) fieldsource() {} + +func (f *FieldFloatRandomSource) String() string { + return fmt.Sprintf("rand, seed=%d, min=%f, max=%f", f.Seed, f.Max, f.Max) +} + +type FieldIntegerZipfSource struct { + Seed int64 + S, V float64 + IMAX uint64 +} + +func (*FieldIntegerZipfSource) node() {} +func (*FieldIntegerZipfSource) fieldsource() {} + +func (f *FieldIntegerZipfSource) String() string { + return fmt.Sprintf("rand, seed=%d, s=%f, v=%f, imax=%d", f.Seed, f.S, f.V, f.IMAX) +} + +type VisitorFn func(node SchemaNode) bool + +func (fn VisitorFn) Visit(node SchemaNode) (w Visitor) { + if fn(node) { + return fn + } + return nil +} + +// WalkDown performs a pre-order, depth-first traversal of the graph, calling v for each node. +// Pre-order starts by calling the visitor for the root and each child as it traverses down +// the graph to the leaves. +func WalkDown(v Visitor, node SchemaNode) { + walk(v, node, false) +} + +// WalkUp performs a post-order, depth-first traversal of the graph, calling v for each node. +// Post-order starts by calling the visitor for the leaves then each parent as it traverses up +// the graph to the root. +func WalkUp(v Visitor, node SchemaNode) { + walk(v, node, true) +} + +func walk(v Visitor, node SchemaNode, up bool) Visitor { + if v == nil { + return nil + } + + if !up { + if v = v.Visit(node); v == nil { + return nil + } + } + + switch n := node.(type) { + case *Schema: + walk(v, n.Measurements, up) + + case Measurements: + v := v + for i := range n { + v = walk(v, &n[i], up) + } + + case *Measurement: + v := v + v = walk(v, n.Tags, up) + v = walk(v, n.Fields, up) + + case Fields: + v := v + for i := 0; i < len(n); i++ { + v = walk(v, &n[i], up) + } + + case Tags: + v := v + for i := 0; i < len(n); i++ { + v = walk(v, &n[i], up) + } + + case *Tag: + walk(v, n.Source, up) + + case *TagArraySource, *TagSequenceSource, *TagFileSource: + // nothing to do + + case *Field: + walk(v, n.Source, up) + + case *FieldConstantValue, *FieldArraySource, *FieldFloatRandomSource, *FieldIntegerZipfSource: + // nothing to do + + default: + panic(fmt.Sprintf("schema.Walk: unexpected node type %T", n)) + } + + if up && v != nil { + v = v.Visit(node) + } + + return v +} diff --git a/pkg/data/gen/sequence.go b/pkg/data/gen/sequence.go new file mode 100644 index 0000000000..4fc469af85 --- /dev/null +++ b/pkg/data/gen/sequence.go @@ -0,0 +1,84 @@ +package gen + +import ( + "fmt" + "math" +) + +type Sequence interface { + Next() bool + Value() string +} + +type CountableSequence interface { + Sequence + Count() int +} + +type CounterByteSequence struct { + format string + nfmt string + val string + s int + i int + end int +} + +func NewCounterByteSequenceCount(n int) *CounterByteSequence { + return NewCounterByteSequence("value%s", 0, n) +} + +func NewCounterByteSequence(format string, start, end int) *CounterByteSequence { + s := &CounterByteSequence{ + format: format, + nfmt: fmt.Sprintf("%%0%dd", int(math.Ceil(math.Log10(float64(end))))), + s: start, + i: start, + end: end, + } + s.update() + return s +} + +func (s *CounterByteSequence) Next() bool { + s.i++ + if s.i >= s.end { + s.i = s.s + } + s.update() + return true +} + +func (s *CounterByteSequence) update() { + s.val = fmt.Sprintf(s.format, fmt.Sprintf(s.nfmt, s.i)) +} + +func (s *CounterByteSequence) Value() string { return s.val } +func (s *CounterByteSequence) Count() int { return s.end - s.s } + +type StringArraySequence struct { + vals []string + c int + i int +} + +func NewStringArraySequence(vals []string) *StringArraySequence { + return &StringArraySequence{vals: sortDedupStrings(vals)} +} + +func (s *StringArraySequence) Next() bool { + s.i++ + if s.i == len(s.vals) { + s.i = 0 + } + s.c = s.i + return true +} + +func (s *StringArraySequence) Value() string { + return s.vals[s.c] +} + +func (s *StringArraySequence) Count() int { + return len(s.vals) +} diff --git a/pkg/data/gen/series.go b/pkg/data/gen/series.go new file mode 100644 index 0000000000..c62bbe6332 --- /dev/null +++ b/pkg/data/gen/series.go @@ -0,0 +1,63 @@ +package gen + +import ( + "bytes" +) + +type Series interface { + // Key returns the series key. + // The returned value may be cached. + Key() []byte + + // Field returns the name of the field. + // The returned value may be modified by a subsequent call to Next. + Field() []byte +} + +type constSeries struct { + key []byte + field []byte +} + +func (s *constSeries) Key() []byte { return s.key } +func (s *constSeries) Field() []byte { return s.field } + +var nilSeries Series = &constSeries{} + +// Compare returns an integer comparing two SeriesGenerator instances +// lexicographically. +// The result will be 0 if a==b, -1 if a < b, and +1 if a > b. +// A nil argument is equivalent to an empty SeriesGenerator. +func CompareSeries(a, b Series) int { + if a == nil { + a = nilSeries + } + if b == nil { + b = nilSeries + } + + switch res := bytes.Compare(a.Key(), b.Key()); res { + case 0: + return bytes.Compare(a.Field(), b.Field()) + default: + return res + } +} + +func (s *constSeries) CopyFrom(a Series) { + key := a.Key() + if cap(s.key) < len(key) { + s.key = make([]byte, len(key)) + } else { + s.key = s.key[:len(key)] + } + copy(s.key, key) + + field := a.Field() + if cap(s.field) < len(field) { + s.field = make([]byte, len(field)) + } else { + s.field = s.field[:len(field)] + } + copy(s.field, field) +} diff --git a/pkg/data/gen/series_test.go b/pkg/data/gen/series_test.go new file mode 100644 index 0000000000..5f7c3d8bb3 --- /dev/null +++ b/pkg/data/gen/series_test.go @@ -0,0 +1,74 @@ +package gen + +import ( + "testing" + + "github.com/google/go-cmp/cmp" +) + +func TestCompareSeries(t *testing.T) { + mk := func(k, f string) Series { + return &constSeries{key: []byte(k), field: []byte(f)} + } + + tests := []struct { + name string + a Series + b Series + exp int + }{ + { + name: "nil a,b", + exp: 0, + }, + { + name: "a(nil) < b", + a: nil, + b: mk("cpu,t0=v0", "f0"), + exp: -1, + }, + { + name: "a > b(nil)", + a: mk("cpu,t0=v0", "f0"), + b: nil, + exp: 1, + }, + { + name: "a = b", + a: mk("cpu,t0=v0", "f0"), + b: mk("cpu,t0=v0", "f0"), + exp: 0, + }, + { + name: "a(f0) < b(f1)", + a: mk("cpu,t0=v0", "f0"), + b: mk("cpu,t0=v0", "f1"), + exp: -1, + }, + { + name: "a(v0) < b(v1)", + a: mk("cpu,t0=v0", "f0"), + b: mk("cpu,t0=v1", "f0"), + exp: -1, + }, + { + name: "a(f1) > b(f0)", + a: mk("cpu,t0=v0", "f1"), + b: mk("cpu,t0=v0", "f0"), + exp: 1, + }, + { + name: "a(v1) > b(v0)", + a: mk("cpu,t0=v1", "f0"), + b: mk("cpu,t0=v0", "f0"), + exp: 1, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := CompareSeries(tt.a, tt.b); got != tt.exp { + t.Errorf("unexpected value -got/+exp\n%s", cmp.Diff(got, tt.exp)) + } + }) + } +} diff --git a/pkg/data/gen/seriesgenerator.go b/pkg/data/gen/seriesgenerator.go new file mode 100644 index 0000000000..b7df0b44d1 --- /dev/null +++ b/pkg/data/gen/seriesgenerator.go @@ -0,0 +1,133 @@ +package gen + +import ( + "math" + "time" + + "github.com/influxdata/influxdb/models" + "github.com/influxdata/platform/pkg/data/gen" +) + +type SeriesGenerator interface { + // Next advances the series generator to the next series key. + Next() bool + + // Key returns the series key. + // The returned value may be cached. + Key() []byte + + // Name returns the name of the measurement. + // The returned value may be modified by a subsequent call to Next. + Name() []byte + + // Tags returns the tag set. + // The returned value may be modified by a subsequent call to Next. + Tags() models.Tags + + // Field returns the name of the field. + // The returned value may be modified by a subsequent call to Next. + Field() []byte + + // TimeValuesGenerator returns a values sequence for the current series. + TimeValuesGenerator() TimeValuesSequence +} + +type TimeSequenceSpec struct { + // Count specifies the number of values to generate. + Count int + + // Start specifies the starting time for the values. + Start time.Time + + // Delta specifies the interval between time stamps. + Delta time.Duration + + // Precision specifies the precision of timestamp intervals + Precision time.Duration +} + +type TimeRange struct { + Start time.Time + End time.Time +} + +type TimeValuesSequence interface { + Reset() + Next() bool + Values() Values +} + +type Values interface { + MinTime() int64 + MaxTime() int64 + Encode([]byte) ([]byte, error) +} + +type cache struct { + key []byte + tags models.Tags +} + +type seriesGenerator struct { + name []byte + tags gen.TagsSequence + field []byte + vg TimeValuesSequence + n int64 + + c cache +} + +func NewSeriesGenerator(name []byte, field []byte, vg TimeValuesSequence, tags TagsSequence) SeriesGenerator { + return NewSeriesGeneratorLimit(name, field, vg, tags, math.MaxInt64) +} + +func NewSeriesGeneratorLimit(name []byte, field []byte, vg TimeValuesSequence, tags TagsSequence, n int64) SeriesGenerator { + return &seriesGenerator{ + name: name, + field: field, + tags: tags, + vg: vg, + n: n, + } +} + +func (g *seriesGenerator) Next() bool { + if g.n > 0 { + g.n-- + if g.tags.Next() { + g.c = cache{} + g.vg.Reset() + return true + } + g.n = 0 + } + + return false +} + +func (g *seriesGenerator) Key() []byte { + if len(g.c.key) == 0 { + g.c.key = models.MakeKey(g.name, g.tags.Value()) + } + return g.c.key +} + +func (g *seriesGenerator) Name() []byte { + return g.name +} + +func (g *seriesGenerator) Tags() models.Tags { + if len(g.c.tags) == 0 { + g.c.tags = g.tags.Value().Clone() + } + return g.c.tags +} + +func (g *seriesGenerator) Field() []byte { + return g.field +} + +func (g *seriesGenerator) TimeValuesGenerator() TimeValuesSequence { + return g.vg +} diff --git a/pkg/data/gen/specs.go b/pkg/data/gen/specs.go new file mode 100644 index 0000000000..2ce657c5d0 --- /dev/null +++ b/pkg/data/gen/specs.go @@ -0,0 +1,569 @@ +package gen + +import ( + "bufio" + "fmt" + "math/rand" + "os" + "path" + "path/filepath" + "sort" + "time" + "unicode/utf8" + + "github.com/BurntSushi/toml" + "github.com/influxdata/influxdb/models" +) + +type Spec struct { + SeriesLimit *int64 + Measurements []MeasurementSpec +} + +func NewSeriesGeneratorFromSpec(s *Spec, tr TimeRange) SeriesGenerator { + sg := make([]SeriesGenerator, len(s.Measurements)) + for i := range s.Measurements { + sg[i] = newSeriesGeneratorFromMeasurementSpec(&s.Measurements[i], tr) + } + if s.SeriesLimit == nil { + return NewMergedSeriesGenerator(sg) + } + return NewMergedSeriesGeneratorLimit(sg, *s.SeriesLimit) +} + +type MeasurementSpec struct { + Name string + SeriesLimit *SeriesLimit + TagsSpec *TagsSpec + FieldValuesSpec *FieldValuesSpec +} + +func newSeriesGeneratorFromMeasurementSpec(ms *MeasurementSpec, tr TimeRange) SeriesGenerator { + if ms.SeriesLimit == nil { + return NewSeriesGenerator( + []byte(ms.Name), + []byte(ms.FieldValuesSpec.Name), + newTimeValuesSequenceFromFieldValuesSpec(ms.FieldValuesSpec, tr), + newTagsSequenceFromTagsSpec(ms.TagsSpec)) + } + return NewSeriesGeneratorLimit( + []byte(ms.Name), + []byte(ms.FieldValuesSpec.Name), + newTimeValuesSequenceFromFieldValuesSpec(ms.FieldValuesSpec, tr), + newTagsSequenceFromTagsSpec(ms.TagsSpec), + int64(*ms.SeriesLimit)) +} + +// NewTimeValuesSequenceFn returns a TimeValuesSequence that will generate a +// sequence of values based on the spec. +type NewTimeValuesSequenceFn func(spec TimeSequenceSpec) TimeValuesSequence + +type NewTagsValuesSequenceFn func() TagsSequence + +type NewCountableSequenceFn func() CountableSequence + +type TagsSpec struct { + Tags []*TagValuesSpec + Sample *sample +} + +func newTagsSequenceFromTagsSpec(ts *TagsSpec) TagsSequence { + var keys []string + var vals []CountableSequence + for _, spec := range ts.Tags { + keys = append(keys, spec.TagKey) + vals = append(vals, spec.Values()) + } + + var opts []tagsValuesOption + if ts.Sample != nil && *ts.Sample != 1.0 { + opts = append(opts, TagValuesSampleOption(float64(*ts.Sample))) + } + + return NewTagsValuesSequenceKeysValues(keys, vals, opts...) +} + +type TagValuesSpec struct { + TagKey string + Values NewCountableSequenceFn +} + +type FieldValuesSpec struct { + TimeSequenceSpec + Name string + DataType models.FieldType + Values NewTimeValuesSequenceFn +} + +func newTimeValuesSequenceFromFieldValuesSpec(fs *FieldValuesSpec, tr TimeRange) TimeValuesSequence { + ts := fs.TimeSequenceSpec + ts.Start = tr.Start + ts.Delta = tr.End.Sub(tr.Start) / time.Duration(ts.Count) + ts.Delta = ts.Delta.Round(ts.Precision) + + return fs.Values(ts) +} + +func NewSpecFromToml(s string) (*Spec, error) { + var out Schema + if _, err := toml.Decode(s, &out); err != nil { + return nil, err + } + return NewSpecFromSchema(&out) +} + +func NewSpecFromPath(p string) (*Spec, error) { + var err error + p, err = filepath.Abs(p) + if err != nil { + return nil, err + } + + var out Schema + if _, err := toml.DecodeFile(p, &out); err != nil { + return nil, err + } + return newSpecFromSchema(&out, schemaDir(path.Dir(p))) +} + +func NewSchemaFromPath(path string) (*Schema, error) { + var out Schema + if _, err := toml.DecodeFile(path, &out); err != nil { + return nil, err + } + return &out, nil +} + +type schemaToSpecState int + +const ( + stateOk schemaToSpecState = iota + stateErr +) + +type schemaToSpec struct { + schemaDir string + stack []interface{} + state schemaToSpecState + spec *Spec + err error +} + +func (s *schemaToSpec) push(v interface{}) { + s.stack = append(s.stack, v) +} + +func (s *schemaToSpec) pop() interface{} { + tail := len(s.stack) - 1 + v := s.stack[tail] + s.stack[tail] = nil + s.stack = s.stack[:tail] + return v +} + +func (s *schemaToSpec) peek() interface{} { + if len(s.stack) == 0 { + return nil + } + return s.stack[len(s.stack)-1] +} + +func (s *schemaToSpec) Visit(node SchemaNode) (w Visitor) { + switch s.state { + case stateOk: + if s.visit(node) { + return s + } + s.state = stateErr + + case stateErr: + s.visitErr(node) + } + + return nil +} + +func (s *schemaToSpec) visit(node SchemaNode) bool { + switch n := node.(type) { + case *Schema: + s.spec.Measurements = s.pop().([]MeasurementSpec) + if n.SeriesLimit != nil { + sl := int64(*n.SeriesLimit) + s.spec.SeriesLimit = &sl + } + + case Measurements: + // flatten measurements + var mss []MeasurementSpec + for { + if specs, ok := s.peek().([]MeasurementSpec); ok { + s.pop() + mss = append(mss, specs...) + continue + } + break + } + sort.Slice(mss, func(i, j int) bool { + return mss[i].Name < mss[j].Name + }) + + // validate field types are homogeneous for a single measurement + mg := make(map[string]models.FieldType) + for i := range mss { + spec := &mss[i] + key := spec.Name + "." + spec.FieldValuesSpec.Name + ft := spec.FieldValuesSpec.DataType + if dt, ok := mg[key]; !ok { + mg[key] = ft + } else if dt != ft { + s.err = fmt.Errorf("field %q data-type conflict, found %s and %s", + key, + dt, + ft) + return false + } + } + + s.push(mss) + + case *Measurement: + var ms []MeasurementSpec + + fields := s.pop().([]*FieldValuesSpec) + tagsSpec := s.pop().(*TagsSpec) + + tagsSpec.Sample = n.Sample + + // default: sample 50% + if n.Sample == nil { + s := sample(0.5) + tagsSpec.Sample = &s + } + + for _, spec := range fields { + ms = append(ms, MeasurementSpec{ + Name: n.Name, + SeriesLimit: n.SeriesLimit, + TagsSpec: tagsSpec, + FieldValuesSpec: spec, + }) + } + + // NOTE: sort each measurement name + field name to ensure series are produced + // in correct order + sort.Slice(ms, func(i, j int) bool { + return ms[i].FieldValuesSpec.Name < ms[j].FieldValuesSpec.Name + }) + s.push(ms) + + case Tags: + var ts TagsSpec + for { + if spec, ok := s.peek().(*TagValuesSpec); ok { + s.pop() + ts.Tags = append(ts.Tags, spec) + continue + } + break + } + // Tag keys must be sorted to produce a valid series key sequence + sort.Slice(ts.Tags, func(i, j int) bool { + return ts.Tags[i].TagKey < ts.Tags[j].TagKey + }) + + for i := 1; i < len(ts.Tags); i++ { + if ts.Tags[i-1].TagKey == ts.Tags[i].TagKey { + s.err = fmt.Errorf("duplicate tag keys %q", ts.Tags[i].TagKey) + return false + } + } + + s.push(&ts) + + case Fields: + // combine fields + var fs []*FieldValuesSpec + for { + if spec, ok := s.peek().(*FieldValuesSpec); ok { + s.pop() + fs = append(fs, spec) + continue + } + break + } + + sort.Slice(fs, func(i, j int) bool { + return fs[i].Name < fs[j].Name + }) + + for i := 1; i < len(fs); i++ { + if fs[i-1].Name == fs[i].Name { + s.err = fmt.Errorf("duplicate field names %q", fs[i].Name) + return false + } + } + + s.push(fs) + + case *Field: + fs, ok := s.peek().(*FieldValuesSpec) + if !ok { + panic(fmt.Sprintf("unexpected type %T", fs)) + } + + fs.TimeSequenceSpec = TimeSequenceSpec{ + Count: int(n.Count), + Precision: n.TimePrecision.ToDuration(), + } + fs.Name = n.Name + + case *FieldConstantValue: + var fs FieldValuesSpec + switch v := n.Value.(type) { + case float64: + fs.DataType = models.Float + fs.Values = func(spec TimeSequenceSpec) TimeValuesSequence { + return NewTimeFloatValuesSequence( + spec.Count, + NewTimestampSequenceFromSpec(spec), + NewFloatConstantValuesSequence(v), + ) + } + case int64: + fs.DataType = models.Integer + fs.Values = func(spec TimeSequenceSpec) TimeValuesSequence { + return NewTimeIntegerValuesSequence( + spec.Count, + NewTimestampSequenceFromSpec(spec), + NewIntegerConstantValuesSequence(v), + ) + } + case string: + fs.DataType = models.String + fs.Values = func(spec TimeSequenceSpec) TimeValuesSequence { + return NewTimeStringValuesSequence( + spec.Count, + NewTimestampSequenceFromSpec(spec), + NewStringConstantValuesSequence(v), + ) + } + case bool: + fs.DataType = models.Boolean + fs.Values = func(spec TimeSequenceSpec) TimeValuesSequence { + return NewTimeBooleanValuesSequence( + spec.Count, + NewTimestampSequenceFromSpec(spec), + NewBooleanConstantValuesSequence(v), + ) + } + default: + panic(fmt.Sprintf("unexpected type %T", v)) + } + + s.push(&fs) + + case *FieldArraySource: + var fs FieldValuesSpec + switch v := n.Value.(type) { + case []float64: + fs.DataType = models.Float + fs.Values = func(spec TimeSequenceSpec) TimeValuesSequence { + return NewTimeFloatValuesSequence( + spec.Count, + NewTimestampSequenceFromSpec(spec), + NewFloatArrayValuesSequence(v), + ) + } + case []int64: + fs.DataType = models.Integer + fs.Values = func(spec TimeSequenceSpec) TimeValuesSequence { + return NewTimeIntegerValuesSequence( + spec.Count, + NewTimestampSequenceFromSpec(spec), + NewIntegerArrayValuesSequence(v), + ) + } + case []string: + fs.DataType = models.String + fs.Values = func(spec TimeSequenceSpec) TimeValuesSequence { + return NewTimeStringValuesSequence( + spec.Count, + NewTimestampSequenceFromSpec(spec), + NewStringArrayValuesSequence(v), + ) + } + case []bool: + fs.DataType = models.Boolean + fs.Values = func(spec TimeSequenceSpec) TimeValuesSequence { + return NewTimeBooleanValuesSequence( + spec.Count, + NewTimestampSequenceFromSpec(spec), + NewBooleanArrayValuesSequence(v), + ) + } + default: + panic(fmt.Sprintf("unexpected type %T", v)) + } + + s.push(&fs) + + case *FieldFloatRandomSource: + var fs FieldValuesSpec + fs.DataType = models.Float + fs.Values = NewTimeValuesSequenceFn(func(spec TimeSequenceSpec) TimeValuesSequence { + return NewTimeFloatValuesSequence( + spec.Count, + NewTimestampSequenceFromSpec(spec), + NewFloatRandomValuesSequence(n.Min, n.Max, rand.New(rand.NewSource(n.Seed))), + ) + }) + s.push(&fs) + + case *FieldIntegerZipfSource: + var fs FieldValuesSpec + fs.DataType = models.Integer + fs.Values = NewTimeValuesSequenceFn(func(spec TimeSequenceSpec) TimeValuesSequence { + return NewTimeIntegerValuesSequence( + spec.Count, + NewTimestampSequenceFromSpec(spec), + NewIntegerZipfValuesSequence(n), + ) + }) + s.push(&fs) + + case *Tag: + s.push(&TagValuesSpec{ + TagKey: n.Name, + Values: s.pop().(NewCountableSequenceFn), + }) + + case *TagSequenceSource: + s.push(NewCountableSequenceFn(func() CountableSequence { + return NewCounterByteSequence(n.Format, int(n.Start), int(n.Start+n.Count)) + })) + + case *TagFileSource: + p, err := s.resolvePath(n.Path) + if err != nil { + s.err = err + return false + } + + lines, err := s.readLines(p) + if err != nil { + s.err = err + return false + } + + s.push(NewCountableSequenceFn(func() CountableSequence { + return NewStringArraySequence(lines) + })) + + case *TagArraySource: + s.push(NewCountableSequenceFn(func() CountableSequence { + return NewStringArraySequence(n.Values) + })) + + case nil: + + default: + panic(fmt.Sprintf("unexpected type %T", node)) + } + + return true +} + +func (s *schemaToSpec) visitErr(node SchemaNode) { + switch n := node.(type) { + case *Schema: + s.err = fmt.Errorf("error processing schema: %v", s.err) + case *Measurement: + s.err = fmt.Errorf("measurement %q: %v", n.Name, s.err) + case *Tag: + s.err = fmt.Errorf("tag %q: %v", n.Name, s.err) + case *Field: + s.err = fmt.Errorf("field %q: %v", n.Name, s.err) + } +} + +func (s *schemaToSpec) resolvePath(p string) (string, error) { + fullPath := os.ExpandEnv(p) + if !filepath.IsAbs(fullPath) { + fullPath = filepath.Join(s.schemaDir, fullPath) + } + + fi, err := os.Stat(fullPath) + if err != nil { + return "", fmt.Errorf("error resolving path %q: %v", p, err) + } + + if fi.IsDir() { + return "", fmt.Errorf("path %q is not a file: resolved to %s", p, fullPath) + } + + return fullPath, nil +} + +func (s *schemaToSpec) readLines(p string) ([]string, error) { + fp, err := s.resolvePath(p) + if err != nil { + return nil, err + } + + f, err := os.Open(fp) + if err != nil { + return nil, fmt.Errorf("path error: %v", err) + } + defer f.Close() + scan := bufio.NewScanner(f) + scan.Split(bufio.ScanLines) + + n := 0 + var lines []string + + for scan.Scan() { + if len(scan.Bytes()) == 0 { + // skip empty lines + continue + } + + if !utf8.Valid(scan.Bytes()) { + return nil, fmt.Errorf("path %q, invalid UTF-8 on line %d", p, n) + } + lines = append(lines, scan.Text()) + } + + if scan.Err() != nil { + return nil, scan.Err() + } + + return lines, nil +} + +type option func(s *schemaToSpec) + +func schemaDir(p string) option { + return func(s *schemaToSpec) { + s.schemaDir = p + } +} + +func NewSpecFromSchema(root *Schema) (*Spec, error) { + return newSpecFromSchema(root) +} + +func newSpecFromSchema(root *Schema, opts ...option) (*Spec, error) { + var spec Spec + + vis := &schemaToSpec{spec: &spec} + for _, o := range opts { + o(vis) + } + + WalkUp(vis, root) + if vis.err != nil { + return nil, vis.err + } + + return &spec, nil +} diff --git a/pkg/data/gen/specs_test.go b/pkg/data/gen/specs_test.go new file mode 100644 index 0000000000..7bfcf61a15 --- /dev/null +++ b/pkg/data/gen/specs_test.go @@ -0,0 +1,78 @@ +package gen + +import ( + "testing" + + "github.com/BurntSushi/toml" +) + +func TestSpecFromSchema(t *testing.T) { + in := ` +title = "example schema" + +[[measurements]] +name = "m0" +tags = [ + { name = "tag0", source = [ "host1", "host2" ] }, + { name = "tag1", source = [ "process1", "process2" ] }, + { name = "tag2", source = { type = "sequence", format = "value%s", start = 0, count = 100 } } +] +fields = [ + { name = "f0", count = 5000, source = 0.5 }, + { name = "f1", count = 5000, source = 0.5 }, +] +[[measurements]] +name = "m1" +tags = [ + { name = "tag0", source = [ "host1", "host2" ] }, +] +fields = [ + { name = "f0", count = 5000, source = 0.5 }, +] +` + var out Schema + if _, err := toml.Decode(in, &out); err != nil { + t.Fatalf("unxpected error: %v", err) + } + + spec, err := NewSpecFromSchema(&out) + if err != nil { + t.Error(err) + } + t.Log(spec) +} + +func TestSpecFromSchemaError(t *testing.T) { + in := ` +title = "example schema" + +[[measurements]] +name = "m0" +tags = [ + { name = "tag0", source = [ "host1", "host2" ] }, + { name = "tag1", source = { type = "sequence", format = "value%s", start = 0, count = 100 } }, +] +fields = [ + { name = "f0", count = 5000, source = 0.5 }, +] +[[measurements]] +name = "m1" +tags = [ + { name = "tag0", source = [ "host1", "host2" ] }, +] +fields = [ + { name = "f0", count = 5000, source = 0.5 }, +] +` + + var out Schema + if _, err := toml.Decode(in, &out); err != nil { + t.Fatalf("unxpected error: %v", err) + } + + spec, err := NewSpecFromSchema(&out) + if err != nil { + t.Error(err) + } + t.Log(spec) +} diff --git a/pkg/data/gen/tags_sequence.go b/pkg/data/gen/tags_sequence.go new file mode 100644 index 0000000000..3330cc4063 --- /dev/null +++ b/pkg/data/gen/tags_sequence.go @@ -0,0 +1,175 @@ +package gen + +import ( + "fmt" + "math" + "math/rand" + "sort" + + "github.com/influxdata/influxdb/models" +) + +type TagsSequence interface { + Next() bool + Value() models.Tags + Count() int +} + +type tagsValuesSequence struct { + tags models.Tags + vals []CountableSequence + n int + count int + sample float64 + src rand.Source + nextFn func(*tagsValuesSequence) bool +} + +type tagsValuesOption func(s *tagsValuesSequence) + +func TagValuesLimitOption(n int) tagsValuesOption { + return func(s *tagsValuesSequence) { + if n >= s.count { + return + } + + s.src = rand.NewSource(20040409) + s.sample = float64(n) / float64(s.count) + } +} + +func TagValuesSampleOption(n float64) tagsValuesOption { + return func(s *tagsValuesSequence) { + if n <= 0.0 || n > 1.0 { + panic("expect: 0.0 < n ≤ 1.0") + } + + s.src = rand.NewSource(int64(float64(math.MaxInt64>>1) * n)) + s.sample = n + s.nextFn = (*tagsValuesSequence).nextSample + } +} + +func NewTagsValuesSequenceKeysValues(keys []string, vals []CountableSequence, opts ...tagsValuesOption) TagsSequence { + tm := make(map[string]string, len(keys)) + for _, k := range keys { + tm[k] = "" + } + + count := 1 + for i := range vals { + count *= vals[i].Count() + } + + // models.Tags are ordered, so ensure vals are ordered with respect to keys + sort.Sort(keyValues{keys, vals}) + + s := &tagsValuesSequence{ + tags: models.NewTags(tm), + vals: vals, + count: count, + nextFn: (*tagsValuesSequence).next, + } + + for _, o := range opts { + o(s) + } + + return s +} + +func NewTagsValuesSequenceValues(prefix string, vals []CountableSequence) TagsSequence { + keys := make([]string, len(vals)) + // max tag width + tw := int(math.Ceil(math.Log10(float64(len(vals))))) + tf := fmt.Sprintf("%s%%0%dd", prefix, tw) + for i := range vals { + keys[i] = fmt.Sprintf(tf, i) + } + return NewTagsValuesSequenceKeysValues(keys, vals) +} + +func NewTagsValuesSequenceCounts(prefix string, counts []int) TagsSequence { + tv := make([]CountableSequence, len(counts)) + for i := range counts { + tv[i] = NewCounterByteSequenceCount(counts[i]) + } + return NewTagsValuesSequenceValues(prefix, tv) +} + +func (s *tagsValuesSequence) next() bool { + if s.n >= s.count { + return false + } + + for i := range s.vals { + s.tags[i].Value = []byte(s.vals[i].Value()) + } + + s.n++ + i := s.n + for j := len(s.vals) - 1; j >= 0; j-- { + v := s.vals[j] + v.Next() + c := v.Count() + if r := i % c; r != 0 { + break + } + i /= c + } + + return true +} + +func (s *tagsValuesSequence) skip() bool { + return (float64(s.src.Int63()>>10))*(1.0/9007199254740992.0) > s.sample +} + +func (s *tagsValuesSequence) nextSample() bool { + if s.n >= s.count { + return false + } + + for i := range s.vals { + s.tags[i].Value = []byte(s.vals[i].Value()) + } + + for { + s.n++ + i := s.n + for j := len(s.vals) - 1; j >= 0; j-- { + v := s.vals[j] + v.Next() + c := v.Count() + if r := i % c; r != 0 { + break + } + i /= c + } + + if !s.skip() { + break + } + } + + return true +} + +func (s *tagsValuesSequence) Next() bool { + return s.nextFn(s) +} + +func (s *tagsValuesSequence) Value() models.Tags { return s.tags } +func (s *tagsValuesSequence) Count() int { return s.count } + +type keyValues struct { + keys []string + vals []CountableSequence +} + +func (k keyValues) Len() int { return len(k.keys) } +func (k keyValues) Less(i, j int) bool { return k.keys[i] < k.keys[j] } +func (k keyValues) Swap(i, j int) { + k.keys[i], k.keys[j] = k.keys[j], k.keys[i] + k.vals[i], k.vals[j] = k.vals[j], k.vals[i] +} diff --git a/pkg/data/gen/timestamp_sequence.go b/pkg/data/gen/timestamp_sequence.go new file mode 100644 index 0000000000..e2b7aee66d --- /dev/null +++ b/pkg/data/gen/timestamp_sequence.go @@ -0,0 +1,36 @@ +package gen + +type TimestampSequence interface { + Reset() + Write(ts []int64) +} + +type timestampSequence struct { + t int64 + start int64 + delta int64 +} + +func NewTimestampSequenceFromSpec(spec TimeSequenceSpec) TimestampSequence { + return ×tampSequence{ + t: spec.Start.UnixNano(), + start: spec.Start.UnixNano(), + delta: int64(spec.Delta), + } +} + +func (g *timestampSequence) Reset() { + g.t = g.start +} + +func (g *timestampSequence) Write(ts []int64) { + var ( + t = g.t + d = g.delta + ) + for i := 0; i < len(ts); i++ { + ts[i] = t + t += d + } + g.t = t +} diff --git a/pkg/data/gen/toml.go b/pkg/data/gen/toml.go new file mode 100644 index 0000000000..fd227edd22 --- /dev/null +++ b/pkg/data/gen/toml.go @@ -0,0 +1,409 @@ +package gen + +import ( + "errors" + "fmt" + "strings" + "time" + + "github.com/spf13/cast" +) + +type SeriesLimit int64 + +func (s *SeriesLimit) UnmarshalTOML(data interface{}) error { + v, ok := data.(int64) + if !ok { + return errors.New("series-limit: invalid value") + } + + if v < 0 { + return errors.New("series-limit: must be ≥ 0") + } + + *s = SeriesLimit(v) + return nil +} + +type sample float64 + +func (s *sample) UnmarshalTOML(data interface{}) error { + v, ok := data.(float64) + if !ok { + return errors.New("sample: must be a float") + } + + if v <= 0 || v > 1.0 { + return errors.New("sample: must be 0 < sample ≤ 1.0") + } + + *s = sample(v) + + return nil +} + +type precision byte + +const ( + precisionMillisecond precision = iota // default + precisionNanosecond + precisionMicrosecond + precisionSecond + precisionMinute + precisionHour +) + +var precisionToDuration = [...]time.Duration{ + time.Millisecond, + time.Nanosecond, + time.Microsecond, + time.Second, + time.Minute, + time.Minute * 60, + time.Nanosecond, + time.Nanosecond, +} + +func (p *precision) ToDuration() time.Duration { + return precisionToDuration[*p&0x7] +} + +func (p *precision) UnmarshalTOML(data interface{}) error { + d, ok := data.(string) + if !ok { + return fmt.Errorf("invalid precision, expect one of (ns, us, ms, s, m, h): %T", data) + } + + d = strings.ToLower(d) + + switch d { + case "ns", "nanosecond": + *p = precisionNanosecond + case "us", "microsecond", "µs": + *p = precisionMicrosecond + case "ms", "millisecond": + *p = precisionMillisecond + case "s", "second": + *p = precisionSecond + case "m", "minute": + *p = precisionMinute + case "h", "hour": + *p = precisionHour + default: + return fmt.Errorf("invalid precision, expect one of (ns, ms, s, m, h): %s", d) + } + return nil +} + +func (t *Tag) UnmarshalTOML(data interface{}) error { + d, ok := data.(map[string]interface{}) + if !ok { + return nil + } + + if n, ok := d["name"].(string); !ok || n == "" { + return errors.New("tag: missing or invalid value for name") + } else { + t.Name = n + } + + // infer source + + if _, ok := d["source"]; !ok { + return fmt.Errorf("missing source for tag %q", t.Name) + } + + switch v := d["source"].(type) { + case int64, string, float64, bool: + if src, err := decodeTagConstantSource(v); err != nil { + return err + } else { + t.Source = src + } + case []interface{}: + if src, err := decodeTagArraySource(v); err != nil { + return err + } else { + t.Source = src + } + case map[string]interface{}: + if src, err := decodeTagSource(v); err != nil { + return err + } else { + t.Source = src + } + default: + return fmt.Errorf("invalid source for tag %q: %T", t.Name, v) + } + + return nil +} + +func decodeTagConstantSource(data interface{}) (TagSource, error) { + switch data.(type) { + case int64, string, float64, bool: + if src, err := cast.ToStringE(data); err != nil { + return nil, err + } else { + return &TagArraySource{Values: []string{src}}, nil + } + } + + return nil, errors.New("invalid constant tag source") +} + +func decodeTagArraySource(data []interface{}) (TagSource, error) { + if len(data) == 0 { + return nil, errors.New("empty array source") + } + + if src, err := cast.ToStringSliceE(data); err != nil { + return nil, err + } else { + return &TagArraySource{Values: src}, nil + } +} + +func decodeTagSource(data map[string]interface{}) (TagSource, error) { + typ, ok := data["type"].(string) + if !ok { + return nil, errors.New("missing type field") + } + switch typ { + case "sequence": + return decodeTagSequenceSource(data) + case "file": + return decodeTagFileSource(data) + default: + return nil, fmt.Errorf("invalid type field %q", typ) + } +} + +func decodeTagFileSource(data map[string]interface{}) (TagSource, error) { + var s TagFileSource + + if v, ok := data["path"].(string); ok { + s.Path = v + } else { + return nil, errors.New("file: missing path") + } + + return &s, nil +} + +func decodeTagSequenceSource(data map[string]interface{}) (TagSource, error) { + var s TagSequenceSource + + if v, ok := data["format"].(string); ok { + // TODO(sgc): validate format string + s.Format = v + } else { + s.Format = "value%s" + } + + if v, ok := data["start"]; ok { + if v, err := cast.ToInt64E(v); err != nil { + return nil, fmt.Errorf("tag.sequence: invalid start, %v", err) + } else if v < 0 { + return nil, fmt.Errorf("tag.sequence: start must be ≥ 0") + } else { + s.Start = v + } + } + + if v, ok := data["count"]; ok { + if v, err := cast.ToInt64E(v); err != nil { + return nil, fmt.Errorf("tag.sequence: invalid count, %v", err) + } else if v < 0 { + return nil, fmt.Errorf("tag.sequence: count must be > 0") + } else { + s.Count = v + } + } else { + return nil, fmt.Errorf("tag.sequence: missing count") + } + + return &s, nil +} + +func (t *Field) UnmarshalTOML(data interface{}) error { + d, ok := data.(map[string]interface{}) + if !ok { + return nil + } + + if n, ok := d["name"].(string); !ok || n == "" { + return errors.New("field: missing or invalid value for name") + } else { + t.Name = n + } + + if n, ok := d["count"]; !ok { + return errors.New("field: missing value for count") + } else if count, err := cast.ToInt64E(n); err != nil { + return fmt.Errorf("field: invalid count, %v", err) + } else if count <= 0 { + return errors.New("field: count must be > 0") + } else { + t.Count = count + } + + if n, ok := d["time-precision"]; ok { + if err := t.TimePrecision.UnmarshalTOML(n); err != nil { + return err + } + } + + // infer source + if _, ok := d["source"]; !ok { + return fmt.Errorf("missing source for field %q", t.Name) + } + + switch v := d["source"].(type) { + case int64, string, float64, bool: + t.Source = &FieldConstantValue{v} + case []interface{}: + if src, err := decodeFieldArraySource(v); err != nil { + return err + } else { + t.Source = src + } + case map[string]interface{}: + if src, err := decodeFieldSource(v); err != nil { + return err + } else { + t.Source = src + } + default: + // unknown + return fmt.Errorf("invalid source for tag %q: %T", t.Name, v) + } + + return nil +} + +func decodeFieldArraySource(data []interface{}) (FieldSource, error) { + if len(data) == 0 { + return nil, errors.New("empty array") + } + + var ( + src interface{} + err error + ) + + // use first value to determine slice type + switch data[0].(type) { + case int64: + src, err = toInt64SliceE(data) + case float64: + src, err = toFloat64SliceE(data) + case string: + src, err = cast.ToStringSliceE(data) + case bool: + src, err = cast.ToBoolSliceE(data) + default: + err = fmt.Errorf("unsupported field source data type: %T", data[0]) + } + + if err != nil { + return nil, err + } + + return &FieldArraySource{Value: src}, nil +} + +func decodeFieldSource(data map[string]interface{}) (FieldSource, error) { + typ, ok := data["type"].(string) + if !ok { + return nil, errors.New("missing type field") + } + switch typ { + case "rand": + return decodeFloatRandomSource(data) + case "zipf": + return decodeIntegerZipfSource(data) + default: + return nil, fmt.Errorf("invalid type field %q", typ) + } +} + +func decodeFloatRandomSource(data map[string]interface{}) (FieldSource, error) { + var s FieldFloatRandomSource + + if v, ok := data["seed"]; ok { + if v, err := cast.ToInt64E(v); err != nil { + return nil, fmt.Errorf("rand: invalid seed, %v", err) + } else { + s.Seed = v + } + } + + if v, ok := data["min"]; ok { + if v, err := cast.ToFloat64E(v); err != nil { + return nil, fmt.Errorf("rand: invalid min, %v", err) + } else { + s.Min = v + } + } + + if v, ok := data["max"]; ok { + if v, err := cast.ToFloat64E(v); err != nil { + return nil, fmt.Errorf("rand: invalid max, %v", err) + } else { + s.Max = v + } + } else { + s.Max = 1.0 + } + + if !(s.Min <= s.Max) { + return nil, errors.New("rand: min ≤ max") + } + + return &s, nil +} + +func decodeIntegerZipfSource(data map[string]interface{}) (FieldSource, error) { + var s FieldIntegerZipfSource + + if v, ok := data["seed"]; ok { + if v, err := cast.ToInt64E(v); err != nil { + return nil, fmt.Errorf("zipf: invalid seed, %v", err) + } else { + s.Seed = v + } + } + + if v, ok := data["s"]; ok { + if v, err := cast.ToFloat64E(v); err != nil || v <= 1.0 { + return nil, fmt.Errorf("zipf: invalid value for s (s > 1), %v", err) + } else { + s.S = v + } + } else { + return nil, fmt.Errorf("zipf: missing value for s") + } + + if v, ok := data["v"]; ok { + if v, err := cast.ToFloat64E(v); err != nil || v < 1.0 { + return nil, fmt.Errorf("zipf: invalid value for v (v ≥ 1), %v", err) + } else { + s.V = v + } + } else { + return nil, fmt.Errorf("zipf: missing value for v") + } + + if v, ok := data["imax"]; ok { + if v, err := cast.ToUint64E(v); err != nil { + return nil, fmt.Errorf("zipf: invalid value for imax, %v", err) + } else { + s.IMAX = v + } + } else { + return nil, fmt.Errorf("zipf: missing value for imax") + } + + return &s, nil +} diff --git a/pkg/data/gen/toml_test.go b/pkg/data/gen/toml_test.go new file mode 100644 index 0000000000..63fe099635 --- /dev/null +++ b/pkg/data/gen/toml_test.go @@ -0,0 +1,164 @@ +package gen + +import ( + "fmt" + "strings" + "testing" + + "github.com/BurntSushi/toml" + "github.com/google/go-cmp/cmp" +) + +func visit(root *Schema) string { + w := &strings.Builder{} + + walkFn := func(node SchemaNode) bool { + switch n := node.(type) { + case *Schema: + + case Measurements: + fmt.Fprintln(w, "Measurements: ") + + case *Measurement: + fmt.Fprintln(w) + fmt.Fprintf(w, " Name: %s\n", n.Name) + + case Tags: + fmt.Fprintln(w, " Tags:") + + case Fields: + fmt.Fprintln(w, " Fields:") + + case *Field: + fmt.Fprintf(w, " %s: %s, count=%d, time-precision=%s\n", n.Name, n.Source, n.Count, n.TimePrecision) + + case *Tag: + fmt.Fprintf(w, " %s: %s\n", n.Name, n.Source) + + } + + return true + } + + WalkDown(VisitorFn(walkFn), root) + + return w.String() +} + +func TestSchema(t *testing.T) { + in := ` +title = "example schema" +series-limit = 10 + +[[measurements]] + name = "constant" + series-limit = 5 + + [[measurements.tags]] + name = "tag0" + source = [ "host1", "host2" ] + + [[measurements.tags]] + name = "tag1" + source = { type = "file", path = "foo.txt" } + + [[measurements.fields]] + name = "floatC" + count = 5000 + source = 0.5 + time-precision = "us" + + [[measurements.fields]] + name = "integerC" + count = 5000 + source = 3 + time-precision = "hour" + + [[measurements.fields]] + name = "stringC" + count = 5000 + source = "hello" + + [[measurements.fields]] + name = "stringA" + count = 5000 + source = ["hello", "world"] + + [[measurements.fields]] + name = "boolf" + count = 5000 + source = false + +[[measurements]] +name = "random" + + [[measurements.tags]] + name = "tagSeq" + source = { type = "sequence", format = "value%s", start = 0, count = 100 } + + [[measurements.fields]] + name = "floatR" + count = 5000 + source = { type = "rand", min = 0.5, max = 50.1, seed = 10 } + time-precision = "us" + +[[measurements]] +name = "array" + + [[measurements.tags]] + name = "tagSeq" + source = { type = "sequence", format = "value%s", start = 0, count = 100 } + + [[measurements.tags]] + name = "tagFile" + source = { type = "file", path = "foo.txt" } + + [[measurements.fields]] + name = "stringA" + count = 1000 + source = ["this", "that"] + time-precision = "us" + + [[measurements.fields]] + name = "integerA" + count = 1000 + source = [5, 6, 7] + time-precision = "us" +` + var out Schema + _, err := toml.Decode(in, &out) + if err != nil { + t.Fatalf("unxpected error: %v", err) + } + + exp := `Measurements: + + Name: constant + Tags: + tag0: array, source=[]string{"host1", "host2"} + tag1: file, path=foo.txt + Fields: + floatC: constant, source=0.5, count=5000, time-precision=Microsecond + integerC: constant, source=3, count=5000, time-precision=Hour + stringC: constant, source="hello", count=5000, time-precision=Millisecond + stringA: array, source=[]string{"hello", "world"}, count=5000, time-precision=Millisecond + boolf: constant, source=false, count=5000, time-precision=Millisecond + + Name: random + Tags: + tagSeq: sequence, prefix="value%s", range=[0,100) + Fields: + floatR: rand, seed=10, min=50.100000, max=50.100000, count=5000, time-precision=Microsecond + + Name: array + Tags: + tagSeq: sequence, prefix="value%s", range=[0,100) + tagFile: file, path=foo.txt + Fields: + stringA: array, source=[]string{"this", "that"}, count=1000, time-precision=Microsecond + integerA: array, source=[]int64{5, 6, 7}, count=1000, time-precision=Microsecond +` + if got := visit(&out); !cmp.Equal(got, exp) { + t.Errorf("unexpected value, -got/+exp\n%s", cmp.Diff(got, exp)) + } +} diff --git a/pkg/data/gen/types.tmpldata b/pkg/data/gen/types.tmpldata new file mode 100644 index 0000000000..82651de253 --- /dev/null +++ b/pkg/data/gen/types.tmpldata @@ -0,0 +1,30 @@ +[ + { + "Name":"Float", + "name":"float", + "Type":"float64", + "Rand":"Float64" + }, + { + "Name":"Integer", + "name":"integer", + "Type":"int64", + "Rand":"Int64" + }, + { + "Name":"Unsigned", + "name":"unsigned", + "Type":"uint64", + "Rand":"Uint64" + }, + { + "Name":"String", + "name":"string", + "Type":"string" + }, + { + "Name":"Boolean", + "name":"boolean", + "Type":"bool" + } +] diff --git a/pkg/data/gen/util.go b/pkg/data/gen/util.go new file mode 100644 index 0000000000..178d367c87 --- /dev/null +++ b/pkg/data/gen/util.go @@ -0,0 +1,113 @@ +package gen + +import ( + "fmt" + "reflect" + "sort" + + "github.com/spf13/cast" +) + +func min(a, b int) int { + if a < b { + return a + } + return b +} + +func sortDedupStrings(in []string) []string { + sort.Strings(in) + j := 0 + for i := 1; i < len(in); i++ { + if in[j] == in[i] { + continue + } + j++ + in[j] = in[i] + } + return in[:j+1] +} + +func sortDedupInts(in []int) []int { + sort.Ints(in) + j := 0 + for i := 1; i < len(in); i++ { + if in[j] == in[i] { + continue + } + j++ + in[j] = in[i] + } + return in[:j+1] +} + +func sortDedupFloats(in []float64) []float64 { + sort.Float64s(in) + j := 0 + for i := 1; i < len(in); i++ { + if in[j] == in[i] { + continue + } + j++ + in[j] = in[i] + } + return in[:j+1] +} + +// ToInt64SliceE casts an interface to a []int64 type. +func toInt64SliceE(i interface{}) ([]int64, error) { + if i == nil { + return []int64{}, fmt.Errorf("unable to cast %#v of type %T to []int64", i, i) + } + + switch v := i.(type) { + case []int64: + return v, nil + } + + kind := reflect.TypeOf(i).Kind() + switch kind { + case reflect.Slice, reflect.Array: + s := reflect.ValueOf(i) + a := make([]int64, s.Len()) + for j := 0; j < s.Len(); j++ { + val, err := cast.ToInt64E(s.Index(j).Interface()) + if err != nil { + return []int64{}, fmt.Errorf("unable to cast %#v of type %T to []int64", i, i) + } + a[j] = val + } + return a, nil + default: + return []int64{}, fmt.Errorf("unable to cast %#v of type %T to []int64", i, i) + } +} + +// ToFloat64SliceE casts an interface to a []float64 type. +func toFloat64SliceE(i interface{}) ([]float64, error) { + if i == nil { + return []float64{}, fmt.Errorf("unable to cast %#v of type %T to []float64", i, i) + } + + switch v := i.(type) { + case []float64: + return v, nil + } + + kind := reflect.TypeOf(i).Kind() + switch kind { + case reflect.Slice, reflect.Array: + s := reflect.ValueOf(i) + a := make([]float64, s.Len()) + for j := 0; j < s.Len(); j++ { + val, err := cast.ToFloat64E(s.Index(j).Interface()) + if err != nil { + return []float64{}, fmt.Errorf("unable to cast %#v of type %T to []float64", i, i) + } + a[j] = val + } + return a, nil + default: + return []float64{}, fmt.Errorf("unable to cast %#v of type %T to []float64", i, i) + } +} diff --git a/pkg/data/gen/values.gen.go b/pkg/data/gen/values.gen.go new file mode 100644 index 0000000000..4cc16b7647 --- /dev/null +++ b/pkg/data/gen/values.gen.go @@ -0,0 +1,252 @@ +// Generated by tmpl +// https://github.com/benbjohnson/tmpl +// +// DO NOT EDIT! +// Source: values.gen.go.tmpl + +package gen + +type floatConstantValuesSequence struct { + v float64 +} + +func NewFloatConstantValuesSequence(v float64) FloatValuesSequence { + return &floatConstantValuesSequence{ + v: v, + } +} + +func (g *floatConstantValuesSequence) Reset() { +} + +func (g *floatConstantValuesSequence) Write(vs []float64) { + for i := 0; i < len(vs); i++ { + vs[i] = g.v + } +} + +type integerConstantValuesSequence struct { + v int64 +} + +func NewIntegerConstantValuesSequence(v int64) IntegerValuesSequence { + return &integerConstantValuesSequence{ + v: v, + } +} + +func (g *integerConstantValuesSequence) Reset() { +} + +func (g *integerConstantValuesSequence) Write(vs []int64) { + for i := 0; i < len(vs); i++ { + vs[i] = g.v + } +} + +type unsignedConstantValuesSequence struct { + v uint64 +} + +func NewUnsignedConstantValuesSequence(v uint64) UnsignedValuesSequence { + return &unsignedConstantValuesSequence{ + v: v, + } +} + +func (g *unsignedConstantValuesSequence) Reset() { +} + +func (g *unsignedConstantValuesSequence) Write(vs []uint64) { + for i := 0; i < len(vs); i++ { + vs[i] = g.v + } +} + +type stringConstantValuesSequence struct { + v string +} + +func NewStringConstantValuesSequence(v string) StringValuesSequence { + return &stringConstantValuesSequence{ + v: v, + } +} + +func (g *stringConstantValuesSequence) Reset() { +} + +func (g *stringConstantValuesSequence) Write(vs []string) { + for i := 0; i < len(vs); i++ { + vs[i] = g.v + } +} + +type booleanConstantValuesSequence struct { + v bool +} + +func NewBooleanConstantValuesSequence(v bool) BooleanValuesSequence { + return &booleanConstantValuesSequence{ + v: v, + } +} + +func (g *booleanConstantValuesSequence) Reset() { +} + +func (g *booleanConstantValuesSequence) Write(vs []bool) { + for i := 0; i < len(vs); i++ { + vs[i] = g.v + } +} + +type floatArrayValuesSequence struct { + v []float64 + vi int +} + +func NewFloatArrayValuesSequence(v []float64) FloatValuesSequence { + return &floatArrayValuesSequence{ + v: v, + } +} + +func (g *floatArrayValuesSequence) Reset() { + g.vi = 0 +} + +func (g *floatArrayValuesSequence) Write(vs []float64) { + var ( + v = g.v + vi = g.vi + ) + for i := 0; i < len(vs); i++ { + if vi >= len(v) { + vi = 0 + } + vs[i] = v[vi] + vi += 1 + } + g.vi = vi +} + +type integerArrayValuesSequence struct { + v []int64 + vi int +} + +func NewIntegerArrayValuesSequence(v []int64) IntegerValuesSequence { + return &integerArrayValuesSequence{ + v: v, + } +} + +func (g *integerArrayValuesSequence) Reset() { + g.vi = 0 +} + +func (g *integerArrayValuesSequence) Write(vs []int64) { + var ( + v = g.v + vi = g.vi + ) + for i := 0; i < len(vs); i++ { + if vi >= len(v) { + vi = 0 + } + vs[i] = v[vi] + vi += 1 + } + g.vi = vi +} + +type unsignedArrayValuesSequence struct { + v []uint64 + vi int +} + +func NewUnsignedArrayValuesSequence(v []uint64) UnsignedValuesSequence { + return &unsignedArrayValuesSequence{ + v: v, + } +} + +func (g *unsignedArrayValuesSequence) Reset() { + g.vi = 0 +} + +func (g *unsignedArrayValuesSequence) Write(vs []uint64) { + var ( + v = g.v + vi = g.vi + ) + for i := 0; i < len(vs); i++ { + if vi >= len(v) { + vi = 0 + } + vs[i] = v[vi] + vi += 1 + } + g.vi = vi +} + +type stringArrayValuesSequence struct { + v []string + vi int +} + +func NewStringArrayValuesSequence(v []string) StringValuesSequence { + return &stringArrayValuesSequence{ + v: v, + } +} + +func (g *stringArrayValuesSequence) Reset() { + g.vi = 0 +} + +func (g *stringArrayValuesSequence) Write(vs []string) { + var ( + v = g.v + vi = g.vi + ) + for i := 0; i < len(vs); i++ { + if vi >= len(v) { + vi = 0 + } + vs[i] = v[vi] + vi += 1 + } + g.vi = vi +} + +type booleanArrayValuesSequence struct { + v []bool + vi int +} + +func NewBooleanArrayValuesSequence(v []bool) BooleanValuesSequence { + return &booleanArrayValuesSequence{ + v: v, + } +} + +func (g *booleanArrayValuesSequence) Reset() { + g.vi = 0 +} + +func (g *booleanArrayValuesSequence) Write(vs []bool) { + var ( + v = g.v + vi = g.vi + ) + for i := 0; i < len(vs); i++ { + if vi >= len(v) { + vi = 0 + } + vs[i] = v[vi] + vi += 1 + } + g.vi = vi +} diff --git a/pkg/data/gen/values.gen.go.tmpl b/pkg/data/gen/values.gen.go.tmpl new file mode 100644 index 0000000000..a76e5a09fe --- /dev/null +++ b/pkg/data/gen/values.gen.go.tmpl @@ -0,0 +1,54 @@ +package gen + +{{range .}} +type {{.name}}ConstantValuesSequence struct { + v {{.Type}} +} + +func New{{.Name}}ConstantValuesSequence(v {{.Type}}) {{.Name}}ValuesSequence { + return &{{.name}}ConstantValuesSequence{ + v: v, + } +} + +func (g *{{.name}}ConstantValuesSequence) Reset() { +} + +func (g *{{.name}}ConstantValuesSequence) Write(vs []{{.Type}}) { + for i := 0; i < len(vs); i++ { + vs[i] = g.v + } +} +{{end}} + +{{range .}} +type {{.name}}ArrayValuesSequence struct { + v []{{.Type}} + vi int +} + +func New{{.Name}}ArrayValuesSequence(v []{{.Type}}) {{.Name}}ValuesSequence { + return &{{.name}}ArrayValuesSequence{ + v: v, + } +} + +func (g *{{.name}}ArrayValuesSequence) Reset() { + g.vi = 0 +} + +func (g *{{.name}}ArrayValuesSequence) Write(vs []{{.Type}}) { + var ( + v = g.v + vi = g.vi + ) + for i := 0; i < len(vs); i++ { + if vi >= len(v) { + vi = 0 + } + vs[i] = v[vi] + vi += 1 + } + g.vi = vi +} +{{end}} diff --git a/pkg/data/gen/values.go b/pkg/data/gen/values.go new file mode 100644 index 0000000000..9dfc1cb809 --- /dev/null +++ b/pkg/data/gen/values.go @@ -0,0 +1,46 @@ +package gen + +import ( + "math/rand" +) + +type floatRandomValuesSequence struct { + r *rand.Rand + a float64 + b float64 +} + +func NewFloatRandomValuesSequence(min, max float64, r *rand.Rand) FloatValuesSequence { + return &floatRandomValuesSequence{r: r, a: max - min, b: min} +} + +func (g *floatRandomValuesSequence) Reset() {} + +func (g *floatRandomValuesSequence) Write(vs []float64) { + var ( + a = g.a + b = g.b + ) + for i := 0; i < len(vs); i++ { + vs[i] = a*g.r.Float64() + b // ax + b + } +} + +type integerRandomValuesSequence struct { + r *rand.Zipf +} + +// NewIntegerZipfValuesSequence produces int64 values using a Zipfian distribution +// described by s. +func NewIntegerZipfValuesSequence(s *FieldIntegerZipfSource) IntegerValuesSequence { + r := rand.New(rand.NewSource(s.Seed)) + return &integerRandomValuesSequence{r: rand.NewZipf(r, s.S, s.V, s.IMAX)} +} + +func (g *integerRandomValuesSequence) Reset() {} + +func (g *integerRandomValuesSequence) Write(vs []int64) { + for i := 0; i < len(vs); i++ { + vs[i] = int64(g.r.Uint64()) + } +} diff --git a/pkg/data/gen/values_sequence.gen.go b/pkg/data/gen/values_sequence.gen.go new file mode 100644 index 0000000000..c7ab00b545 --- /dev/null +++ b/pkg/data/gen/values_sequence.gen.go @@ -0,0 +1,251 @@ +// Generated by tmpl +// https://github.com/benbjohnson/tmpl +// +// DO NOT EDIT! +// Source: values_sequence.gen.go.tmpl + +package gen + +import ( + "github.com/influxdata/influxdb/tsdb" +) + +type FloatValuesSequence interface { + Reset() + Write(v []float64) +} + +type timeFloatValuesSequence struct { + vals floatArray + ts TimestampSequence + vs FloatValuesSequence + count int + n int +} + +func NewTimeFloatValuesSequence(count int, ts TimestampSequence, vs FloatValuesSequence) TimeValuesSequence { + return &timeFloatValuesSequence{ + vals: *newFloatArrayLen(tsdb.DefaultMaxPointsPerBlock), + ts: ts, + vs: vs, + count: count, + n: count, + } +} + +func (s *timeFloatValuesSequence) Reset() { + s.ts.Reset() + s.vs.Reset() + s.n = s.count +} + +func (s *timeFloatValuesSequence) Next() bool { + if s.n > 0 { + c := min(s.n, tsdb.DefaultMaxPointsPerBlock) + s.n -= c + s.vals.Timestamps = s.vals.Timestamps[:c] + s.vals.Values = s.vals.Values[:c] + + s.ts.Write(s.vals.Timestamps) + s.vs.Write(s.vals.Values) + return true + } + + return false +} + +func (s *timeFloatValuesSequence) Values() Values { + return &s.vals +} + +type IntegerValuesSequence interface { + Reset() + Write(v []int64) +} + +type timeIntegerValuesSequence struct { + vals integerArray + ts TimestampSequence + vs IntegerValuesSequence + count int + n int +} + +func NewTimeIntegerValuesSequence(count int, ts TimestampSequence, vs IntegerValuesSequence) TimeValuesSequence { + return &timeIntegerValuesSequence{ + vals: *newIntegerArrayLen(tsdb.DefaultMaxPointsPerBlock), + ts: ts, + vs: vs, + count: count, + n: count, + } +} + +func (s *timeIntegerValuesSequence) Reset() { + s.ts.Reset() + s.vs.Reset() + s.n = s.count +} + +func (s *timeIntegerValuesSequence) Next() bool { + if s.n > 0 { + c := min(s.n, tsdb.DefaultMaxPointsPerBlock) + s.n -= c + s.vals.Timestamps = s.vals.Timestamps[:c] + s.vals.Values = s.vals.Values[:c] + + s.ts.Write(s.vals.Timestamps) + s.vs.Write(s.vals.Values) + return true + } + + return false +} + +func (s *timeIntegerValuesSequence) Values() Values { + return &s.vals +} + +type UnsignedValuesSequence interface { + Reset() + Write(v []uint64) +} + +type timeUnsignedValuesSequence struct { + vals unsignedArray + ts TimestampSequence + vs UnsignedValuesSequence + count int + n int +} + +func NewTimeUnsignedValuesSequence(count int, ts TimestampSequence, vs UnsignedValuesSequence) TimeValuesSequence { + return &timeUnsignedValuesSequence{ + vals: *newUnsignedArrayLen(tsdb.DefaultMaxPointsPerBlock), + ts: ts, + vs: vs, + count: count, + n: count, + } +} + +func (s *timeUnsignedValuesSequence) Reset() { + s.ts.Reset() + s.vs.Reset() + s.n = s.count +} + +func (s *timeUnsignedValuesSequence) Next() bool { + if s.n > 0 { + c := min(s.n, tsdb.DefaultMaxPointsPerBlock) + s.n -= c + s.vals.Timestamps = s.vals.Timestamps[:c] + s.vals.Values = s.vals.Values[:c] + + s.ts.Write(s.vals.Timestamps) + s.vs.Write(s.vals.Values) + return true + } + + return false +} + +func (s *timeUnsignedValuesSequence) Values() Values { + return &s.vals +} + +type StringValuesSequence interface { + Reset() + Write(v []string) +} + +type timeStringValuesSequence struct { + vals stringArray + ts TimestampSequence + vs StringValuesSequence + count int + n int +} + +func NewTimeStringValuesSequence(count int, ts TimestampSequence, vs StringValuesSequence) TimeValuesSequence { + return &timeStringValuesSequence{ + vals: *newStringArrayLen(tsdb.DefaultMaxPointsPerBlock), + ts: ts, + vs: vs, + count: count, + n: count, + } +} + +func (s *timeStringValuesSequence) Reset() { + s.ts.Reset() + s.vs.Reset() + s.n = s.count +} + +func (s *timeStringValuesSequence) Next() bool { + if s.n > 0 { + c := min(s.n, tsdb.DefaultMaxPointsPerBlock) + s.n -= c + s.vals.Timestamps = s.vals.Timestamps[:c] + s.vals.Values = s.vals.Values[:c] + + s.ts.Write(s.vals.Timestamps) + s.vs.Write(s.vals.Values) + return true + } + + return false +} + +func (s *timeStringValuesSequence) Values() Values { + return &s.vals +} + +type BooleanValuesSequence interface { + Reset() + Write(v []bool) +} + +type timeBooleanValuesSequence struct { + vals booleanArray + ts TimestampSequence + vs BooleanValuesSequence + count int + n int +} + +func NewTimeBooleanValuesSequence(count int, ts TimestampSequence, vs BooleanValuesSequence) TimeValuesSequence { + return &timeBooleanValuesSequence{ + vals: *newBooleanArrayLen(tsdb.DefaultMaxPointsPerBlock), + ts: ts, + vs: vs, + count: count, + n: count, + } +} + +func (s *timeBooleanValuesSequence) Reset() { + s.ts.Reset() + s.vs.Reset() + s.n = s.count +} + +func (s *timeBooleanValuesSequence) Next() bool { + if s.n > 0 { + c := min(s.n, tsdb.DefaultMaxPointsPerBlock) + s.n -= c + s.vals.Timestamps = s.vals.Timestamps[:c] + s.vals.Values = s.vals.Values[:c] + + s.ts.Write(s.vals.Timestamps) + s.vs.Write(s.vals.Values) + return true + } + + return false +} + +func (s *timeBooleanValuesSequence) Values() Values { + return &s.vals +} diff --git a/pkg/data/gen/values_sequence.gen.go.tmpl b/pkg/data/gen/values_sequence.gen.go.tmpl new file mode 100644 index 0000000000..ff30bc8b33 --- /dev/null +++ b/pkg/data/gen/values_sequence.gen.go.tmpl @@ -0,0 +1,55 @@ +package gen + +import ( + "github.com/influxdata/influxdb/tsdb" +) + +{{range .}} +type {{.Name}}ValuesSequence interface { + Reset() + Write(v []{{.Type}}) +} + +type time{{.Name}}ValuesSequence struct { + vals {{.name}}Array + ts TimestampSequence + vs {{.Name}}ValuesSequence + count int + n int +} + +func NewTime{{.Name}}ValuesSequence(count int, ts TimestampSequence, vs {{.Name}}ValuesSequence) TimeValuesSequence { + return &time{{.Name}}ValuesSequence{ + vals: *new{{.Name}}ArrayLen(tsdb.DefaultMaxPointsPerBlock), + ts: ts, + vs: vs, + count: count, + n: count, + } +} + +func (s *time{{.Name}}ValuesSequence) Reset() { + s.ts.Reset() + s.vs.Reset() + s.n = s.count +} + +func (s *time{{.Name}}ValuesSequence) Next() bool { + if s.n > 0 { + c := min(s.n, tsdb.DefaultMaxPointsPerBlock) + s.n -= c + s.vals.Timestamps = s.vals.Timestamps[:c] + s.vals.Values = s.vals.Values[:c] + + s.ts.Write(s.vals.Timestamps) + s.vs.Write(s.vals.Values) + return true + } + + return false +} + +func (s *time{{.Name}}ValuesSequence) Values() Values { + return &s.vals +} +{{end}} \ No newline at end of file