feat(mock): Add SeriesGenerator to ResultSet transformation

In addition, adds a ResultSetToLineProtocol function, which
transforms a `ResultSet` into InfluxDB line protocol.
pull/16843/head
Stuart Carnie 2020-02-12 14:10:41 -07:00
parent a5f508de77
commit f1990cc92c
No known key found for this signature in database
GPG Key ID: 848D9C9718D78B4F
3 changed files with 382 additions and 0 deletions

167
mock/reads_resultset.go Normal file
View File

@ -0,0 +1,167 @@
package mock
import (
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/pkg/data/gen"
"github.com/influxdata/influxdb/storage/reads"
"github.com/influxdata/influxdb/tsdb"
"github.com/influxdata/influxdb/tsdb/cursors"
)
type GeneratorResultSet struct {
sg gen.SeriesGenerator
f floatTimeValuesGeneratorCursor
i integerTimeValuesGeneratorCursor
u unsignedTimeValuesGeneratorCursor
s stringTimeValuesGeneratorCursor
b booleanTimeValuesGeneratorCursor
cur cursors.Cursor
}
var _ reads.ResultSet = (*GeneratorResultSet)(nil)
// NewResultSetFromSeriesGenerator transforms a SeriesGenerator into a ResultSet,
// which is useful for mocking data when a client requires a ResultSet.
func NewResultSetFromSeriesGenerator(sg gen.SeriesGenerator) *GeneratorResultSet {
return &GeneratorResultSet{sg: sg}
}
func (g *GeneratorResultSet) Next() bool {
return g.sg.Next()
}
func (g *GeneratorResultSet) Cursor() cursors.Cursor {
switch g.sg.FieldType() {
case models.Float:
g.f.tv = g.sg.TimeValuesGenerator()
g.cur = &g.f
case models.Integer:
g.i.tv = g.sg.TimeValuesGenerator()
g.cur = &g.i
case models.Unsigned:
g.u.tv = g.sg.TimeValuesGenerator()
g.cur = &g.u
case models.String:
g.s.tv = g.sg.TimeValuesGenerator()
g.cur = &g.s
case models.Boolean:
g.b.tv = g.sg.TimeValuesGenerator()
g.cur = &g.b
default:
panic("unreachable")
}
return g.cur
}
func (g *GeneratorResultSet) Tags() models.Tags { return g.sg.Tags() }
func (g *GeneratorResultSet) Close() {}
func (g *GeneratorResultSet) Err() error { return nil }
func (g *GeneratorResultSet) Stats() cursors.CursorStats {
var stats cursors.CursorStats
stats.Add(g.f.Stats())
stats.Add(g.i.Stats())
stats.Add(g.u.Stats())
stats.Add(g.s.Stats())
stats.Add(g.b.Stats())
return stats
}
// cursors
type timeValuesGeneratorCursor struct {
tv gen.TimeValuesSequence
stats cursors.CursorStats
}
func (t timeValuesGeneratorCursor) Close() {}
func (t timeValuesGeneratorCursor) Err() error { return nil }
func (t timeValuesGeneratorCursor) Stats() cursors.CursorStats { return t.stats }
type floatTimeValuesGeneratorCursor struct {
timeValuesGeneratorCursor
a tsdb.FloatArray
}
func (c *floatTimeValuesGeneratorCursor) Next() *cursors.FloatArray {
if c.tv.Next() {
c.tv.Values().(gen.FloatValues).Copy(&c.a)
c.stats.ScannedBytes += len(c.a.Values) * 8
c.stats.ScannedValues += c.a.Len()
} else {
c.a.Timestamps = c.a.Timestamps[:0]
c.a.Values = c.a.Values[:0]
}
return &c.a
}
type integerTimeValuesGeneratorCursor struct {
timeValuesGeneratorCursor
a tsdb.IntegerArray
}
func (c *integerTimeValuesGeneratorCursor) Next() *cursors.IntegerArray {
if c.tv.Next() {
c.tv.Values().(gen.IntegerValues).Copy(&c.a)
c.stats.ScannedBytes += len(c.a.Values) * 8
c.stats.ScannedValues += c.a.Len()
} else {
c.a.Timestamps = c.a.Timestamps[:0]
c.a.Values = c.a.Values[:0]
}
return &c.a
}
type unsignedTimeValuesGeneratorCursor struct {
timeValuesGeneratorCursor
a tsdb.UnsignedArray
}
func (c *unsignedTimeValuesGeneratorCursor) Next() *cursors.UnsignedArray {
if c.tv.Next() {
c.tv.Values().(gen.UnsignedValues).Copy(&c.a)
c.stats.ScannedBytes += len(c.a.Values) * 8
c.stats.ScannedValues += c.a.Len()
} else {
c.a.Timestamps = c.a.Timestamps[:0]
c.a.Values = c.a.Values[:0]
}
return &c.a
}
type stringTimeValuesGeneratorCursor struct {
timeValuesGeneratorCursor
a tsdb.StringArray
}
func (c *stringTimeValuesGeneratorCursor) Next() *cursors.StringArray {
if c.tv.Next() {
c.tv.Values().(gen.StringValues).Copy(&c.a)
for _, v := range c.a.Values {
c.stats.ScannedBytes += len(v)
}
c.stats.ScannedValues += c.a.Len()
} else {
c.a.Timestamps = c.a.Timestamps[:0]
c.a.Values = c.a.Values[:0]
}
return &c.a
}
type booleanTimeValuesGeneratorCursor struct {
timeValuesGeneratorCursor
a tsdb.BooleanArray
}
func (c *booleanTimeValuesGeneratorCursor) Next() *cursors.BooleanArray {
if c.tv.Next() {
c.tv.Values().(gen.BooleanValues).Copy(&c.a)
c.stats.ScannedBytes += len(c.a.Values)
c.stats.ScannedValues += c.a.Len()
} else {
c.a.Timestamps = c.a.Timestamps[:0]
c.a.Values = c.a.Values[:0]
}
return &c.a
}

View File

@ -0,0 +1,86 @@
package mock_test
import (
"strings"
"testing"
"time"
"github.com/google/go-cmp/cmp"
"github.com/influxdata/influxdb/mock"
"github.com/influxdata/influxdb/pkg/data/gen"
"github.com/influxdata/influxdb/storage/reads"
"github.com/influxdata/influxdb/tsdb/cursors"
)
func mustNewSpecFromToml(tb testing.TB, toml string) *gen.Spec {
tb.Helper()
spec, err := gen.NewSpecFromToml(toml)
if err != nil {
panic(err)
}
return spec
}
func TestNewResultSetFromSeriesGenerator(t *testing.T) {
checkResult := func(t *testing.T, sg gen.SeriesGenerator, expData string, expStats cursors.CursorStats) {
t.Helper()
rs := mock.NewResultSetFromSeriesGenerator(sg)
var sb strings.Builder
err := reads.ResultSetToLineProtocol(&sb, rs)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if got, exp := sb.String(), expData; !cmp.Equal(got, exp) {
t.Errorf("unexpected value -got/+exp\n%s", cmp.Diff(got, exp))
}
if got, exp := rs.Stats(), expStats; !cmp.Equal(got, exp) {
t.Errorf("unexpected value -got/+exp\n%s", cmp.Diff(got, exp))
}
}
t.Run("float", func(t *testing.T) {
spec := mustNewSpecFromToml(t, `
[[measurements]]
name = "m0"
sample = 1.0
tags = [
{ name = "tag0", source = { type = "sequence", start = 0, count = 3 } },
{ name = "tag1", source = { type = "sequence", start = 0, count = 2 } },
]
fields = [
{ name = "v0", count = 3, source = 1.0 },
]`)
sg := gen.NewSeriesGeneratorFromSpec(spec, gen.TimeRange{
Start: time.Unix(1000, 0),
End: time.Unix(2000, 0),
})
const expData = `m0,tag0=value0,tag1=value0 v0=1 1000000000000
m0,tag0=value0,tag1=value0 v0=1 1333333000000
m0,tag0=value0,tag1=value0 v0=1 1666666000000
m0,tag0=value0,tag1=value1 v0=1 1000000000000
m0,tag0=value0,tag1=value1 v0=1 1333333000000
m0,tag0=value0,tag1=value1 v0=1 1666666000000
m0,tag0=value1,tag1=value0 v0=1 1000000000000
m0,tag0=value1,tag1=value0 v0=1 1333333000000
m0,tag0=value1,tag1=value0 v0=1 1666666000000
m0,tag0=value1,tag1=value1 v0=1 1000000000000
m0,tag0=value1,tag1=value1 v0=1 1333333000000
m0,tag0=value1,tag1=value1 v0=1 1666666000000
m0,tag0=value2,tag1=value0 v0=1 1000000000000
m0,tag0=value2,tag1=value0 v0=1 1333333000000
m0,tag0=value2,tag1=value0 v0=1 1666666000000
m0,tag0=value2,tag1=value1 v0=1 1000000000000
m0,tag0=value2,tag1=value1 v0=1 1333333000000
m0,tag0=value2,tag1=value1 v0=1 1666666000000
`
expStats := cursors.CursorStats{ScannedValues: 18, ScannedBytes: 18 * 8}
checkResult(t, sg, expData, expStats)
})
}

View File

@ -0,0 +1,129 @@
package reads
import (
"errors"
"io"
"strconv"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/tsdb/cursors"
)
// ResultSetToLineProtocol transforms rs to line protocol and writes the
// output to wr.
func ResultSetToLineProtocol(wr io.Writer, rs ResultSet) (err error) {
defer rs.Close()
line := make([]byte, 0, 4096)
for rs.Next() {
tags := rs.Tags()
name := tags.Get(models.MeasurementTagKeyBytes)
field := tags.Get(models.FieldKeyTagKeyBytes)
if len(name) == 0 || len(field) == 0 {
return errors.New("missing measurement / field")
}
line = append(line[:0], name...)
if tags.Len() > 2 {
tags = tags[1 : len(tags)-1] // take first and last elements which are measurement and field keys
line = tags.AppendHashKey(line)
}
line = append(line, ' ')
line = append(line, field...)
line = append(line, '=')
err = cursorToLineProtocol(wr, line, rs.Cursor())
if err != nil {
return err
}
}
return rs.Err()
}
func cursorToLineProtocol(wr io.Writer, line []byte, cur cursors.Cursor) error {
var newLine = []byte{'\n'}
switch ccur := cur.(type) {
case cursors.IntegerArrayCursor:
for {
a := ccur.Next()
if a.Len() > 0 {
for i := range a.Timestamps {
buf := strconv.AppendInt(line, a.Values[i], 10)
buf = append(buf, 'i', ' ')
buf = strconv.AppendInt(buf, a.Timestamps[i], 10)
wr.Write(buf)
wr.Write(newLine)
}
} else {
break
}
}
case cursors.FloatArrayCursor:
for {
a := ccur.Next()
if a.Len() > 0 {
for i := range a.Timestamps {
buf := strconv.AppendFloat(line, a.Values[i], 'f', -1, 64)
buf = append(buf, ' ')
buf = strconv.AppendInt(buf, a.Timestamps[i], 10)
wr.Write(buf)
wr.Write(newLine)
}
} else {
break
}
}
case cursors.UnsignedArrayCursor:
for {
a := ccur.Next()
if a.Len() > 0 {
for i := range a.Timestamps {
buf := strconv.AppendUint(line, a.Values[i], 10)
buf = append(buf, 'u', ' ')
buf = strconv.AppendInt(buf, a.Timestamps[i], 10)
wr.Write(buf)
wr.Write(newLine)
}
} else {
break
}
}
case cursors.BooleanArrayCursor:
for {
a := ccur.Next()
if a.Len() > 0 {
for i := range a.Timestamps {
buf := strconv.AppendBool(line, a.Values[i])
buf = append(buf, ' ')
buf = strconv.AppendInt(buf, a.Timestamps[i], 10)
wr.Write(buf)
wr.Write(newLine)
}
} else {
break
}
}
case cursors.StringArrayCursor:
for {
a := ccur.Next()
if a.Len() > 0 {
for i := range a.Timestamps {
buf := strconv.AppendQuote(line, a.Values[i])
buf = append(buf, 'i', ' ')
buf = strconv.AppendInt(buf, a.Timestamps[i], 10)
wr.Write(buf)
wr.Write(newLine)
}
} else {
break
}
}
default:
panic("unreachable")
}
cur.Close()
return cur.Err()
}