From f1990cc92c077353949973a9c4ae25c55fea914e Mon Sep 17 00:00:00 2001 From: Stuart Carnie Date: Wed, 12 Feb 2020 14:10:41 -0700 Subject: [PATCH] feat(mock): Add SeriesGenerator to ResultSet transformation In addition, adds a ResultSetToLineProtocol function, which transforms a `ResultSet` into InfluxDB line protocol. --- mock/reads_resultset.go | 167 ++++++++++++++++++++++++ mock/reads_resultset_test.go | 86 ++++++++++++ storage/reads/resultset_lineprotocol.go | 129 ++++++++++++++++++ 3 files changed, 382 insertions(+) create mode 100644 mock/reads_resultset.go create mode 100644 mock/reads_resultset_test.go create mode 100644 storage/reads/resultset_lineprotocol.go diff --git a/mock/reads_resultset.go b/mock/reads_resultset.go new file mode 100644 index 0000000000..572eedcb61 --- /dev/null +++ b/mock/reads_resultset.go @@ -0,0 +1,167 @@ +package mock + +import ( + "github.com/influxdata/influxdb/models" + "github.com/influxdata/influxdb/pkg/data/gen" + "github.com/influxdata/influxdb/storage/reads" + "github.com/influxdata/influxdb/tsdb" + "github.com/influxdata/influxdb/tsdb/cursors" +) + +type GeneratorResultSet struct { + sg gen.SeriesGenerator + f floatTimeValuesGeneratorCursor + i integerTimeValuesGeneratorCursor + u unsignedTimeValuesGeneratorCursor + s stringTimeValuesGeneratorCursor + b booleanTimeValuesGeneratorCursor + cur cursors.Cursor +} + +var _ reads.ResultSet = (*GeneratorResultSet)(nil) + +// NewResultSetFromSeriesGenerator transforms a SeriesGenerator into a ResultSet, +// which is useful for mocking data when a client requires a ResultSet. +func NewResultSetFromSeriesGenerator(sg gen.SeriesGenerator) *GeneratorResultSet { + return &GeneratorResultSet{sg: sg} +} + +func (g *GeneratorResultSet) Next() bool { + return g.sg.Next() +} + +func (g *GeneratorResultSet) Cursor() cursors.Cursor { + switch g.sg.FieldType() { + case models.Float: + g.f.tv = g.sg.TimeValuesGenerator() + g.cur = &g.f + case models.Integer: + g.i.tv = g.sg.TimeValuesGenerator() + g.cur = &g.i + case models.Unsigned: + g.u.tv = g.sg.TimeValuesGenerator() + g.cur = &g.u + case models.String: + g.s.tv = g.sg.TimeValuesGenerator() + g.cur = &g.s + case models.Boolean: + g.b.tv = g.sg.TimeValuesGenerator() + g.cur = &g.b + default: + panic("unreachable") + } + + return g.cur +} + +func (g *GeneratorResultSet) Tags() models.Tags { return g.sg.Tags() } +func (g *GeneratorResultSet) Close() {} +func (g *GeneratorResultSet) Err() error { return nil } + +func (g *GeneratorResultSet) Stats() cursors.CursorStats { + var stats cursors.CursorStats + stats.Add(g.f.Stats()) + stats.Add(g.i.Stats()) + stats.Add(g.u.Stats()) + stats.Add(g.s.Stats()) + stats.Add(g.b.Stats()) + return stats +} + +// cursors + +type timeValuesGeneratorCursor struct { + tv gen.TimeValuesSequence + stats cursors.CursorStats +} + +func (t timeValuesGeneratorCursor) Close() {} +func (t timeValuesGeneratorCursor) Err() error { return nil } +func (t timeValuesGeneratorCursor) Stats() cursors.CursorStats { return t.stats } + +type floatTimeValuesGeneratorCursor struct { + timeValuesGeneratorCursor + a tsdb.FloatArray +} + +func (c *floatTimeValuesGeneratorCursor) Next() *cursors.FloatArray { + if c.tv.Next() { + c.tv.Values().(gen.FloatValues).Copy(&c.a) + c.stats.ScannedBytes += len(c.a.Values) * 8 + c.stats.ScannedValues += c.a.Len() + } else { + c.a.Timestamps = c.a.Timestamps[:0] + c.a.Values = c.a.Values[:0] + } + return &c.a +} + +type integerTimeValuesGeneratorCursor struct { + timeValuesGeneratorCursor + a tsdb.IntegerArray +} + +func (c *integerTimeValuesGeneratorCursor) Next() *cursors.IntegerArray { + if c.tv.Next() { + c.tv.Values().(gen.IntegerValues).Copy(&c.a) + c.stats.ScannedBytes += len(c.a.Values) * 8 + c.stats.ScannedValues += c.a.Len() + } else { + c.a.Timestamps = c.a.Timestamps[:0] + c.a.Values = c.a.Values[:0] + } + return &c.a +} + +type unsignedTimeValuesGeneratorCursor struct { + timeValuesGeneratorCursor + a tsdb.UnsignedArray +} + +func (c *unsignedTimeValuesGeneratorCursor) Next() *cursors.UnsignedArray { + if c.tv.Next() { + c.tv.Values().(gen.UnsignedValues).Copy(&c.a) + c.stats.ScannedBytes += len(c.a.Values) * 8 + c.stats.ScannedValues += c.a.Len() + } else { + c.a.Timestamps = c.a.Timestamps[:0] + c.a.Values = c.a.Values[:0] + } + return &c.a +} + +type stringTimeValuesGeneratorCursor struct { + timeValuesGeneratorCursor + a tsdb.StringArray +} + +func (c *stringTimeValuesGeneratorCursor) Next() *cursors.StringArray { + if c.tv.Next() { + c.tv.Values().(gen.StringValues).Copy(&c.a) + for _, v := range c.a.Values { + c.stats.ScannedBytes += len(v) + } + c.stats.ScannedValues += c.a.Len() + } else { + c.a.Timestamps = c.a.Timestamps[:0] + c.a.Values = c.a.Values[:0] + } + return &c.a +} + +type booleanTimeValuesGeneratorCursor struct { + timeValuesGeneratorCursor + a tsdb.BooleanArray +} + +func (c *booleanTimeValuesGeneratorCursor) Next() *cursors.BooleanArray { + if c.tv.Next() { + c.tv.Values().(gen.BooleanValues).Copy(&c.a) + c.stats.ScannedBytes += len(c.a.Values) + c.stats.ScannedValues += c.a.Len() + } else { + c.a.Timestamps = c.a.Timestamps[:0] + c.a.Values = c.a.Values[:0] + } + return &c.a +} diff --git a/mock/reads_resultset_test.go b/mock/reads_resultset_test.go new file mode 100644 index 0000000000..abbe1a1209 --- /dev/null +++ b/mock/reads_resultset_test.go @@ -0,0 +1,86 @@ +package mock_test + +import ( + "strings" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + "github.com/influxdata/influxdb/mock" + "github.com/influxdata/influxdb/pkg/data/gen" + "github.com/influxdata/influxdb/storage/reads" + "github.com/influxdata/influxdb/tsdb/cursors" +) + +func mustNewSpecFromToml(tb testing.TB, toml string) *gen.Spec { + tb.Helper() + + spec, err := gen.NewSpecFromToml(toml) + if err != nil { + panic(err) + } + + return spec +} + +func TestNewResultSetFromSeriesGenerator(t *testing.T) { + checkResult := func(t *testing.T, sg gen.SeriesGenerator, expData string, expStats cursors.CursorStats) { + t.Helper() + + rs := mock.NewResultSetFromSeriesGenerator(sg) + var sb strings.Builder + err := reads.ResultSetToLineProtocol(&sb, rs) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + + if got, exp := sb.String(), expData; !cmp.Equal(got, exp) { + t.Errorf("unexpected value -got/+exp\n%s", cmp.Diff(got, exp)) + } + + if got, exp := rs.Stats(), expStats; !cmp.Equal(got, exp) { + t.Errorf("unexpected value -got/+exp\n%s", cmp.Diff(got, exp)) + } + } + + t.Run("float", func(t *testing.T) { + spec := mustNewSpecFromToml(t, ` +[[measurements]] +name = "m0" +sample = 1.0 +tags = [ + { name = "tag0", source = { type = "sequence", start = 0, count = 3 } }, + { name = "tag1", source = { type = "sequence", start = 0, count = 2 } }, +] +fields = [ + { name = "v0", count = 3, source = 1.0 }, +]`) + + sg := gen.NewSeriesGeneratorFromSpec(spec, gen.TimeRange{ + Start: time.Unix(1000, 0), + End: time.Unix(2000, 0), + }) + const expData = `m0,tag0=value0,tag1=value0 v0=1 1000000000000 +m0,tag0=value0,tag1=value0 v0=1 1333333000000 +m0,tag0=value0,tag1=value0 v0=1 1666666000000 +m0,tag0=value0,tag1=value1 v0=1 1000000000000 +m0,tag0=value0,tag1=value1 v0=1 1333333000000 +m0,tag0=value0,tag1=value1 v0=1 1666666000000 +m0,tag0=value1,tag1=value0 v0=1 1000000000000 +m0,tag0=value1,tag1=value0 v0=1 1333333000000 +m0,tag0=value1,tag1=value0 v0=1 1666666000000 +m0,tag0=value1,tag1=value1 v0=1 1000000000000 +m0,tag0=value1,tag1=value1 v0=1 1333333000000 +m0,tag0=value1,tag1=value1 v0=1 1666666000000 +m0,tag0=value2,tag1=value0 v0=1 1000000000000 +m0,tag0=value2,tag1=value0 v0=1 1333333000000 +m0,tag0=value2,tag1=value0 v0=1 1666666000000 +m0,tag0=value2,tag1=value1 v0=1 1000000000000 +m0,tag0=value2,tag1=value1 v0=1 1333333000000 +m0,tag0=value2,tag1=value1 v0=1 1666666000000 +` + expStats := cursors.CursorStats{ScannedValues: 18, ScannedBytes: 18 * 8} + checkResult(t, sg, expData, expStats) + }) + +} diff --git a/storage/reads/resultset_lineprotocol.go b/storage/reads/resultset_lineprotocol.go new file mode 100644 index 0000000000..f821aa6b88 --- /dev/null +++ b/storage/reads/resultset_lineprotocol.go @@ -0,0 +1,129 @@ +package reads + +import ( + "errors" + "io" + "strconv" + + "github.com/influxdata/influxdb/models" + "github.com/influxdata/influxdb/tsdb/cursors" +) + +// ResultSetToLineProtocol transforms rs to line protocol and writes the +// output to wr. +func ResultSetToLineProtocol(wr io.Writer, rs ResultSet) (err error) { + defer rs.Close() + + line := make([]byte, 0, 4096) + for rs.Next() { + tags := rs.Tags() + name := tags.Get(models.MeasurementTagKeyBytes) + field := tags.Get(models.FieldKeyTagKeyBytes) + if len(name) == 0 || len(field) == 0 { + return errors.New("missing measurement / field") + } + + line = append(line[:0], name...) + if tags.Len() > 2 { + tags = tags[1 : len(tags)-1] // take first and last elements which are measurement and field keys + line = tags.AppendHashKey(line) + } + + line = append(line, ' ') + line = append(line, field...) + line = append(line, '=') + err = cursorToLineProtocol(wr, line, rs.Cursor()) + if err != nil { + return err + } + } + + return rs.Err() +} + +func cursorToLineProtocol(wr io.Writer, line []byte, cur cursors.Cursor) error { + var newLine = []byte{'\n'} + + switch ccur := cur.(type) { + case cursors.IntegerArrayCursor: + for { + a := ccur.Next() + if a.Len() > 0 { + for i := range a.Timestamps { + buf := strconv.AppendInt(line, a.Values[i], 10) + buf = append(buf, 'i', ' ') + buf = strconv.AppendInt(buf, a.Timestamps[i], 10) + wr.Write(buf) + wr.Write(newLine) + } + } else { + break + } + } + case cursors.FloatArrayCursor: + for { + a := ccur.Next() + if a.Len() > 0 { + for i := range a.Timestamps { + buf := strconv.AppendFloat(line, a.Values[i], 'f', -1, 64) + buf = append(buf, ' ') + buf = strconv.AppendInt(buf, a.Timestamps[i], 10) + wr.Write(buf) + wr.Write(newLine) + } + } else { + break + } + } + case cursors.UnsignedArrayCursor: + for { + a := ccur.Next() + if a.Len() > 0 { + for i := range a.Timestamps { + buf := strconv.AppendUint(line, a.Values[i], 10) + buf = append(buf, 'u', ' ') + buf = strconv.AppendInt(buf, a.Timestamps[i], 10) + wr.Write(buf) + wr.Write(newLine) + } + } else { + break + } + } + case cursors.BooleanArrayCursor: + for { + a := ccur.Next() + if a.Len() > 0 { + for i := range a.Timestamps { + buf := strconv.AppendBool(line, a.Values[i]) + buf = append(buf, ' ') + buf = strconv.AppendInt(buf, a.Timestamps[i], 10) + wr.Write(buf) + wr.Write(newLine) + } + } else { + break + } + } + case cursors.StringArrayCursor: + for { + a := ccur.Next() + if a.Len() > 0 { + for i := range a.Timestamps { + buf := strconv.AppendQuote(line, a.Values[i]) + buf = append(buf, 'i', ' ') + buf = strconv.AppendInt(buf, a.Timestamps[i], 10) + wr.Write(buf) + wr.Write(newLine) + } + } else { + break + } + } + default: + panic("unreachable") + } + + cur.Close() + return cur.Err() +}