influxdb/pkg/data/gen/specs.go

585 lines
13 KiB
Go

package gen
import (
"bufio"
"fmt"
"math/rand"
"os"
"path"
"path/filepath"
"sort"
"unicode/utf8"
"github.com/BurntSushi/toml"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/tsdb"
"github.com/pkg/errors"
)
// Spec describes the series data to generate for a single org/bucket pair:
// the measurements to produce and an optional cap on total series.
type Spec struct {
	OrgID        influxdb.ID
	BucketID     influxdb.ID
	SeriesLimit  *int64 // optional cap on total generated series; nil means unlimited
	Measurements []MeasurementSpec
}

// idType holds the encoded org/bucket identifier produced by
// tsdb.EncodeName(OrgID, BucketID).
type idType [influxdb.IDLength]byte
// NewSeriesGeneratorFromSpec constructs a SeriesGenerator that produces the
// series described by s over the time range tr. When s.SeriesLimit is set,
// the merged generator is capped at that many series.
func NewSeriesGeneratorFromSpec(s *Spec, tr TimeRange) SeriesGenerator {
	name := tsdb.EncodeName(s.OrgID, s.BucketID)

	gens := make([]SeriesGenerator, len(s.Measurements))
	for i := range s.Measurements {
		gens[i] = newSeriesGeneratorFromMeasurementSpec(name, &s.Measurements[i], tr)
	}

	if limit := s.SeriesLimit; limit != nil {
		return NewMergedSeriesGeneratorLimit(gens, *limit)
	}
	return NewMergedSeriesGenerator(gens)
}
// MeasurementSpec describes how to generate series for a single
// measurement/field combination (the schema conversion fans a measurement
// out into one spec per field).
type MeasurementSpec struct {
	Name            string
	SeriesLimit     *SeriesLimit // optional per-measurement cap; nil means unlimited
	TagsSpec        *TagsSpec
	FieldValuesSpec *FieldValuesSpec
}
// newSeriesGeneratorFromMeasurementSpec builds the SeriesGenerator for one
// measurement/field spec, applying the per-measurement series limit when set.
func newSeriesGeneratorFromMeasurementSpec(id idType, ms *MeasurementSpec, tr TimeRange) SeriesGenerator {
	var (
		field = []byte(ms.FieldValuesSpec.Name)
		vals  = newTimeValuesSequenceFromFieldValuesSpec(ms.FieldValuesSpec, tr)
		tags  = newTagsSequenceFromTagsSpec(ms.Name, ms.FieldValuesSpec.Name, ms.TagsSpec)
	)

	if limit := ms.SeriesLimit; limit != nil {
		return NewSeriesGeneratorLimit(id, field, vals, tags, int64(*limit))
	}
	return NewSeriesGenerator(id, field, vals, tags)
}
// NewTimeValuesSequenceFn returns a TimeValuesSequence that will generate a
// sequence of values based on the spec.
type NewTimeValuesSequenceFn func(spec TimeSequenceSpec) TimeValuesSequence

// NewTagsValuesSequenceFn constructs a TagsSequence.
type NewTagsValuesSequenceFn func() TagsSequence

// NewCountableSequenceFn constructs the CountableSequence that supplies the
// values for a single tag key.
type NewCountableSequenceFn func() CountableSequence

// TagsSpec describes the tag set of a measurement.
type TagsSpec struct {
	Tags   []*TagValuesSpec
	Sample *sample // fraction of tag values to use; defaults to 0.5 during schema conversion when nil
}
// newTagsSequenceFromTagsSpec builds the TagsSequence for measurement m and
// field f. The synthetic measurement tag key is placed first and the
// synthetic field tag key last, with the user-defined tags from ts between
// them. When ts.Sample requests a true sub-sample (set and ≠ 1.0), a
// sampling option is applied.
func newTagsSequenceFromTagsSpec(m, f string, ts *TagsSpec) TagsSequence {
	// Pre-size for the user tags plus the two synthetic keys.
	keys := make([]string, 0, len(ts.Tags)+2)
	vals := make([]CountableSequence, 0, len(ts.Tags)+2)

	keys = append(keys, models.MeasurementTagKey)
	vals = append(vals, NewStringConstantSequence(m))

	for _, spec := range ts.Tags {
		keys = append(keys, spec.TagKey)
		vals = append(vals, spec.Values())
	}

	keys = append(keys, models.FieldKeyTagKey)
	vals = append(vals, NewStringConstantSequence(f))

	var opts []tagsValuesOption
	// A sample of exactly 1.0 means "use every value", so no option is needed.
	if ts.Sample != nil && *ts.Sample != 1.0 {
		opts = append(opts, TagValuesSampleOption(float64(*ts.Sample)))
	}

	return NewTagsValuesSequenceKeysValues(keys, vals, opts...)
}
// TagValuesSpec pairs a tag key with the factory that produces its values.
type TagValuesSpec struct {
	TagKey string
	Values NewCountableSequenceFn
}

// FieldValuesSpec describes a single field: its name, data type, timestamp
// layout (the embedded TimeSequenceSpec) and the factory that produces its
// values.
type FieldValuesSpec struct {
	TimeSequenceSpec
	Name     string
	DataType models.FieldType
	Values   NewTimeValuesSequenceFn
}
// newTimeValuesSequenceFromFieldValuesSpec resolves the concrete time
// sequence spec for the range tr and delegates to the field's Values factory.
func newTimeValuesSequenceFromFieldValuesSpec(fs *FieldValuesSpec, tr TimeRange) TimeValuesSequence {
	spec := fs.TimeSequenceSpec.ForTimeRange(tr)
	return fs.Values(spec)
}
// NewSpecFromToml parses the TOML document s into a Schema and converts it
// to a Spec.
func NewSpecFromToml(s string) (*Spec, error) {
	var schema Schema
	_, err := toml.Decode(s, &schema)
	if err != nil {
		return nil, err
	}
	return NewSpecFromSchema(&schema)
}
// NewSpecFromPath loads the TOML schema file at p and converts it to a Spec.
// Relative file references inside the schema (e.g. tag file sources) are
// resolved against the schema file's directory.
func NewSpecFromPath(p string) (*Spec, error) {
	var err error
	p, err = filepath.Abs(p)
	if err != nil {
		return nil, err
	}

	var out Schema
	if _, err := toml.DecodeFile(p, &out); err != nil {
		return nil, err
	}
	// NOTE(review): path.Dir assumes slash-separated paths; filepath.Dir is
	// the OS-correct choice for a filesystem path — confirm Windows behavior
	// before changing (the path import may be used elsewhere in this file).
	return newSpecFromSchema(&out, schemaDir(path.Dir(p)))
}
// NewSchemaFromPath decodes the TOML file at p into a Schema without
// converting it to a Spec.
func NewSchemaFromPath(p string) (*Schema, error) {
	var schema Schema
	_, err := toml.DecodeFile(p, &schema)
	if err != nil {
		return nil, err
	}
	return &schema, nil
}
// schemaToSpecState tracks whether the schema walk is still producing a
// valid spec or has switched to error unwinding.
type schemaToSpecState int

const (
	stateOk  schemaToSpecState = iota // conversion healthy; keep visiting
	stateErr                          // an error was recorded; annotate and unwind
)

// schemaToSpec converts a Schema tree into a Spec. It is used as the Visitor
// for WalkUp (see newSpecFromSchema): children are visited before parents,
// and each visited node pushes its result onto stack for its parent to
// consume.
type schemaToSpec struct {
	schemaDir string // directory used to resolve relative file paths in the schema
	stack     []interface{}
	state     schemaToSpecState
	spec      *Spec
	err       error
}
// push places v on top of the intermediate-result stack.
func (s *schemaToSpec) push(v interface{}) {
	s.stack = append(s.stack, v)
}
// pop removes and returns the top of the stack; it panics when the stack is
// empty.
func (s *schemaToSpec) pop() interface{} {
	n := len(s.stack) - 1
	top := s.stack[n]
	// Clear the vacated slot so the value can be garbage collected.
	s.stack[n] = nil
	s.stack = s.stack[:n]
	return top
}
// peek returns the top of the stack without removing it, or nil when the
// stack is empty.
func (s *schemaToSpec) peek() interface{} {
	n := len(s.stack)
	if n == 0 {
		return nil
	}
	return s.stack[n-1]
}
// Visit implements the Visitor interface. While conversion is healthy it
// delegates to visit and keeps descending; after the first failure it
// switches to visitErr, which wraps s.err with context from each remaining
// node, and returns nil.
func (s *schemaToSpec) Visit(node SchemaNode) (w Visitor) {
	if s.state == stateErr {
		s.visitErr(node)
		return nil
	}
	if !s.visit(node) {
		s.state = stateErr
		return nil
	}
	return s
}
// visit handles a single schema node while conversion is healthy. WalkUp
// visits children before parents, so each case consumes the values its
// children pushed onto the stack and pushes its own result for its parent.
// On failure it records s.err and returns false.
func (s *schemaToSpec) visit(node SchemaNode) bool {
	switch n := node.(type) {
	case *Schema:
		// Root node: the flattened measurement specs are on the stack.
		s.spec.Measurements = s.pop().([]MeasurementSpec)
		if n.SeriesLimit != nil {
			sl := int64(*n.SeriesLimit)
			s.spec.SeriesLimit = &sl
		}

	case Measurements:
		// flatten measurements
		var mss []MeasurementSpec
		for {
			if specs, ok := s.peek().([]MeasurementSpec); ok {
				s.pop()
				mss = append(mss, specs...)
				continue
			}
			break
		}

		sort.Slice(mss, func(i, j int) bool {
			return mss[i].Name < mss[j].Name
		})

		// validate field types are homogeneous for a single measurement
		mg := make(map[string]models.FieldType)
		for i := range mss {
			spec := &mss[i]
			key := spec.Name + "." + spec.FieldValuesSpec.Name
			ft := spec.FieldValuesSpec.DataType
			if dt, ok := mg[key]; !ok {
				mg[key] = ft
			} else if dt != ft {
				s.err = fmt.Errorf("field %q data-type conflict, found %s and %s",
					key,
					dt,
					ft)
				return false
			}
		}

		s.push(mss)

	case *Measurement:
		if len(n.Name) == 0 {
			s.err = errors.New("missing measurement name")
			return false
		}

		fields := s.pop().([]*FieldValuesSpec)
		tagsSpec := s.pop().(*TagsSpec)
		tagsSpec.Sample = n.Sample

		// default: sample 50%
		if n.Sample == nil {
			// Use a distinct name; the receiver s must stay visible here.
			v := sample(0.5)
			tagsSpec.Sample = &v
		}

		if *tagsSpec.Sample <= 0.0 || *tagsSpec.Sample > 1.0 {
			s.err = errors.New("invalid sample, must be 0 < sample ≤ 1.0")
			return false
		}

		// Fan out: one MeasurementSpec per field, all sharing the tags spec.
		var ms []MeasurementSpec
		for _, spec := range fields {
			ms = append(ms, MeasurementSpec{
				Name:            n.Name,
				SeriesLimit:     n.SeriesLimit,
				TagsSpec:        tagsSpec,
				FieldValuesSpec: spec,
			})
		}

		// NOTE: sort each measurement name + field name to ensure series are produced
		// in correct order
		sort.Slice(ms, func(i, j int) bool {
			return ms[i].FieldValuesSpec.Name < ms[j].FieldValuesSpec.Name
		})
		s.push(ms)

	case Tags:
		// Collect the child TagValuesSpec values pushed by each *Tag node.
		var ts TagsSpec
		for {
			if spec, ok := s.peek().(*TagValuesSpec); ok {
				s.pop()
				ts.Tags = append(ts.Tags, spec)
				continue
			}
			break
		}

		// Tag keys must be sorted to produce a valid series key sequence
		sort.Slice(ts.Tags, func(i, j int) bool {
			return ts.Tags[i].TagKey < ts.Tags[j].TagKey
		})

		for i := 1; i < len(ts.Tags); i++ {
			if ts.Tags[i-1].TagKey == ts.Tags[i].TagKey {
				s.err = fmt.Errorf("duplicate tag keys %q", ts.Tags[i].TagKey)
				return false
			}
		}

		s.push(&ts)

	case Fields:
		// combine fields
		var fs []*FieldValuesSpec
		for {
			if spec, ok := s.peek().(*FieldValuesSpec); ok {
				s.pop()
				fs = append(fs, spec)
				continue
			}
			break
		}

		sort.Slice(fs, func(i, j int) bool {
			return fs[i].Name < fs[j].Name
		})

		for i := 1; i < len(fs); i++ {
			if fs[i-1].Name == fs[i].Name {
				s.err = fmt.Errorf("duplicate field names %q", fs[i].Name)
				return false
			}
		}

		s.push(fs)

	case *Field:
		fs, ok := s.peek().(*FieldValuesSpec)
		if !ok {
			// Report the actual value on the stack: on a failed assertion fs
			// is a typed nil, so %T on fs would always print
			// *FieldValuesSpec and hide the real offending type.
			panic(fmt.Sprintf("unexpected type %T", s.peek()))
		}

		// Attach name and time layout to the spec pushed by the field source.
		fs.TimeSequenceSpec = n.TimeSequenceSpec()
		fs.Name = n.Name

	case *FieldConstantValue:
		// A field whose every value is the same constant; the Go type of the
		// constant determines the field's data type.
		var fs FieldValuesSpec
		switch v := n.Value.(type) {
		case float64:
			fs.DataType = models.Float
			fs.Values = func(spec TimeSequenceSpec) TimeValuesSequence {
				return NewTimeFloatValuesSequence(
					spec.Count,
					NewTimestampSequenceFromSpec(spec),
					NewFloatConstantValuesSequence(v),
				)
			}
		case int64:
			fs.DataType = models.Integer
			fs.Values = func(spec TimeSequenceSpec) TimeValuesSequence {
				return NewTimeIntegerValuesSequence(
					spec.Count,
					NewTimestampSequenceFromSpec(spec),
					NewIntegerConstantValuesSequence(v),
				)
			}
		case string:
			fs.DataType = models.String
			fs.Values = func(spec TimeSequenceSpec) TimeValuesSequence {
				return NewTimeStringValuesSequence(
					spec.Count,
					NewTimestampSequenceFromSpec(spec),
					NewStringConstantValuesSequence(v),
				)
			}
		case bool:
			fs.DataType = models.Boolean
			fs.Values = func(spec TimeSequenceSpec) TimeValuesSequence {
				return NewTimeBooleanValuesSequence(
					spec.Count,
					NewTimestampSequenceFromSpec(spec),
					NewBooleanConstantValuesSequence(v),
				)
			}
		default:
			panic(fmt.Sprintf("unexpected type %T", v))
		}

		s.push(&fs)

	case *FieldArraySource:
		// A field whose values cycle through a fixed array; the element type
		// determines the field's data type.
		var fs FieldValuesSpec
		switch v := n.Value.(type) {
		case []float64:
			fs.DataType = models.Float
			fs.Values = func(spec TimeSequenceSpec) TimeValuesSequence {
				return NewTimeFloatValuesSequence(
					spec.Count,
					NewTimestampSequenceFromSpec(spec),
					NewFloatArrayValuesSequence(v),
				)
			}
		case []int64:
			fs.DataType = models.Integer
			fs.Values = func(spec TimeSequenceSpec) TimeValuesSequence {
				return NewTimeIntegerValuesSequence(
					spec.Count,
					NewTimestampSequenceFromSpec(spec),
					NewIntegerArrayValuesSequence(v),
				)
			}
		case []string:
			fs.DataType = models.String
			fs.Values = func(spec TimeSequenceSpec) TimeValuesSequence {
				return NewTimeStringValuesSequence(
					spec.Count,
					NewTimestampSequenceFromSpec(spec),
					NewStringArrayValuesSequence(v),
				)
			}
		case []bool:
			fs.DataType = models.Boolean
			fs.Values = func(spec TimeSequenceSpec) TimeValuesSequence {
				return NewTimeBooleanValuesSequence(
					spec.Count,
					NewTimestampSequenceFromSpec(spec),
					NewBooleanArrayValuesSequence(v),
				)
			}
		default:
			panic(fmt.Sprintf("unexpected type %T", v))
		}

		s.push(&fs)

	case *FieldFloatRandomSource:
		// Seeded PRNG, so generation is deterministic for a given schema.
		var fs FieldValuesSpec
		fs.DataType = models.Float
		fs.Values = NewTimeValuesSequenceFn(func(spec TimeSequenceSpec) TimeValuesSequence {
			return NewTimeFloatValuesSequence(
				spec.Count,
				NewTimestampSequenceFromSpec(spec),
				NewFloatRandomValuesSequence(n.Min, n.Max, rand.New(rand.NewSource(n.Seed))),
			)
		})
		s.push(&fs)

	case *FieldIntegerZipfSource:
		var fs FieldValuesSpec
		fs.DataType = models.Integer
		fs.Values = NewTimeValuesSequenceFn(func(spec TimeSequenceSpec) TimeValuesSequence {
			return NewTimeIntegerValuesSequence(
				spec.Count,
				NewTimestampSequenceFromSpec(spec),
				NewIntegerZipfValuesSequence(n),
			)
		})
		s.push(&fs)

	case *Tag:
		// The tag's value source has already pushed its sequence factory.
		s.push(&TagValuesSpec{
			TagKey: n.Name,
			Values: s.pop().(NewCountableSequenceFn),
		})

	case *TagSequenceSource:
		s.push(NewCountableSequenceFn(func() CountableSequence {
			return NewCounterByteSequence(n.Format, int(n.Start), int(n.Start+n.Count))
		}))

	case *TagFileSource:
		// The file is read once at conversion time; the sequence factory
		// closes over the resulting lines.
		p, err := s.resolvePath(n.Path)
		if err != nil {
			s.err = err
			return false
		}

		lines, err := s.readLines(p)
		if err != nil {
			s.err = err
			return false
		}

		s.push(NewCountableSequenceFn(func() CountableSequence {
			return NewStringArraySequence(lines)
		}))

	case *TagArraySource:
		s.push(NewCountableSequenceFn(func() CountableSequence {
			return NewStringArraySequence(n.Values)
		}))

	case nil:

	default:
		panic(fmt.Sprintf("unexpected type %T", node))
	}

	return true
}
// visitErr runs during error unwinding (state == stateErr), wrapping s.err
// with the name of each ancestor node so the final error pinpoints where in
// the schema the conversion failed.
func (s *schemaToSpec) visitErr(node SchemaNode) {
	switch n := node.(type) {
	case *Schema:
		s.err = fmt.Errorf("error processing schema: %v", s.err)
	case *Measurement:
		s.err = fmt.Errorf("measurement %q: %v", n.Name, s.err)
	case *Tag:
		s.err = fmt.Errorf("tag %q: %v", n.Name, s.err)
	case *Field:
		s.err = fmt.Errorf("field %q: %v", n.Name, s.err)
	}
}
// resolvePath expands environment variables in p and makes it absolute,
// joining relative paths onto the schema file's directory. It returns an
// error when the result does not exist or names a directory.
func (s *schemaToSpec) resolvePath(p string) (string, error) {
	full := os.ExpandEnv(p)
	if !filepath.IsAbs(full) {
		full = filepath.Join(s.schemaDir, full)
	}

	fi, err := os.Stat(full)
	if err != nil {
		return "", fmt.Errorf("error resolving path %q: %v", p, err)
	}
	if fi.IsDir() {
		return "", fmt.Errorf("path %q is not a file: resolved to %s", p, full)
	}
	return full, nil
}
// readLines reads the file at p (resolved via resolvePath) and returns its
// non-empty lines. It fails if any line is not valid UTF-8, reporting the
// 1-based line number of the offending line.
func (s *schemaToSpec) readLines(p string) ([]string, error) {
	fp, err := s.resolvePath(p)
	if err != nil {
		return nil, err
	}

	f, err := os.Open(fp)
	if err != nil {
		return nil, fmt.Errorf("path error: %v", err)
	}
	defer f.Close()

	scan := bufio.NewScanner(f)
	scan.Split(bufio.ScanLines)

	n := 0
	var lines []string
	for scan.Scan() {
		// BUG FIX: n was previously never incremented, so UTF-8 errors always
		// reported "line 0". Count every scanned line, 1-based.
		n++
		if len(scan.Bytes()) == 0 {
			// skip empty lines
			continue
		}
		if !utf8.Valid(scan.Bytes()) {
			return nil, fmt.Errorf("path %q, invalid UTF-8 on line %d", p, n)
		}
		lines = append(lines, scan.Text())
	}

	if err := scan.Err(); err != nil {
		return nil, err
	}
	return lines, nil
}
// option configures a schemaToSpec conversion.
type option func(s *schemaToSpec)

// schemaDir returns an option that sets the directory against which relative
// file paths in the schema are resolved.
func schemaDir(p string) option {
	return func(s *schemaToSpec) {
		s.schemaDir = p
	}
}
// NewSpecFromSchema converts a decoded Schema to a Spec. No schema directory
// is configured, so relative file references resolve against the working
// directory; use NewSpecFromPath to anchor them at the schema file instead.
func NewSpecFromSchema(root *Schema) (*Spec, error) {
	return newSpecFromSchema(root)
}
// newSpecFromSchema walks root bottom-up with a schemaToSpec visitor,
// applying opts (e.g. schemaDir) first, and returns the assembled Spec or
// the first conversion error.
func newSpecFromSchema(root *Schema, opts ...option) (*Spec, error) {
	spec := new(Spec)

	vis := &schemaToSpec{spec: spec}
	for _, opt := range opts {
		opt(vis)
	}

	WalkUp(vis, root)
	if vis.err != nil {
		return nil, vis.err
	}
	return spec, nil
}