influxdb/patches/storage.patch

924 lines
29 KiB
Diff

diff --git b/storage/reads/datatypes/gen.go a/storage/reads/datatypes/gen.go
index 6df6b5c4a7..54b5a9699b 100644
--- b/storage/reads/datatypes/gen.go
+++ a/storage/reads/datatypes/gen.go
@@ -1,3 +1 @@
package datatypes
-
-//go:generate protoc -I ../../../internal -I . --plugin ../../../scripts/protoc-gen-gogofaster --gogofaster_out=Mgoogle/protobuf/empty.proto=github.com/gogo/protobuf/types,Mgoogle/protobuf/any.proto=github.com/gogo/protobuf/types,plugins=grpc:. storage_common.proto predicate.proto
diff --git b/storage/reads/flux_reader.go a/storage/reads/flux_reader.go
index bb98c2918c..e10bf3eae7 100644
--- b/storage/reads/flux_reader.go
+++ a/storage/reads/flux_reader.go
@@ -10,8 +10,8 @@ import (
"github.com/influxdata/flux/execute"
"github.com/influxdata/flux/memory"
"github.com/influxdata/flux/values"
+ "github.com/influxdata/influxdb/flux/stdlib/influxdata/influxdb"
"github.com/influxdata/influxdb/models"
- "github.com/influxdata/influxdb/query/stdlib/influxdata/influxdb"
"github.com/influxdata/influxdb/storage/reads/datatypes"
"github.com/influxdata/influxdb/tsdb/cursors"
)
@@ -106,8 +106,8 @@ func (fi *filterIterator) Statistics() cursors.CursorStats { return fi.stats }
func (fi *filterIterator) Do(f func(flux.Table) error) error {
src := fi.s.GetSource(
- uint64(fi.spec.OrganizationID),
- uint64(fi.spec.BucketID),
+ fi.spec.Database,
+ fi.spec.RetentionPolicy,
)
// Setup read request
@@ -230,8 +230,8 @@ func (gi *groupIterator) Statistics() cursors.CursorStats { return gi.stats }
func (gi *groupIterator) Do(f func(flux.Table) error) error {
src := gi.s.GetSource(
- uint64(gi.spec.OrganizationID),
- uint64(gi.spec.BucketID),
+ gi.spec.Database,
+ gi.spec.RetentionPolicy,
)
// Setup read request
@@ -510,8 +510,8 @@ type tagKeysIterator struct {
func (ti *tagKeysIterator) Do(f func(flux.Table) error) error {
src := ti.s.GetSource(
- uint64(ti.readSpec.OrganizationID),
- uint64(ti.readSpec.BucketID),
+ ti.readSpec.Database,
+ ti.readSpec.RetentionPolicy,
)
var req datatypes.TagKeysRequest
@@ -592,8 +592,8 @@ type tagValuesIterator struct {
func (ti *tagValuesIterator) Do(f func(flux.Table) error) error {
src := ti.s.GetSource(
- uint64(ti.readSpec.OrganizationID),
- uint64(ti.readSpec.BucketID),
+ ti.readSpec.Database,
+ ti.readSpec.RetentionPolicy,
)
var req datatypes.TagValuesRequest
diff --git b/storage/reads/flux_table.go a/storage/reads/flux_table.go
index 58a586c777..952073c314 100644
--- b/storage/reads/flux_table.go
+++ a/storage/reads/flux_table.go
@@ -1,7 +1,5 @@
package reads
-//go:generate env GO111MODULE=on go run github.com/benbjohnson/tmpl -data=@types.tmpldata flux_table.gen.go.tmpl
-
import (
"errors"
"sync/atomic"
diff --git b/storage/reads/flux_table_test.go a/storage/reads/flux_table_test.go
index 620d0e37d7..ff8698b893 100644
--- b/storage/reads/flux_table_test.go
+++ a/storage/reads/flux_table_test.go
@@ -1,174 +1 @@
package reads_test
-
-import (
- "context"
- "io/ioutil"
- "math"
- "math/rand"
- "os"
- "path/filepath"
- "testing"
- "time"
-
- "github.com/influxdata/flux"
- "github.com/influxdata/flux/execute"
- "github.com/influxdata/flux/memory"
- "github.com/influxdata/flux/values"
- "github.com/influxdata/influxdb/cmd/influxd/generate"
- "github.com/influxdata/influxdb/mock"
- "github.com/influxdata/influxdb/models"
- "github.com/influxdata/influxdb/pkg/data/gen"
- "github.com/influxdata/influxdb/query/stdlib/influxdata/influxdb"
- "github.com/influxdata/influxdb/storage"
- "github.com/influxdata/influxdb/storage/reads"
- "github.com/influxdata/influxdb/storage/readservice"
- "go.uber.org/zap/zaptest"
-)
-
-func BenchmarkReadFilter(b *testing.B) {
- idgen := mock.NewMockIDGenerator()
- tagsSpec := &gen.TagsSpec{
- Tags: []*gen.TagValuesSpec{
- {
- TagKey: "t0",
- Values: func() gen.CountableSequence {
- return gen.NewCounterByteSequence("a-%d", 0, 5)
- },
- },
- {
- TagKey: "t1",
- Values: func() gen.CountableSequence {
- return gen.NewCounterByteSequence("b-%d", 0, 1000)
- },
- },
- },
- }
- spec := gen.Spec{
- OrgID: idgen.ID(),
- BucketID: idgen.ID(),
- Measurements: []gen.MeasurementSpec{
- {
- Name: "m0",
- TagsSpec: tagsSpec,
- FieldValuesSpec: &gen.FieldValuesSpec{
- Name: "f0",
- TimeSequenceSpec: gen.TimeSequenceSpec{
- Count: math.MaxInt32,
- Delta: time.Minute,
- },
- DataType: models.Float,
- Values: func(spec gen.TimeSequenceSpec) gen.TimeValuesSequence {
- r := rand.New(rand.NewSource(10))
- return gen.NewTimeFloatValuesSequence(
- spec.Count,
- gen.NewTimestampSequenceFromSpec(spec),
- gen.NewFloatRandomValuesSequence(0, 90, r),
- )
- },
- },
- },
- {
- Name: "m0",
- TagsSpec: tagsSpec,
- FieldValuesSpec: &gen.FieldValuesSpec{
- Name: "f1",
- TimeSequenceSpec: gen.TimeSequenceSpec{
- Count: math.MaxInt32,
- Delta: time.Minute,
- },
- DataType: models.Float,
- Values: func(spec gen.TimeSequenceSpec) gen.TimeValuesSequence {
- r := rand.New(rand.NewSource(11))
- return gen.NewTimeFloatValuesSequence(
- spec.Count,
- gen.NewTimestampSequenceFromSpec(spec),
- gen.NewFloatRandomValuesSequence(0, 180, r),
- )
- },
- },
- },
- {
- Name: "m0",
- TagsSpec: tagsSpec,
- FieldValuesSpec: &gen.FieldValuesSpec{
- Name: "f1",
- TimeSequenceSpec: gen.TimeSequenceSpec{
- Count: math.MaxInt32,
- Delta: time.Minute,
- },
- DataType: models.Float,
- Values: func(spec gen.TimeSequenceSpec) gen.TimeValuesSequence {
- r := rand.New(rand.NewSource(12))
- return gen.NewTimeFloatValuesSequence(
- spec.Count,
- gen.NewTimestampSequenceFromSpec(spec),
- gen.NewFloatRandomValuesSequence(10, 10000, r),
- )
- },
- },
- },
- },
- }
- tr := gen.TimeRange{
- Start: mustParseTime("2019-11-25T00:00:00Z"),
- End: mustParseTime("2019-11-26T00:00:00Z"),
- }
- sg := gen.NewSeriesGeneratorFromSpec(&spec, tr)
- benchmarkRead(b, sg, func(r influxdb.Reader) error {
- mem := &memory.Allocator{}
- tables, err := r.ReadFilter(context.Background(), influxdb.ReadFilterSpec{
- OrganizationID: spec.OrgID,
- BucketID: spec.BucketID,
- Bounds: execute.Bounds{
- Start: values.ConvertTime(tr.Start),
- Stop: values.ConvertTime(tr.End),
- },
- }, mem)
- if err != nil {
- return err
- }
- return tables.Do(func(table flux.Table) error {
- table.Done()
- return nil
- })
- })
-}
-
-func benchmarkRead(b *testing.B, sg gen.SeriesGenerator, f func(r influxdb.Reader) error) {
- logger := zaptest.NewLogger(b)
- rootDir, err := ioutil.TempDir("", "storage-reads-test")
- if err != nil {
- b.Fatal(err)
- }
- defer func() { _ = os.RemoveAll(rootDir) }()
-
- generator := generate.Generator{}
- if _, err := generator.Run(context.Background(), rootDir, sg); err != nil {
- b.Fatal(err)
- }
-
- enginePath := filepath.Join(rootDir, "engine")
- engine := storage.NewEngine(enginePath, storage.NewConfig())
- engine.WithLogger(logger)
-
- if err := engine.Open(context.Background()); err != nil {
- b.Fatal(err)
- }
- reader := reads.NewReader(readservice.NewStore(engine))
-
- b.ResetTimer()
- b.ReportAllocs()
- for i := 0; i < b.N; i++ {
- if err := f(reader); err != nil {
- b.Fatal(err)
- }
- }
-}
-
-func mustParseTime(s string) time.Time {
- ts, err := time.Parse(time.RFC3339, s)
- if err != nil {
- panic(err)
- }
- return ts
-}
diff --git b/storage/reads/gen.go a/storage/reads/gen.go
index 9e80e93ba6..8eee6fe0b5 100644
--- b/storage/reads/gen.go
+++ a/storage/reads/gen.go
@@ -1,4 +1 @@
package reads
-
-//go:generate env GO111MODULE=on go run github.com/benbjohnson/tmpl -data=@array_cursor.gen.go.tmpldata array_cursor.gen.go.tmpl
-//go:generate env GO111MODULE=on go run github.com/benbjohnson/tmpl -data=@array_cursor.gen.go.tmpldata response_writer.gen.go.tmpl
diff --git b/storage/reads/group_resultset.go a/storage/reads/group_resultset.go
index 24766cff67..21e0e2b4c9 100644
--- b/storage/reads/group_resultset.go
+++ a/storage/reads/group_resultset.go
@@ -7,7 +7,6 @@ import (
"math"
"sort"
- "github.com/influxdata/influxdb/kit/tracing"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/storage/reads/datatypes"
"github.com/influxdata/influxdb/tsdb/cursors"
@@ -112,16 +111,7 @@ func (g *groupResultSet) Next() GroupCursor {
}
func (g *groupResultSet) sort() (int, error) {
- span, _ := tracing.StartSpanFromContext(g.ctx)
- defer span.Finish()
- span.LogKV("group_type", g.req.Group.String())
-
n, err := g.sortFn(g)
-
- if err != nil {
- span.LogKV("rows", n)
- }
-
return n, err
}
diff --git b/storage/reads/group_resultset_test.go a/storage/reads/group_resultset_test.go
index ee13d16167..eb1fc91fc3 100644
--- b/storage/reads/group_resultset_test.go
+++ a/storage/reads/group_resultset_test.go
@@ -394,7 +394,7 @@ func BenchmarkNewGroupResultSet_GroupBy(b *testing.B) {
vals[i] = gen.NewCounterByteSequenceCount(card[i])
}
- tags := gen.NewTagsValuesSequenceValues("m0", "f0", "tag", vals)
+ tags := gen.NewTagsValuesSequenceValues("tag", vals)
rows := make([]reads.SeriesRow, tags.Count())
for i := range rows {
tags.Next()
diff --git b/storage/reads/helpers_test.go a/storage/reads/helpers_test.go
index d688ae3658..ff8698b893 100644
--- b/storage/reads/helpers_test.go
+++ a/storage/reads/helpers_test.go
@@ -1,169 +1 @@
package reads_test
-
-import (
- "context"
-
- "github.com/influxdata/influxdb/models"
- "github.com/influxdata/influxdb/pkg/data/gen"
- "github.com/influxdata/influxdb/storage/reads"
- "github.com/influxdata/influxdb/tsdb"
- "github.com/influxdata/influxdb/tsdb/cursors"
-)
-
-type seriesGeneratorCursorIterator struct {
- g gen.SeriesGenerator
- f floatTimeValuesGeneratorCursor
- i integerTimeValuesGeneratorCursor
- u unsignedTimeValuesGeneratorCursor
- s stringTimeValuesGeneratorCursor
- b booleanTimeValuesGeneratorCursor
- cur cursors.Cursor
-}
-
-func (ci *seriesGeneratorCursorIterator) Next(ctx context.Context, r *cursors.CursorRequest) (cursors.Cursor, error) {
- switch ci.g.FieldType() {
- case models.Float:
- ci.f.tv = ci.g.TimeValuesGenerator()
- ci.cur = &ci.f
- case models.Integer:
- ci.i.tv = ci.g.TimeValuesGenerator()
- ci.cur = &ci.i
- case models.Unsigned:
- ci.u.tv = ci.g.TimeValuesGenerator()
- ci.cur = &ci.u
- case models.String:
- ci.s.tv = ci.g.TimeValuesGenerator()
- ci.cur = &ci.s
- case models.Boolean:
- ci.b.tv = ci.g.TimeValuesGenerator()
- ci.cur = &ci.b
- default:
- panic("unreachable")
- }
-
- return ci.cur, nil
-}
-
-func (ci *seriesGeneratorCursorIterator) Stats() cursors.CursorStats {
- return ci.cur.Stats()
-}
-
-type seriesGeneratorSeriesCursor struct {
- ci seriesGeneratorCursorIterator
- r reads.SeriesRow
-}
-
-func newSeriesGeneratorSeriesCursor(g gen.SeriesGenerator) *seriesGeneratorSeriesCursor {
- s := &seriesGeneratorSeriesCursor{}
- s.ci.g = g
- s.r.Query = tsdb.CursorIterators{&s.ci}
- return s
-}
-
-func (s *seriesGeneratorSeriesCursor) Close() {}
-func (s *seriesGeneratorSeriesCursor) Err() error { return nil }
-
-func (s *seriesGeneratorSeriesCursor) Next() *reads.SeriesRow {
- if s.ci.g.Next() {
- s.r.SeriesTags = s.ci.g.Tags()
- s.r.Tags = s.ci.g.Tags()
- return &s.r
- }
- return nil
-}
-
-type timeValuesGeneratorCursor struct {
- tv gen.TimeValuesSequence
- stats cursors.CursorStats
-}
-
-func (t timeValuesGeneratorCursor) Close() {}
-func (t timeValuesGeneratorCursor) Err() error { return nil }
-func (t timeValuesGeneratorCursor) Stats() cursors.CursorStats { return t.stats }
-
-type floatTimeValuesGeneratorCursor struct {
- timeValuesGeneratorCursor
- a tsdb.FloatArray
-}
-
-func (c *floatTimeValuesGeneratorCursor) Next() *cursors.FloatArray {
- if c.tv.Next() {
- c.tv.Values().(gen.FloatValues).Copy(&c.a)
- } else {
- c.a.Timestamps = c.a.Timestamps[:0]
- c.a.Values = c.a.Values[:0]
- }
- c.stats.ScannedBytes += len(c.a.Values) * 8
- c.stats.ScannedValues += c.a.Len()
- return &c.a
-}
-
-type integerTimeValuesGeneratorCursor struct {
- timeValuesGeneratorCursor
- a tsdb.IntegerArray
-}
-
-func (c *integerTimeValuesGeneratorCursor) Next() *cursors.IntegerArray {
- if c.tv.Next() {
- c.tv.Values().(gen.IntegerValues).Copy(&c.a)
- } else {
- c.a.Timestamps = c.a.Timestamps[:0]
- c.a.Values = c.a.Values[:0]
- }
- c.stats.ScannedBytes += len(c.a.Values) * 8
- c.stats.ScannedValues += c.a.Len()
- return &c.a
-}
-
-type unsignedTimeValuesGeneratorCursor struct {
- timeValuesGeneratorCursor
- a tsdb.UnsignedArray
-}
-
-func (c *unsignedTimeValuesGeneratorCursor) Next() *cursors.UnsignedArray {
- if c.tv.Next() {
- c.tv.Values().(gen.UnsignedValues).Copy(&c.a)
- } else {
- c.a.Timestamps = c.a.Timestamps[:0]
- c.a.Values = c.a.Values[:0]
- }
- c.stats.ScannedBytes += len(c.a.Values) * 8
- c.stats.ScannedValues += c.a.Len()
- return &c.a
-}
-
-type stringTimeValuesGeneratorCursor struct {
- timeValuesGeneratorCursor
- a tsdb.StringArray
-}
-
-func (c *stringTimeValuesGeneratorCursor) Next() *cursors.StringArray {
- if c.tv.Next() {
- c.tv.Values().(gen.StringValues).Copy(&c.a)
- } else {
- c.a.Timestamps = c.a.Timestamps[:0]
- c.a.Values = c.a.Values[:0]
- }
- for _, v := range c.a.Values {
- c.stats.ScannedBytes += len(v)
- }
- c.stats.ScannedValues += c.a.Len()
- return &c.a
-}
-
-type booleanTimeValuesGeneratorCursor struct {
- timeValuesGeneratorCursor
- a tsdb.BooleanArray
-}
-
-func (c *booleanTimeValuesGeneratorCursor) Next() *cursors.BooleanArray {
- if c.tv.Next() {
- c.tv.Values().(gen.BooleanValues).Copy(&c.a)
- } else {
- c.a.Timestamps = c.a.Timestamps[:0]
- c.a.Values = c.a.Values[:0]
- }
- c.stats.ScannedBytes += len(c.a.Values)
- c.stats.ScannedValues += c.a.Len()
- return &c.a
-}
diff --git b/storage/reads/response_writer_test.go a/storage/reads/response_writer_test.go
index 0916c822b2..0abaf75443 100644
--- b/storage/reads/response_writer_test.go
+++ a/storage/reads/response_writer_test.go
@@ -1,21 +1,12 @@
package reads_test
import (
- "context"
- "errors"
"fmt"
"reflect"
- "strings"
"testing"
- "time"
- "github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/mock"
- "github.com/influxdata/influxdb/pkg/data/gen"
- "github.com/influxdata/influxdb/pkg/testing/assert"
"github.com/influxdata/influxdb/storage/reads"
- "github.com/influxdata/influxdb/storage/reads/datatypes"
- "github.com/influxdata/influxdb/tsdb"
"github.com/influxdata/influxdb/tsdb/cursors"
"google.golang.org/grpc/metadata"
)
@@ -132,403 +123,3 @@ func TestResponseWriter_WriteGroupResultSet_Stats(t *testing.T) {
t.Errorf("expected scanned-bytes '%v' but got '%v'", []string{fmt.Sprint(scannedBytes)}, gotTrailer.Get("scanned-bytes"))
}
}
-
-var (
- org = influxdb.ID(0xff00ff00)
- bucket = influxdb.ID(0xcc00cc00)
- orgBucketID = tsdb.EncodeName(org, bucket)
-)
-
-func makeTypedSeries(m, prefix, field string, val interface{}, valueCount int, counts ...int) gen.SeriesGenerator {
- spec := gen.TimeSequenceSpec{Count: valueCount, Start: time.Unix(0, 0), Delta: time.Second}
- ts := gen.NewTimestampSequenceFromSpec(spec)
- var vg gen.TimeValuesSequence
- switch val := val.(type) {
- case float64:
- vg = gen.NewTimeFloatValuesSequence(spec.Count, ts, gen.NewFloatConstantValuesSequence(val))
- case int64:
- vg = gen.NewTimeIntegerValuesSequence(spec.Count, ts, gen.NewIntegerConstantValuesSequence(val))
- case int:
- vg = gen.NewTimeIntegerValuesSequence(spec.Count, ts, gen.NewIntegerConstantValuesSequence(int64(val)))
- case uint64:
- vg = gen.NewTimeUnsignedValuesSequence(spec.Count, ts, gen.NewUnsignedConstantValuesSequence(val))
- case string:
- vg = gen.NewTimeStringValuesSequence(spec.Count, ts, gen.NewStringConstantValuesSequence(val))
- case bool:
- vg = gen.NewTimeBooleanValuesSequence(spec.Count, ts, gen.NewBooleanConstantValuesSequence(val))
- default:
- panic(fmt.Sprintf("unexpected type %T", val))
- }
-
- return gen.NewSeriesGenerator(orgBucketID, []byte(field), vg, gen.NewTagsValuesSequenceCounts(m, field, prefix, counts))
-}
-
-type sendSummary struct {
- groupCount int
- seriesCount int
- floatCount int
- integerCount int
- unsignedCount int
- stringCount int
- booleanCount int
-}
-
-func (ss *sendSummary) makeSendFunc() func(*datatypes.ReadResponse) error {
- return func(r *datatypes.ReadResponse) error {
- for i := range r.Frames {
- d := r.Frames[i].Data
- switch p := d.(type) {
- case *datatypes.ReadResponse_Frame_FloatPoints:
- ss.floatCount += len(p.FloatPoints.Values)
- case *datatypes.ReadResponse_Frame_IntegerPoints:
- ss.integerCount += len(p.IntegerPoints.Values)
- case *datatypes.ReadResponse_Frame_UnsignedPoints:
- ss.unsignedCount += len(p.UnsignedPoints.Values)
- case *datatypes.ReadResponse_Frame_StringPoints:
- ss.stringCount += len(p.StringPoints.Values)
- case *datatypes.ReadResponse_Frame_BooleanPoints:
- ss.booleanCount += len(p.BooleanPoints.Values)
- case *datatypes.ReadResponse_Frame_Series:
- ss.seriesCount++
- case *datatypes.ReadResponse_Frame_Group:
- ss.groupCount++
- default:
- panic("unexpected")
- }
- }
- return nil
- }
-}
-
-func TestResponseWriter_WriteResultSet(t *testing.T) {
- t.Run("normal", func(t *testing.T) {
- t.Run("all types one series each", func(t *testing.T) {
- exp := sendSummary{
- seriesCount: 5,
- floatCount: 500,
- integerCount: 400,
- unsignedCount: 300,
- stringCount: 200,
- booleanCount: 100,
- }
- var ss sendSummary
-
- stream := mock.NewResponseStream()
- stream.SendFunc = ss.makeSendFunc()
- w := reads.NewResponseWriter(stream, 0)
-
- var gens []gen.SeriesGenerator
-
- gens = append(gens, makeTypedSeries("m0", "t", "ff", 3.3, exp.floatCount, 1))
- gens = append(gens, makeTypedSeries("m0", "t", "if", 100, exp.integerCount, 1))
- gens = append(gens, makeTypedSeries("m0", "t", "uf", uint64(25), exp.unsignedCount, 1))
- gens = append(gens, makeTypedSeries("m0", "t", "sf", "foo", exp.stringCount, 1))
- gens = append(gens, makeTypedSeries("m0", "t", "bf", false, exp.booleanCount, 1))
-
- cur := newSeriesGeneratorSeriesCursor(gen.NewMergedSeriesGenerator(gens))
- rs := reads.NewFilteredResultSet(context.Background(), &datatypes.ReadFilterRequest{}, cur)
- err := w.WriteResultSet(rs)
- if err != nil {
- t.Fatalf("unexpected err: %v", err)
- }
- w.Flush()
-
- assert.Equal(t, ss, exp)
- })
- t.Run("multi-series floats", func(t *testing.T) {
- exp := sendSummary{
- seriesCount: 5,
- floatCount: 8600,
- }
- var ss sendSummary
-
- stream := mock.NewResponseStream()
- stream.SendFunc = ss.makeSendFunc()
- w := reads.NewResponseWriter(stream, 0)
-
- var gens []gen.SeriesGenerator
- gens = append(gens, makeTypedSeries("m0", "t", "f0", 3.3, 2000, 1))
- gens = append(gens, makeTypedSeries("m0", "t", "f1", 5.3, 1500, 1))
- gens = append(gens, makeTypedSeries("m0", "t", "f2", 5.3, 2500, 1))
- gens = append(gens, makeTypedSeries("m0", "t", "f3", -2.2, 900, 1))
- gens = append(gens, makeTypedSeries("m0", "t", "f4", -9.2, 1700, 1))
-
- cur := newSeriesGeneratorSeriesCursor(gen.NewMergedSeriesGenerator(gens))
- rs := reads.NewFilteredResultSet(context.Background(), &datatypes.ReadFilterRequest{}, cur)
- err := w.WriteResultSet(rs)
- if err != nil {
- t.Fatalf("unexpected err: %v", err)
- }
- w.Flush()
-
- assert.Equal(t, ss, exp)
- })
-
- t.Run("multi-series strings", func(t *testing.T) {
- exp := sendSummary{
- seriesCount: 4,
- stringCount: 6900,
- }
- var ss sendSummary
-
- stream := mock.NewResponseStream()
- stream.SendFunc = ss.makeSendFunc()
- w := reads.NewResponseWriter(stream, 0)
-
- var gens []gen.SeriesGenerator
- gens = append(gens, makeTypedSeries("m0", "t", "s0", strings.Repeat("aaa", 100), 2000, 1))
- gens = append(gens, makeTypedSeries("m0", "t", "s1", strings.Repeat("bbb", 200), 1500, 1))
- gens = append(gens, makeTypedSeries("m0", "t", "s2", strings.Repeat("ccc", 300), 2500, 1))
- gens = append(gens, makeTypedSeries("m0", "t", "s3", strings.Repeat("ddd", 200), 900, 1))
-
- cur := newSeriesGeneratorSeriesCursor(gen.NewMergedSeriesGenerator(gens))
- rs := reads.NewFilteredResultSet(context.Background(), &datatypes.ReadFilterRequest{}, cur)
- err := w.WriteResultSet(rs)
- if err != nil {
- t.Fatalf("unexpected err: %v", err)
- }
- w.Flush()
-
- assert.Equal(t, ss, exp)
- })
-
- t.Run("writer doesn't send series with no values", func(t *testing.T) {
- exp := sendSummary{
- seriesCount: 2,
- stringCount: 3700,
- }
- var ss sendSummary
-
- stream := mock.NewResponseStream()
- stream.SendFunc = ss.makeSendFunc()
- w := reads.NewResponseWriter(stream, 0)
-
- var gens []gen.SeriesGenerator
- gens = append(gens, makeTypedSeries("m0", "t", "s0", strings.Repeat("aaa", 100), 2000, 1))
- gens = append(gens, makeTypedSeries("m0", "t", "s1", strings.Repeat("bbb", 200), 0, 1))
- gens = append(gens, makeTypedSeries("m0", "t", "s2", strings.Repeat("ccc", 100), 1700, 1))
- cur := newSeriesGeneratorSeriesCursor(gen.NewMergedSeriesGenerator(gens))
-
- rs := reads.NewFilteredResultSet(context.Background(), &datatypes.ReadFilterRequest{}, cur)
- err := w.WriteResultSet(rs)
- if err != nil {
- t.Fatalf("unexpected err: %v", err)
- }
- w.Flush()
-
- assert.Equal(t, ss, exp)
- })
- })
-
- t.Run("error conditions", func(t *testing.T) {
- t.Run("writer returns stream error", func(t *testing.T) {
- exp := errors.New("no write")
-
- stream := mock.NewResponseStream()
- stream.SendFunc = func(r *datatypes.ReadResponse) error { return exp }
- w := reads.NewResponseWriter(stream, 0)
-
- cur := newSeriesGeneratorSeriesCursor(makeTypedSeries("m0", "t", "f0", strings.Repeat("0", 1000), 2000, 1))
- rs := reads.NewFilteredResultSet(context.Background(), &datatypes.ReadFilterRequest{}, cur)
- _ = w.WriteResultSet(rs)
- assert.Equal(t, w.Err(), exp)
- })
- })
-
- t.Run("issues", func(t *testing.T) {
- t.Run("short write", func(t *testing.T) {
- t.Run("single string series", func(t *testing.T) {
- exp := sendSummary{seriesCount: 1, stringCount: 1020}
- var ss sendSummary
-
- stream := mock.NewResponseStream()
- stream.SendFunc = ss.makeSendFunc()
- w := reads.NewResponseWriter(stream, 0)
-
- cur := newSeriesGeneratorSeriesCursor(makeTypedSeries("m0", "t", "f0", strings.Repeat("0", 1000), exp.stringCount, 1))
- rs := reads.NewFilteredResultSet(context.Background(), &datatypes.ReadFilterRequest{}, cur)
- err := w.WriteResultSet(rs)
- if err != nil {
- t.Fatalf("unexpected err: %v", err)
- }
- w.Flush()
-
- assert.Equal(t, ss, exp)
- })
-
- t.Run("single float series", func(t *testing.T) {
- exp := sendSummary{seriesCount: 1, floatCount: 50500}
- var ss sendSummary
-
- stream := mock.NewResponseStream()
- stream.SendFunc = ss.makeSendFunc()
- w := reads.NewResponseWriter(stream, 0)
-
- cur := newSeriesGeneratorSeriesCursor(makeTypedSeries("m0", "t", "f0", 5.5, exp.floatCount, 1))
- rs := reads.NewFilteredResultSet(context.Background(), &datatypes.ReadFilterRequest{}, cur)
- err := w.WriteResultSet(rs)
- if err != nil {
- t.Fatalf("unexpected err: %v", err)
- }
- w.Flush()
-
- assert.Equal(t, ss, exp)
- })
-
- t.Run("multi series", func(t *testing.T) {
- exp := sendSummary{seriesCount: 2, stringCount: 3700}
- var ss sendSummary
-
- stream := mock.NewResponseStream()
- stream.SendFunc = ss.makeSendFunc()
- w := reads.NewResponseWriter(stream, 0)
-
- var gens []gen.SeriesGenerator
- gens = append(gens, makeTypedSeries("m0", "t", "s0", strings.Repeat("aaa", 1000), 2200, 1))
- gens = append(gens, makeTypedSeries("m0", "t", "s1", strings.Repeat("bbb", 1000), 1500, 1))
-
- cur := newSeriesGeneratorSeriesCursor(gen.NewMergedSeriesGenerator(gens))
- rs := reads.NewFilteredResultSet(context.Background(), &datatypes.ReadFilterRequest{}, cur)
- err := w.WriteResultSet(rs)
- if err != nil {
- t.Fatalf("unexpected err: %v", err)
- }
- w.Flush()
-
- assert.Equal(t, ss, exp)
- })
- })
- })
-}
-
-func TestResponseWriter_WriteGroupResultSet(t *testing.T) {
- t.Run("normal", func(t *testing.T) {
- t.Run("all types one series each", func(t *testing.T) {
- exp := sendSummary{
- groupCount: 1,
- seriesCount: 5,
- floatCount: 500,
- integerCount: 400,
- unsignedCount: 300,
- stringCount: 200,
- booleanCount: 100,
- }
- var ss sendSummary
-
- stream := mock.NewResponseStream()
- stream.SendFunc = ss.makeSendFunc()
- w := reads.NewResponseWriter(stream, 0)
-
- newCursor := func() (cursor reads.SeriesCursor, e error) {
- var gens []gen.SeriesGenerator
- gens = append(gens, makeTypedSeries("m0", "t", "ff", 3.3, exp.floatCount, 1))
- gens = append(gens, makeTypedSeries("m0", "t", "if", 100, exp.integerCount, 1))
- gens = append(gens, makeTypedSeries("m0", "t", "uf", uint64(25), exp.unsignedCount, 1))
- gens = append(gens, makeTypedSeries("m0", "t", "sf", "foo", exp.stringCount, 1))
- gens = append(gens, makeTypedSeries("m0", "t", "bf", false, exp.booleanCount, 1))
- return newSeriesGeneratorSeriesCursor(gen.NewMergedSeriesGenerator(gens)), nil
- }
-
- rs := reads.NewGroupResultSet(context.Background(), &datatypes.ReadGroupRequest{Group: datatypes.GroupNone}, newCursor)
- err := w.WriteGroupResultSet(rs)
- if err != nil {
- t.Fatalf("unexpected err: %v", err)
- }
- w.Flush()
-
- assert.Equal(t, ss, exp)
- })
- t.Run("multi-series floats", func(t *testing.T) {
- exp := sendSummary{
- groupCount: 1,
- seriesCount: 5,
- floatCount: 8600,
- }
- var ss sendSummary
-
- stream := mock.NewResponseStream()
- stream.SendFunc = ss.makeSendFunc()
- w := reads.NewResponseWriter(stream, 0)
-
- newCursor := func() (cursor reads.SeriesCursor, e error) {
- var gens []gen.SeriesGenerator
- gens = append(gens, makeTypedSeries("m0", "t", "f0", 3.3, 2000, 1))
- gens = append(gens, makeTypedSeries("m0", "t", "f1", 5.3, 1500, 1))
- gens = append(gens, makeTypedSeries("m0", "t", "f2", 5.3, 2500, 1))
- gens = append(gens, makeTypedSeries("m0", "t", "f3", -2.2, 900, 1))
- gens = append(gens, makeTypedSeries("m0", "t", "f4", -9.2, 1700, 1))
- return newSeriesGeneratorSeriesCursor(gen.NewMergedSeriesGenerator(gens)), nil
- }
-
- rs := reads.NewGroupResultSet(context.Background(), &datatypes.ReadGroupRequest{Group: datatypes.GroupNone}, newCursor)
- err := w.WriteGroupResultSet(rs)
- if err != nil {
- t.Fatalf("unexpected err: %v", err)
- }
- w.Flush()
-
- assert.Equal(t, ss, exp)
- })
-
- t.Run("multi-series strings", func(t *testing.T) {
- exp := sendSummary{
- groupCount: 1,
- seriesCount: 4,
- stringCount: 6900,
- }
- var ss sendSummary
-
- stream := mock.NewResponseStream()
- stream.SendFunc = ss.makeSendFunc()
- w := reads.NewResponseWriter(stream, 0)
-
- newCursor := func() (cursor reads.SeriesCursor, e error) {
- var gens []gen.SeriesGenerator
- gens = append(gens, makeTypedSeries("m0", "t", "s0", strings.Repeat("aaa", 100), 2000, 1))
- gens = append(gens, makeTypedSeries("m0", "t", "s1", strings.Repeat("bbb", 200), 1500, 1))
- gens = append(gens, makeTypedSeries("m0", "t", "s2", strings.Repeat("ccc", 300), 2500, 1))
- gens = append(gens, makeTypedSeries("m0", "t", "s3", strings.Repeat("ddd", 200), 900, 1))
- return newSeriesGeneratorSeriesCursor(gen.NewMergedSeriesGenerator(gens)), nil
- }
-
- rs := reads.NewGroupResultSet(context.Background(), &datatypes.ReadGroupRequest{Group: datatypes.GroupNone}, newCursor)
- err := w.WriteGroupResultSet(rs)
- if err != nil {
- t.Fatalf("unexpected err: %v", err)
- }
- w.Flush()
-
- assert.Equal(t, ss, exp)
- })
-
- t.Run("writer doesn't send series with no values", func(t *testing.T) {
- exp := sendSummary{
- groupCount: 1,
- seriesCount: 2,
- stringCount: 3700,
- }
- var ss sendSummary
-
- stream := mock.NewResponseStream()
- stream.SendFunc = ss.makeSendFunc()
- w := reads.NewResponseWriter(stream, 0)
-
- newCursor := func() (cursor reads.SeriesCursor, e error) {
- var gens []gen.SeriesGenerator
- gens = append(gens, makeTypedSeries("m0", "t", "s0", strings.Repeat("aaa", 100), 2000, 1))
- gens = append(gens, makeTypedSeries("m0", "t", "s1", strings.Repeat("bbb", 200), 0, 1))
- gens = append(gens, makeTypedSeries("m0", "t", "s2", strings.Repeat("ccc", 100), 1700, 1))
- return newSeriesGeneratorSeriesCursor(gen.NewMergedSeriesGenerator(gens)), nil
- }
-
- rs := reads.NewGroupResultSet(context.Background(), &datatypes.ReadGroupRequest{Group: datatypes.GroupNone}, newCursor)
- err := w.WriteGroupResultSet(rs)
- if err != nil {
- t.Fatalf("unexpected err: %v", err)
- }
- w.Flush()
-
- assert.Equal(t, ss, exp)
- })
- })
-}
diff --git b/storage/reads/store.go a/storage/reads/store.go
index 8918794b37..655d12d21c 100644
--- b/storage/reads/store.go
+++ a/storage/reads/store.go
@@ -80,5 +80,5 @@ type Store interface {
TagKeys(ctx context.Context, req *datatypes.TagKeysRequest) (cursors.StringIterator, error)
TagValues(ctx context.Context, req *datatypes.TagValuesRequest) (cursors.StringIterator, error)
- GetSource(orgID, bucketID uint64) proto.Message
+ GetSource(db, rp string) proto.Message
}