refactor(storage): remove reads.ResponseWriter (#17137)

ResponseWriter is only used in IDPE, so move it to that repo.
Jacob Marble 2020-03-06 15:54:11 -08:00 committed by GitHub
parent 1c9fd705a3
commit 8bfe05e554
8 changed files with 0 additions and 1900 deletions


@@ -1,268 +0,0 @@
package mock

import (
    "context"

    "github.com/gogo/protobuf/proto"
    "github.com/influxdata/influxdb/models"
    "github.com/influxdata/influxdb/storage/reads"
    "github.com/influxdata/influxdb/storage/reads/datatypes"
    "github.com/influxdata/influxdb/tsdb/cursors"
    "google.golang.org/grpc/metadata"
)

type ResponseStream struct {
    SendFunc       func(*datatypes.ReadResponse) error
    SetTrailerFunc func(metadata.MD)
}

func NewResponseStream() *ResponseStream {
    return &ResponseStream{
        SendFunc:       func(*datatypes.ReadResponse) error { return nil },
        SetTrailerFunc: func(mds metadata.MD) {},
    }
}

func (s *ResponseStream) Send(r *datatypes.ReadResponse) error {
    return s.SendFunc(r)
}

func (s *ResponseStream) SetTrailer(m metadata.MD) {
    s.SetTrailerFunc(m)
}

type ResultSet struct {
    NextFunc   func() bool
    CursorFunc func() cursors.Cursor
    TagsFunc   func() models.Tags
    CloseFunc  func()
    ErrFunc    func() error
    StatsFunc  func() cursors.CursorStats
}

func NewResultSet() *ResultSet {
    return &ResultSet{
        NextFunc:   func() bool { return false },
        CursorFunc: func() cursors.Cursor { return nil },
        TagsFunc:   func() models.Tags { return nil },
        CloseFunc:  func() {},
        ErrFunc:    func() error { return nil },
        StatsFunc:  func() cursors.CursorStats { return cursors.CursorStats{} },
    }
}

func (rs *ResultSet) Next() bool {
    return rs.NextFunc()
}

func (rs *ResultSet) Cursor() cursors.Cursor {
    return rs.CursorFunc()
}

func (rs *ResultSet) Tags() models.Tags {
    return rs.TagsFunc()
}

func (rs *ResultSet) Close() {
    rs.CloseFunc()
}

func (rs *ResultSet) Err() error {
    return rs.ErrFunc()
}

func (rs *ResultSet) Stats() cursors.CursorStats {
    return rs.StatsFunc()
}

type GroupResultSet struct {
    NextFunc  func() reads.GroupCursor
    CloseFunc func()
    ErrFunc   func() error
}

func NewGroupResultSet() *GroupResultSet {
    return &GroupResultSet{
        NextFunc:  func() reads.GroupCursor { return nil },
        CloseFunc: func() {},
        ErrFunc:   func() error { return nil },
    }
}

func (rs *GroupResultSet) Next() reads.GroupCursor {
    return rs.NextFunc()
}

func (rs *GroupResultSet) Close() {
    rs.CloseFunc()
}

func (rs *GroupResultSet) Err() error {
    return rs.ErrFunc()
}

type FloatArrayCursor struct {
    CloseFunc func()
    Errfunc   func() error
    StatsFunc func() cursors.CursorStats
    NextFunc  func() *cursors.FloatArray
}

func NewFloatArrayCursor() *FloatArrayCursor {
    return &FloatArrayCursor{
        CloseFunc: func() {},
        Errfunc:   func() error { return nil },
        StatsFunc: func() cursors.CursorStats { return cursors.CursorStats{} },
        NextFunc:  func() *cursors.FloatArray { return &cursors.FloatArray{} },
    }
}

func (c *FloatArrayCursor) Close() {
    c.CloseFunc()
}

func (c *FloatArrayCursor) Err() error {
    return c.Errfunc()
}

func (c *FloatArrayCursor) Stats() cursors.CursorStats {
    return c.StatsFunc()
}

func (c *FloatArrayCursor) Next() *cursors.FloatArray {
    return c.NextFunc()
}

type IntegerArrayCursor struct {
    CloseFunc func()
    Errfunc   func() error
    StatsFunc func() cursors.CursorStats
    NextFunc  func() *cursors.IntegerArray
}

func NewIntegerArrayCursor() *IntegerArrayCursor {
    return &IntegerArrayCursor{
        CloseFunc: func() {},
        Errfunc:   func() error { return nil },
        StatsFunc: func() cursors.CursorStats { return cursors.CursorStats{} },
        NextFunc:  func() *cursors.IntegerArray { return &cursors.IntegerArray{} },
    }
}

func (c *IntegerArrayCursor) Close() {
    c.CloseFunc()
}

func (c *IntegerArrayCursor) Err() error {
    return c.Errfunc()
}

func (c *IntegerArrayCursor) Stats() cursors.CursorStats {
    return c.StatsFunc()
}

func (c *IntegerArrayCursor) Next() *cursors.IntegerArray {
    return c.NextFunc()
}

type GroupCursor struct {
    NextFunc             func() bool
    CursorFunc           func() cursors.Cursor
    TagsFunc             func() models.Tags
    KeysFunc             func() [][]byte
    PartitionKeyValsFunc func() [][]byte
    CloseFunc            func()
    ErrFunc              func() error
    StatsFunc            func() cursors.CursorStats
}

func NewGroupCursor() *GroupCursor {
    return &GroupCursor{
        NextFunc:             func() bool { return false },
        CursorFunc:           func() cursors.Cursor { return nil },
        TagsFunc:             func() models.Tags { return nil },
        KeysFunc:             func() [][]byte { return nil },
        PartitionKeyValsFunc: func() [][]byte { return nil },
        CloseFunc:            func() {},
        ErrFunc:              func() error { return nil },
        StatsFunc:            func() cursors.CursorStats { return cursors.CursorStats{} },
    }
}

func (c *GroupCursor) Next() bool {
    return c.NextFunc()
}

func (c *GroupCursor) Cursor() cursors.Cursor {
    return c.CursorFunc()
}

func (c *GroupCursor) Tags() models.Tags {
    return c.TagsFunc()
}

func (c *GroupCursor) Keys() [][]byte {
    return c.KeysFunc()
}

func (c *GroupCursor) PartitionKeyVals() [][]byte {
    return c.PartitionKeyValsFunc()
}

func (c *GroupCursor) Close() {
    c.CloseFunc()
}

func (c *GroupCursor) Err() error {
    return c.ErrFunc()
}

func (c *GroupCursor) Stats() cursors.CursorStats {
    return c.StatsFunc()
}

type StoreReader struct {
    ReadFilterFunc func(ctx context.Context, req *datatypes.ReadFilterRequest) (reads.ResultSet, error)
    ReadGroupFunc  func(ctx context.Context, req *datatypes.ReadGroupRequest) (reads.GroupResultSet, error)
    TagKeysFunc    func(ctx context.Context, req *datatypes.TagKeysRequest) (cursors.StringIterator, error)
    TagValuesFunc  func(ctx context.Context, req *datatypes.TagValuesRequest) (cursors.StringIterator, error)
}

func NewStoreReader() *StoreReader {
    return &StoreReader{}
}

func (s *StoreReader) ReadFilter(ctx context.Context, req *datatypes.ReadFilterRequest) (reads.ResultSet, error) {
    return s.ReadFilterFunc(ctx, req)
}

func (s *StoreReader) ReadGroup(ctx context.Context, req *datatypes.ReadGroupRequest) (reads.GroupResultSet, error) {
    return s.ReadGroupFunc(ctx, req)
}

func (s *StoreReader) TagKeys(ctx context.Context, req *datatypes.TagKeysRequest) (cursors.StringIterator, error) {
    return s.TagKeysFunc(ctx, req)
}

func (s *StoreReader) TagValues(ctx context.Context, req *datatypes.TagValuesRequest) (cursors.StringIterator, error) {
    return s.TagValuesFunc(ctx, req)
}

// this is easier than fooling around with .proto files.
type readSource struct {
    BucketID       uint64 `protobuf:"varint,1,opt,name=bucket_id,proto3"`
    OrganizationID uint64 `protobuf:"varint,2,opt,name=organization_id,proto3"`
}

func (r *readSource) XXX_MessageName() string { return "readSource" }
func (r *readSource) Reset()                  { *r = readSource{} }
func (r *readSource) String() string          { return "readSource{}" }
func (r *readSource) ProtoMessage()           {}

func (*StoreReader) GetSource(orgID, bucketID uint64) proto.Message {
    return &readSource{
        BucketID:       bucketID,
        OrganizationID: orgID,
    }
}
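These mocks follow the function-field pattern: every interface method delegates to a swappable func field, so a test overrides only the behavior it cares about while everything else keeps a safe no-op default. A minimal sketch of driving the ResultSet mock, as the code stood before this commit (the test name and values are illustrative, not part of the change):

package mock_test

import (
    "testing"

    "github.com/influxdata/influxdb/mock"
)

func TestResultSetMock(t *testing.T) {
    rs := mock.NewResultSet()

    // Override only Next; the other methods keep their no-op defaults.
    calls := 0
    rs.NextFunc = func() bool {
        calls++
        return calls == 1 // report exactly one row, then stop
    }

    rows := 0
    for rs.Next() {
        rows++
    }
    if rows != 1 {
        t.Fatalf("expected 1 row, got %d", rows)
    }
}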


@@ -1,13 +1,11 @@
# List any generated files here
TARGETS = array_cursor.gen.go \
    response_writer.gen.go \
    flux_table.gen.go

# List any source files used to generate the targets here
SOURCES = gen.go \
    array_cursor.gen.go.tmpl \
    array_cursor.gen.go.tmpldata \
    response_writer.gen.go.tmpl \
    flux_table.gen.go.tmpl \
    types.tmpldata


@@ -1,4 +1,3 @@
package reads

//go:generate env GO111MODULE=on go run github.com/benbjohnson/tmpl -data=@array_cursor.gen.go.tmpldata array_cursor.gen.go.tmpl
//go:generate env GO111MODULE=on go run github.com/benbjohnson/tmpl -data=@array_cursor.gen.go.tmpldata response_writer.gen.go.tmpl


@@ -1,169 +0,0 @@
package reads_test

import (
    "context"

    "github.com/influxdata/influxdb/models"
    "github.com/influxdata/influxdb/pkg/data/gen"
    "github.com/influxdata/influxdb/storage/reads"
    "github.com/influxdata/influxdb/tsdb"
    "github.com/influxdata/influxdb/tsdb/cursors"
)

type seriesGeneratorCursorIterator struct {
    g   gen.SeriesGenerator
    f   floatTimeValuesGeneratorCursor
    i   integerTimeValuesGeneratorCursor
    u   unsignedTimeValuesGeneratorCursor
    s   stringTimeValuesGeneratorCursor
    b   booleanTimeValuesGeneratorCursor
    cur cursors.Cursor
}

func (ci *seriesGeneratorCursorIterator) Next(ctx context.Context, r *cursors.CursorRequest) (cursors.Cursor, error) {
    switch ci.g.FieldType() {
    case models.Float:
        ci.f.tv = ci.g.TimeValuesGenerator()
        ci.cur = &ci.f
    case models.Integer:
        ci.i.tv = ci.g.TimeValuesGenerator()
        ci.cur = &ci.i
    case models.Unsigned:
        ci.u.tv = ci.g.TimeValuesGenerator()
        ci.cur = &ci.u
    case models.String:
        ci.s.tv = ci.g.TimeValuesGenerator()
        ci.cur = &ci.s
    case models.Boolean:
        ci.b.tv = ci.g.TimeValuesGenerator()
        ci.cur = &ci.b
    default:
        panic("unreachable")
    }

    return ci.cur, nil
}

func (ci *seriesGeneratorCursorIterator) Stats() cursors.CursorStats {
    return ci.cur.Stats()
}

type seriesGeneratorSeriesCursor struct {
    ci seriesGeneratorCursorIterator
    r  reads.SeriesRow
}

func newSeriesGeneratorSeriesCursor(g gen.SeriesGenerator) *seriesGeneratorSeriesCursor {
    s := &seriesGeneratorSeriesCursor{}
    s.ci.g = g
    s.r.Query = tsdb.CursorIterators{&s.ci}
    return s
}

func (s *seriesGeneratorSeriesCursor) Close()     {}
func (s *seriesGeneratorSeriesCursor) Err() error { return nil }

func (s *seriesGeneratorSeriesCursor) Next() *reads.SeriesRow {
    if s.ci.g.Next() {
        s.r.SeriesTags = s.ci.g.Tags()
        s.r.Tags = s.ci.g.Tags()
        return &s.r
    }
    return nil
}

type timeValuesGeneratorCursor struct {
    tv    gen.TimeValuesSequence
    stats cursors.CursorStats
}

func (t timeValuesGeneratorCursor) Close()                     {}
func (t timeValuesGeneratorCursor) Err() error                 { return nil }
func (t timeValuesGeneratorCursor) Stats() cursors.CursorStats { return t.stats }

type floatTimeValuesGeneratorCursor struct {
    timeValuesGeneratorCursor
    a tsdb.FloatArray
}

func (c *floatTimeValuesGeneratorCursor) Next() *cursors.FloatArray {
    if c.tv.Next() {
        c.tv.Values().(gen.FloatValues).Copy(&c.a)
    } else {
        c.a.Timestamps = c.a.Timestamps[:0]
        c.a.Values = c.a.Values[:0]
    }
    c.stats.ScannedBytes += len(c.a.Values) * 8
    c.stats.ScannedValues += c.a.Len()
    return &c.a
}

type integerTimeValuesGeneratorCursor struct {
    timeValuesGeneratorCursor
    a tsdb.IntegerArray
}

func (c *integerTimeValuesGeneratorCursor) Next() *cursors.IntegerArray {
    if c.tv.Next() {
        c.tv.Values().(gen.IntegerValues).Copy(&c.a)
    } else {
        c.a.Timestamps = c.a.Timestamps[:0]
        c.a.Values = c.a.Values[:0]
    }
    c.stats.ScannedBytes += len(c.a.Values) * 8
    c.stats.ScannedValues += c.a.Len()
    return &c.a
}

type unsignedTimeValuesGeneratorCursor struct {
    timeValuesGeneratorCursor
    a tsdb.UnsignedArray
}

func (c *unsignedTimeValuesGeneratorCursor) Next() *cursors.UnsignedArray {
    if c.tv.Next() {
        c.tv.Values().(gen.UnsignedValues).Copy(&c.a)
    } else {
        c.a.Timestamps = c.a.Timestamps[:0]
        c.a.Values = c.a.Values[:0]
    }
    c.stats.ScannedBytes += len(c.a.Values) * 8
    c.stats.ScannedValues += c.a.Len()
    return &c.a
}

type stringTimeValuesGeneratorCursor struct {
    timeValuesGeneratorCursor
    a tsdb.StringArray
}

func (c *stringTimeValuesGeneratorCursor) Next() *cursors.StringArray {
    if c.tv.Next() {
        c.tv.Values().(gen.StringValues).Copy(&c.a)
    } else {
        c.a.Timestamps = c.a.Timestamps[:0]
        c.a.Values = c.a.Values[:0]
    }
    for _, v := range c.a.Values {
        c.stats.ScannedBytes += len(v)
    }
    c.stats.ScannedValues += c.a.Len()
    return &c.a
}

type booleanTimeValuesGeneratorCursor struct {
    timeValuesGeneratorCursor
    a tsdb.BooleanArray
}

func (c *booleanTimeValuesGeneratorCursor) Next() *cursors.BooleanArray {
    if c.tv.Next() {
        c.tv.Values().(gen.BooleanValues).Copy(&c.a)
    } else {
        c.a.Timestamps = c.a.Timestamps[:0]
        c.a.Values = c.a.Values[:0]
    }
    c.stats.ScannedBytes += len(c.a.Values)
    c.stats.ScannedValues += c.a.Len()
    return &c.a
}
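These adapters let the response writer tests synthesize storage cursors from the gen package instead of a real TSM store. As a sketch of the wiring, assuming only the gen/tsdb call signatures already used by makeTypedSeries in the test file further down (the field name, tag counts, and constant value are illustrative; time and influxdb would need to be added to this file's imports):

// exampleGeneratedSeries is an illustrative helper, not part of the removed
// code: build one constant-value float series, wrap it in the adapter above,
// and drain it the way the response writer would.
func exampleGeneratedSeries() int {
    spec := gen.TimeSequenceSpec{Count: 100, Start: time.Unix(0, 0), Delta: time.Second}
    ts := gen.NewTimestampSequenceFromSpec(spec)
    vg := gen.NewTimeFloatValuesSequence(spec.Count, ts, gen.NewFloatConstantValuesSequence(1.5))

    name := tsdb.EncodeName(influxdb.ID(1), influxdb.ID(2)) // org+bucket-prefixed measurement
    sg := gen.NewSeriesGenerator(name, []byte("f0"), vg,
        gen.NewTagsValuesSequenceCounts("m0", "f0", "t", []int{1}))

    cur := newSeriesGeneratorSeriesCursor(sg)
    rows := 0
    for row := cur.Next(); row != nil; row = cur.Next() {
        rows++ // one SeriesRow per generated series key
    }
    return rows
}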


@@ -1,527 +0,0 @@
// Generated by tmpl
// https://github.com/benbjohnson/tmpl
//
// DO NOT EDIT!
// Source: response_writer.gen.go.tmpl

package reads

import (
    "github.com/influxdata/influxdb/storage/reads/datatypes"
    "github.com/influxdata/influxdb/tsdb/cursors"
)

func (w *ResponseWriter) getFloatPointsFrame() *datatypes.ReadResponse_Frame_FloatPoints {
    var res *datatypes.ReadResponse_Frame_FloatPoints
    if len(w.buffer.Float) > 0 {
        i := len(w.buffer.Float) - 1
        res = w.buffer.Float[i]
        w.buffer.Float[i] = nil
        w.buffer.Float = w.buffer.Float[:i]
    } else {
        res = &datatypes.ReadResponse_Frame_FloatPoints{
            FloatPoints: &datatypes.ReadResponse_FloatPointsFrame{
                Timestamps: make([]int64, 0, batchSize),
                Values:     make([]float64, 0, batchSize),
            },
        }
    }

    return res
}

func (w *ResponseWriter) putFloatPointsFrame(f *datatypes.ReadResponse_Frame_FloatPoints) {
    f.FloatPoints.Timestamps = f.FloatPoints.Timestamps[:0]
    f.FloatPoints.Values = f.FloatPoints.Values[:0]
    w.buffer.Float = append(w.buffer.Float, f)
}

func (w *ResponseWriter) streamFloatArraySeries(cur cursors.FloatArrayCursor) {
    w.sf.DataType = datatypes.DataTypeFloat
    ss := len(w.res.Frames) - 1
    a := cur.Next()
    if len(a.Timestamps) == 0 {
        w.sz -= w.sf.Size()
        w.putSeriesFrame(w.res.Frames[ss].Data.(*datatypes.ReadResponse_Frame_Series))
        w.res.Frames = w.res.Frames[:ss]
    } else if w.sz > writeSize {
        w.Flush()
    }
}

func (w *ResponseWriter) streamFloatArrayPoints(cur cursors.FloatArrayCursor) {
    w.sf.DataType = datatypes.DataTypeFloat
    ss := len(w.res.Frames) - 1
    p := w.getFloatPointsFrame()
    frame := p.FloatPoints
    w.res.Frames = append(w.res.Frames, datatypes.ReadResponse_Frame{Data: p})

    var seriesValueCount = 0
    for {
        // If the number of values produced by cur > 1000,
        // cur.Next() will produce batches of values that are of
        // length ≤ 1000.
        // We attempt to limit the frame Timestamps / Values lengths
        // the same to avoid allocations. These frames are recycled
        // after flushing so that on repeated use there should be enough space
        // to append values from a into frame without additional allocations.
        a := cur.Next()
        if len(a.Timestamps) == 0 {
            break
        }

        seriesValueCount += a.Len()
        // As specified in the struct definition, w.sz is an estimated
        // size (in bytes) of the buffered data. It is therefore a
        // deliberate choice to accumulate using the array Size, which is
        // cheap to calculate. Calling frame.Size() can be expensive
        // when using varint encoding for numbers.
        w.sz += a.Size()

        frame.Timestamps = append(frame.Timestamps, a.Timestamps...)
        frame.Values = append(frame.Values, a.Values...)

        // given the expectation of cur.Next, we attempt to limit
        // the number of values appended to the frame to batchSize (1000)
        needsFrame := len(frame.Timestamps) >= batchSize

        if w.sz >= writeSize {
            needsFrame = true
            w.Flush()
            if w.err != nil {
                break
            }
        }

        if needsFrame {
            // new frames are returned with Timestamps and Values preallocated
            // to a minimum of batchSize length to reduce further allocations.
            p = w.getFloatPointsFrame()
            frame = p.FloatPoints
            w.res.Frames = append(w.res.Frames, datatypes.ReadResponse_Frame{Data: p})
        }
    }

    w.vc += seriesValueCount
    if seriesValueCount == 0 {
        w.sz -= w.sf.Size()
        w.putSeriesFrame(w.res.Frames[ss].Data.(*datatypes.ReadResponse_Frame_Series))
        w.res.Frames = w.res.Frames[:ss]
    } else if w.sz > writeSize {
        w.Flush()
    }
}

func (w *ResponseWriter) getIntegerPointsFrame() *datatypes.ReadResponse_Frame_IntegerPoints {
    var res *datatypes.ReadResponse_Frame_IntegerPoints
    if len(w.buffer.Integer) > 0 {
        i := len(w.buffer.Integer) - 1
        res = w.buffer.Integer[i]
        w.buffer.Integer[i] = nil
        w.buffer.Integer = w.buffer.Integer[:i]
    } else {
        res = &datatypes.ReadResponse_Frame_IntegerPoints{
            IntegerPoints: &datatypes.ReadResponse_IntegerPointsFrame{
                Timestamps: make([]int64, 0, batchSize),
                Values:     make([]int64, 0, batchSize),
            },
        }
    }

    return res
}

func (w *ResponseWriter) putIntegerPointsFrame(f *datatypes.ReadResponse_Frame_IntegerPoints) {
    f.IntegerPoints.Timestamps = f.IntegerPoints.Timestamps[:0]
    f.IntegerPoints.Values = f.IntegerPoints.Values[:0]
    w.buffer.Integer = append(w.buffer.Integer, f)
}

func (w *ResponseWriter) streamIntegerArraySeries(cur cursors.IntegerArrayCursor) {
    w.sf.DataType = datatypes.DataTypeInteger
    ss := len(w.res.Frames) - 1
    a := cur.Next()
    if len(a.Timestamps) == 0 {
        w.sz -= w.sf.Size()
        w.putSeriesFrame(w.res.Frames[ss].Data.(*datatypes.ReadResponse_Frame_Series))
        w.res.Frames = w.res.Frames[:ss]
    } else if w.sz > writeSize {
        w.Flush()
    }
}

func (w *ResponseWriter) streamIntegerArrayPoints(cur cursors.IntegerArrayCursor) {
    w.sf.DataType = datatypes.DataTypeInteger
    ss := len(w.res.Frames) - 1
    p := w.getIntegerPointsFrame()
    frame := p.IntegerPoints
    w.res.Frames = append(w.res.Frames, datatypes.ReadResponse_Frame{Data: p})

    var seriesValueCount = 0
    for {
        // If the number of values produced by cur > 1000,
        // cur.Next() will produce batches of values that are of
        // length ≤ 1000.
        // We attempt to limit the frame Timestamps / Values lengths
        // the same to avoid allocations. These frames are recycled
        // after flushing so that on repeated use there should be enough space
        // to append values from a into frame without additional allocations.
        a := cur.Next()
        if len(a.Timestamps) == 0 {
            break
        }

        seriesValueCount += a.Len()
        // As specified in the struct definition, w.sz is an estimated
        // size (in bytes) of the buffered data. It is therefore a
        // deliberate choice to accumulate using the array Size, which is
        // cheap to calculate. Calling frame.Size() can be expensive
        // when using varint encoding for numbers.
        w.sz += a.Size()

        frame.Timestamps = append(frame.Timestamps, a.Timestamps...)
        frame.Values = append(frame.Values, a.Values...)

        // given the expectation of cur.Next, we attempt to limit
        // the number of values appended to the frame to batchSize (1000)
        needsFrame := len(frame.Timestamps) >= batchSize

        if w.sz >= writeSize {
            needsFrame = true
            w.Flush()
            if w.err != nil {
                break
            }
        }

        if needsFrame {
            // new frames are returned with Timestamps and Values preallocated
            // to a minimum of batchSize length to reduce further allocations.
            p = w.getIntegerPointsFrame()
            frame = p.IntegerPoints
            w.res.Frames = append(w.res.Frames, datatypes.ReadResponse_Frame{Data: p})
        }
    }

    w.vc += seriesValueCount
    if seriesValueCount == 0 {
        w.sz -= w.sf.Size()
        w.putSeriesFrame(w.res.Frames[ss].Data.(*datatypes.ReadResponse_Frame_Series))
        w.res.Frames = w.res.Frames[:ss]
    } else if w.sz > writeSize {
        w.Flush()
    }
}

func (w *ResponseWriter) getUnsignedPointsFrame() *datatypes.ReadResponse_Frame_UnsignedPoints {
    var res *datatypes.ReadResponse_Frame_UnsignedPoints
    if len(w.buffer.Unsigned) > 0 {
        i := len(w.buffer.Unsigned) - 1
        res = w.buffer.Unsigned[i]
        w.buffer.Unsigned[i] = nil
        w.buffer.Unsigned = w.buffer.Unsigned[:i]
    } else {
        res = &datatypes.ReadResponse_Frame_UnsignedPoints{
            UnsignedPoints: &datatypes.ReadResponse_UnsignedPointsFrame{
                Timestamps: make([]int64, 0, batchSize),
                Values:     make([]uint64, 0, batchSize),
            },
        }
    }

    return res
}

func (w *ResponseWriter) putUnsignedPointsFrame(f *datatypes.ReadResponse_Frame_UnsignedPoints) {
    f.UnsignedPoints.Timestamps = f.UnsignedPoints.Timestamps[:0]
    f.UnsignedPoints.Values = f.UnsignedPoints.Values[:0]
    w.buffer.Unsigned = append(w.buffer.Unsigned, f)
}

func (w *ResponseWriter) streamUnsignedArraySeries(cur cursors.UnsignedArrayCursor) {
    w.sf.DataType = datatypes.DataTypeUnsigned
    ss := len(w.res.Frames) - 1
    a := cur.Next()
    if len(a.Timestamps) == 0 {
        w.sz -= w.sf.Size()
        w.putSeriesFrame(w.res.Frames[ss].Data.(*datatypes.ReadResponse_Frame_Series))
        w.res.Frames = w.res.Frames[:ss]
    } else if w.sz > writeSize {
        w.Flush()
    }
}

func (w *ResponseWriter) streamUnsignedArrayPoints(cur cursors.UnsignedArrayCursor) {
    w.sf.DataType = datatypes.DataTypeUnsigned
    ss := len(w.res.Frames) - 1
    p := w.getUnsignedPointsFrame()
    frame := p.UnsignedPoints
    w.res.Frames = append(w.res.Frames, datatypes.ReadResponse_Frame{Data: p})

    var seriesValueCount = 0
    for {
        // If the number of values produced by cur > 1000,
        // cur.Next() will produce batches of values that are of
        // length ≤ 1000.
        // We attempt to limit the frame Timestamps / Values lengths
        // the same to avoid allocations. These frames are recycled
        // after flushing so that on repeated use there should be enough space
        // to append values from a into frame without additional allocations.
        a := cur.Next()
        if len(a.Timestamps) == 0 {
            break
        }

        seriesValueCount += a.Len()
        // As specified in the struct definition, w.sz is an estimated
        // size (in bytes) of the buffered data. It is therefore a
        // deliberate choice to accumulate using the array Size, which is
        // cheap to calculate. Calling frame.Size() can be expensive
        // when using varint encoding for numbers.
        w.sz += a.Size()

        frame.Timestamps = append(frame.Timestamps, a.Timestamps...)
        frame.Values = append(frame.Values, a.Values...)

        // given the expectation of cur.Next, we attempt to limit
        // the number of values appended to the frame to batchSize (1000)
        needsFrame := len(frame.Timestamps) >= batchSize

        if w.sz >= writeSize {
            needsFrame = true
            w.Flush()
            if w.err != nil {
                break
            }
        }

        if needsFrame {
            // new frames are returned with Timestamps and Values preallocated
            // to a minimum of batchSize length to reduce further allocations.
            p = w.getUnsignedPointsFrame()
            frame = p.UnsignedPoints
            w.res.Frames = append(w.res.Frames, datatypes.ReadResponse_Frame{Data: p})
        }
    }

    w.vc += seriesValueCount
    if seriesValueCount == 0 {
        w.sz -= w.sf.Size()
        w.putSeriesFrame(w.res.Frames[ss].Data.(*datatypes.ReadResponse_Frame_Series))
        w.res.Frames = w.res.Frames[:ss]
    } else if w.sz > writeSize {
        w.Flush()
    }
}

func (w *ResponseWriter) getStringPointsFrame() *datatypes.ReadResponse_Frame_StringPoints {
    var res *datatypes.ReadResponse_Frame_StringPoints
    if len(w.buffer.String) > 0 {
        i := len(w.buffer.String) - 1
        res = w.buffer.String[i]
        w.buffer.String[i] = nil
        w.buffer.String = w.buffer.String[:i]
    } else {
        res = &datatypes.ReadResponse_Frame_StringPoints{
            StringPoints: &datatypes.ReadResponse_StringPointsFrame{
                Timestamps: make([]int64, 0, batchSize),
                Values:     make([]string, 0, batchSize),
            },
        }
    }

    return res
}

func (w *ResponseWriter) putStringPointsFrame(f *datatypes.ReadResponse_Frame_StringPoints) {
    f.StringPoints.Timestamps = f.StringPoints.Timestamps[:0]
    f.StringPoints.Values = f.StringPoints.Values[:0]
    w.buffer.String = append(w.buffer.String, f)
}

func (w *ResponseWriter) streamStringArraySeries(cur cursors.StringArrayCursor) {
    w.sf.DataType = datatypes.DataTypeString
    ss := len(w.res.Frames) - 1
    a := cur.Next()
    if len(a.Timestamps) == 0 {
        w.sz -= w.sf.Size()
        w.putSeriesFrame(w.res.Frames[ss].Data.(*datatypes.ReadResponse_Frame_Series))
        w.res.Frames = w.res.Frames[:ss]
    } else if w.sz > writeSize {
        w.Flush()
    }
}

func (w *ResponseWriter) streamStringArrayPoints(cur cursors.StringArrayCursor) {
    w.sf.DataType = datatypes.DataTypeString
    ss := len(w.res.Frames) - 1
    p := w.getStringPointsFrame()
    frame := p.StringPoints
    w.res.Frames = append(w.res.Frames, datatypes.ReadResponse_Frame{Data: p})

    var seriesValueCount = 0
    for {
        // If the number of values produced by cur > 1000,
        // cur.Next() will produce batches of values that are of
        // length ≤ 1000.
        // We attempt to limit the frame Timestamps / Values lengths
        // the same to avoid allocations. These frames are recycled
        // after flushing so that on repeated use there should be enough space
        // to append values from a into frame without additional allocations.
        a := cur.Next()
        if len(a.Timestamps) == 0 {
            break
        }

        seriesValueCount += a.Len()
        // As specified in the struct definition, w.sz is an estimated
        // size (in bytes) of the buffered data. It is therefore a
        // deliberate choice to accumulate using the array Size, which is
        // cheap to calculate. Calling frame.Size() can be expensive
        // when using varint encoding for numbers.
        w.sz += a.Size()

        frame.Timestamps = append(frame.Timestamps, a.Timestamps...)
        frame.Values = append(frame.Values, a.Values...)

        // given the expectation of cur.Next, we attempt to limit
        // the number of values appended to the frame to batchSize (1000)
        needsFrame := len(frame.Timestamps) >= batchSize

        if w.sz >= writeSize {
            needsFrame = true
            w.Flush()
            if w.err != nil {
                break
            }
        }

        if needsFrame {
            // new frames are returned with Timestamps and Values preallocated
            // to a minimum of batchSize length to reduce further allocations.
            p = w.getStringPointsFrame()
            frame = p.StringPoints
            w.res.Frames = append(w.res.Frames, datatypes.ReadResponse_Frame{Data: p})
        }
    }

    w.vc += seriesValueCount
    if seriesValueCount == 0 {
        w.sz -= w.sf.Size()
        w.putSeriesFrame(w.res.Frames[ss].Data.(*datatypes.ReadResponse_Frame_Series))
        w.res.Frames = w.res.Frames[:ss]
    } else if w.sz > writeSize {
        w.Flush()
    }
}

func (w *ResponseWriter) getBooleanPointsFrame() *datatypes.ReadResponse_Frame_BooleanPoints {
    var res *datatypes.ReadResponse_Frame_BooleanPoints
    if len(w.buffer.Boolean) > 0 {
        i := len(w.buffer.Boolean) - 1
        res = w.buffer.Boolean[i]
        w.buffer.Boolean[i] = nil
        w.buffer.Boolean = w.buffer.Boolean[:i]
    } else {
        res = &datatypes.ReadResponse_Frame_BooleanPoints{
            BooleanPoints: &datatypes.ReadResponse_BooleanPointsFrame{
                Timestamps: make([]int64, 0, batchSize),
                Values:     make([]bool, 0, batchSize),
            },
        }
    }

    return res
}

func (w *ResponseWriter) putBooleanPointsFrame(f *datatypes.ReadResponse_Frame_BooleanPoints) {
    f.BooleanPoints.Timestamps = f.BooleanPoints.Timestamps[:0]
    f.BooleanPoints.Values = f.BooleanPoints.Values[:0]
    w.buffer.Boolean = append(w.buffer.Boolean, f)
}

func (w *ResponseWriter) streamBooleanArraySeries(cur cursors.BooleanArrayCursor) {
    w.sf.DataType = datatypes.DataTypeBoolean
    ss := len(w.res.Frames) - 1
    a := cur.Next()
    if len(a.Timestamps) == 0 {
        w.sz -= w.sf.Size()
        w.putSeriesFrame(w.res.Frames[ss].Data.(*datatypes.ReadResponse_Frame_Series))
        w.res.Frames = w.res.Frames[:ss]
    } else if w.sz > writeSize {
        w.Flush()
    }
}

func (w *ResponseWriter) streamBooleanArrayPoints(cur cursors.BooleanArrayCursor) {
    w.sf.DataType = datatypes.DataTypeBoolean
    ss := len(w.res.Frames) - 1
    p := w.getBooleanPointsFrame()
    frame := p.BooleanPoints
    w.res.Frames = append(w.res.Frames, datatypes.ReadResponse_Frame{Data: p})

    var seriesValueCount = 0
    for {
        // If the number of values produced by cur > 1000,
        // cur.Next() will produce batches of values that are of
        // length ≤ 1000.
        // We attempt to limit the frame Timestamps / Values lengths
        // the same to avoid allocations. These frames are recycled
        // after flushing so that on repeated use there should be enough space
        // to append values from a into frame without additional allocations.
        a := cur.Next()
        if len(a.Timestamps) == 0 {
            break
        }

        seriesValueCount += a.Len()
        // As specified in the struct definition, w.sz is an estimated
        // size (in bytes) of the buffered data. It is therefore a
        // deliberate choice to accumulate using the array Size, which is
        // cheap to calculate. Calling frame.Size() can be expensive
        // when using varint encoding for numbers.
        w.sz += a.Size()

        frame.Timestamps = append(frame.Timestamps, a.Timestamps...)
        frame.Values = append(frame.Values, a.Values...)

        // given the expectation of cur.Next, we attempt to limit
        // the number of values appended to the frame to batchSize (1000)
        needsFrame := len(frame.Timestamps) >= batchSize

        if w.sz >= writeSize {
            needsFrame = true
            w.Flush()
            if w.err != nil {
                break
            }
        }

        if needsFrame {
            // new frames are returned with Timestamps and Values preallocated
            // to a minimum of batchSize length to reduce further allocations.
            p = w.getBooleanPointsFrame()
            frame = p.BooleanPoints
            w.res.Frames = append(w.res.Frames, datatypes.ReadResponse_Frame{Data: p})
        }
    }

    w.vc += seriesValueCount
    if seriesValueCount == 0 {
        w.sz -= w.sf.Size()
        w.putSeriesFrame(w.res.Frames[ss].Data.(*datatypes.ReadResponse_Frame_Series))
        w.res.Frames = w.res.Frames[:ss]
    } else if w.sz > writeSize {
        w.Flush()
    }
}
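The get/put pairs above implement a slice-backed free list: Flush returns frames to the buffer, and the next series pops them back off instead of allocating. A self-contained sketch of that recycling pattern in isolation (the type and constant names here are illustrative, not from the package):

package main

import "fmt"

type frame struct {
    Timestamps []int64
    Values     []float64
}

// freeList mirrors getFloatPointsFrame/putFloatPointsFrame: pop from the
// tail when a spare frame exists, allocate otherwise, and push length-reset
// frames back after use so their backing arrays are reused.
type freeList struct {
    spare []*frame
}

func (l *freeList) get() *frame {
    if n := len(l.spare); n > 0 {
        f := l.spare[n-1]
        l.spare[n-1] = nil // clear the slot so it no longer pins the frame
        l.spare = l.spare[:n-1]
        return f
    }
    // Preallocate to the batch size so later appends rarely grow the slices.
    return &frame{
        Timestamps: make([]int64, 0, 1000),
        Values:     make([]float64, 0, 1000),
    }
}

func (l *freeList) put(f *frame) {
    f.Timestamps = f.Timestamps[:0] // keep capacity, drop contents
    f.Values = f.Values[:0]
    l.spare = append(l.spare, f)
}

func main() {
    var l freeList
    f := l.get()
    f.Timestamps = append(f.Timestamps, 1)
    f.Values = append(f.Values, 2.5)
    l.put(f)

    g := l.get() // same backing arrays, lengths reset to zero
    fmt.Println(len(g.Timestamps), cap(g.Values) >= 1000) // 0 true
}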


@@ -1,112 +0,0 @@
package reads

import (
    "github.com/influxdata/influxdb/storage/reads/datatypes"
    "github.com/influxdata/influxdb/tsdb/cursors"
)

{{range .}}
func (w *ResponseWriter) get{{.Name}}PointsFrame() *datatypes.ReadResponse_Frame_{{.Name}}Points {
    var res *datatypes.ReadResponse_Frame_{{.Name}}Points
    if len(w.buffer.{{.Name}}) > 0 {
        i := len(w.buffer.{{.Name}}) - 1
        res = w.buffer.{{.Name}}[i]
        w.buffer.{{.Name}}[i] = nil
        w.buffer.{{.Name}} = w.buffer.{{.Name}}[:i]
    } else {
        res = &datatypes.ReadResponse_Frame_{{.Name}}Points{
            {{.Name}}Points: &datatypes.ReadResponse_{{.Name}}PointsFrame{
                Timestamps: make([]int64, 0, batchSize),
                Values:     make([]{{.Type}}, 0, batchSize),
            },
        }
    }

    return res
}

func (w *ResponseWriter) put{{.Name}}PointsFrame(f *datatypes.ReadResponse_Frame_{{.Name}}Points) {
    f.{{.Name}}Points.Timestamps = f.{{.Name}}Points.Timestamps[:0]
    f.{{.Name}}Points.Values = f.{{.Name}}Points.Values[:0]
    w.buffer.{{.Name}} = append(w.buffer.{{.Name}}, f)
}

func (w *ResponseWriter) stream{{.Name}}ArraySeries(cur cursors.{{.Name}}ArrayCursor) {
    w.sf.DataType = datatypes.DataType{{.Name}}
    ss := len(w.res.Frames) - 1
    a := cur.Next()
    if len(a.Timestamps) == 0 {
        w.sz -= w.sf.Size()
        w.putSeriesFrame(w.res.Frames[ss].Data.(*datatypes.ReadResponse_Frame_Series))
        w.res.Frames = w.res.Frames[:ss]
    } else if w.sz > writeSize {
        w.Flush()
    }
}

func (w *ResponseWriter) stream{{.Name}}ArrayPoints(cur cursors.{{.Name}}ArrayCursor) {
    w.sf.DataType = datatypes.DataType{{.Name}}
    ss := len(w.res.Frames) - 1
    p := w.get{{.Name}}PointsFrame()
    frame := p.{{.Name}}Points
    w.res.Frames = append(w.res.Frames, datatypes.ReadResponse_Frame{Data: p})

    var seriesValueCount = 0
    for {
        // If the number of values produced by cur > 1000,
        // cur.Next() will produce batches of values that are of
        // length ≤ 1000.
        // We attempt to limit the frame Timestamps / Values lengths
        // the same to avoid allocations. These frames are recycled
        // after flushing so that on repeated use there should be enough space
        // to append values from a into frame without additional allocations.
        a := cur.Next()
        if len(a.Timestamps) == 0 {
            break
        }

        seriesValueCount += a.Len()
        // As specified in the struct definition, w.sz is an estimated
        // size (in bytes) of the buffered data. It is therefore a
        // deliberate choice to accumulate using the array Size, which is
        // cheap to calculate. Calling frame.Size() can be expensive
        // when using varint encoding for numbers.
        w.sz += a.Size()

        frame.Timestamps = append(frame.Timestamps, a.Timestamps...)
        frame.Values = append(frame.Values, a.Values...)

        // given the expectation of cur.Next, we attempt to limit
        // the number of values appended to the frame to batchSize (1000)
        needsFrame := len(frame.Timestamps) >= batchSize

        if w.sz >= writeSize {
            needsFrame = true
            w.Flush()
            if w.err != nil {
                break
            }
        }

        if needsFrame {
            // new frames are returned with Timestamps and Values preallocated
            // to a minimum of batchSize length to reduce further allocations.
            p = w.get{{.Name}}PointsFrame()
            frame = p.{{.Name}}Points
            w.res.Frames = append(w.res.Frames, datatypes.ReadResponse_Frame{Data: p})
        }
    }

    w.vc += seriesValueCount
    if seriesValueCount == 0 {
        w.sz -= w.sf.Size()
        w.putSeriesFrame(w.res.Frames[ss].Data.(*datatypes.ReadResponse_Frame_Series))
        w.res.Frames = w.res.Frames[:ss]
    } else if w.sz > writeSize {
        w.Flush()
    }
}
{{end}}
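This template is executed by github.com/benbjohnson/tmpl over the entries in array_cursor.gen.go.tmpldata, each exposing a .Name and a .Type. A minimal, self-contained sketch of that mechanism using the standard text/template package; the five Name/Type pairs are inferred from the generated file above, and the exact contents of the real tmpldata file are an assumption:

package main

import (
    "os"
    "text/template"
)

func main() {
    // A toy template in the same style as response_writer.gen.go.tmpl.
    const src = `{{range .}}// {{.Name}} frames carry Values []{{.Type}}
{{end}}`

    // Name/Type pairs inferred from the five generated instantiations above.
    data := []struct{ Name, Type string }{
        {"Float", "float64"},
        {"Integer", "int64"},
        {"Unsigned", "uint64"},
        {"String", "string"},
        {"Boolean", "bool"},
    }

    t := template.Must(template.New("gen").Parse(src))
    if err := t.Execute(os.Stdout, data); err != nil {
        panic(err)
    }
}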


@@ -1,287 +0,0 @@
package reads

import (
    "fmt"

    "google.golang.org/grpc/metadata"

    "github.com/influxdata/influxdb/models"
    "github.com/influxdata/influxdb/storage/reads/datatypes"
    "github.com/influxdata/influxdb/tsdb/cursors"
)

type ResponseStream interface {
    Send(*datatypes.ReadResponse) error
    // SetTrailer sets the trailer metadata which will be sent with the RPC status.
    // When called more than once, all the provided metadata will be merged.
    SetTrailer(metadata.MD)
}

const (
    batchSize  = 1000
    frameCount = 50
    writeSize  = 64 << 10 // 64k
)

type ResponseWriter struct {
    stream ResponseStream
    res    *datatypes.ReadResponse
    err    error

    // current series
    sf *datatypes.ReadResponse_SeriesFrame
    ss int // pointer to current series frame; used to skip writing if no points

    // sz is an estimated size in bytes for pending writes to flush periodically
    // when the size exceeds writeSize.
    sz int

    vc int // total value count

    buffer struct {
        Float    []*datatypes.ReadResponse_Frame_FloatPoints
        Integer  []*datatypes.ReadResponse_Frame_IntegerPoints
        Unsigned []*datatypes.ReadResponse_Frame_UnsignedPoints
        Boolean  []*datatypes.ReadResponse_Frame_BooleanPoints
        String   []*datatypes.ReadResponse_Frame_StringPoints
        Series   []*datatypes.ReadResponse_Frame_Series
        Group    []*datatypes.ReadResponse_Frame_Group
    }

    hints datatypes.HintFlags
}

func NewResponseWriter(stream ResponseStream, hints datatypes.HintFlags) *ResponseWriter {
    rw := &ResponseWriter{
        stream: stream,
        res: &datatypes.ReadResponse{
            Frames: make([]datatypes.ReadResponse_Frame, 0, frameCount),
        },
        hints: hints,
    }

    return rw
}

// WrittenN returns the number of values written to the response stream.
func (w *ResponseWriter) WrittenN() int { return w.vc }

func (w *ResponseWriter) WriteResultSet(rs ResultSet) error {
    for rs.Next() {
        cur := rs.Cursor()
        if cur == nil {
            // no data for series key + field combination
            continue
        }

        w.startSeries(rs.Tags())
        w.streamCursor(cur)
        if w.err != nil {
            cur.Close()
            return w.err
        }
    }

    stats := rs.Stats()
    w.stream.SetTrailer(metadata.Pairs(
        "scanned-bytes", fmt.Sprint(stats.ScannedBytes),
        "scanned-values", fmt.Sprint(stats.ScannedValues)))

    return nil
}

func (w *ResponseWriter) WriteGroupResultSet(rs GroupResultSet) error {
    stats := cursors.CursorStats{}
    gc := rs.Next()
    for gc != nil {
        w.startGroup(gc.Keys(), gc.PartitionKeyVals())
        for gc.Next() {
            cur := gc.Cursor()
            if cur == nil {
                // no data for series key + field combination
                continue
            }

            w.startSeries(gc.Tags())
            w.streamCursor(cur)
            if w.err != nil {
                gc.Close()
                return w.err
            }
            stats.Add(gc.Stats())
        }
        gc.Close()
        gc = rs.Next()
    }

    w.stream.SetTrailer(metadata.Pairs(
        "scanned-bytes", fmt.Sprint(stats.ScannedBytes),
        "scanned-values", fmt.Sprint(stats.ScannedValues)))

    return nil
}

func (w *ResponseWriter) Err() error { return w.err }

func (w *ResponseWriter) getGroupFrame(keys, partitionKey [][]byte) *datatypes.ReadResponse_Frame_Group {
    var res *datatypes.ReadResponse_Frame_Group
    if len(w.buffer.Group) > 0 {
        i := len(w.buffer.Group) - 1
        res = w.buffer.Group[i]
        w.buffer.Group[i] = nil
        w.buffer.Group = w.buffer.Group[:i]
    } else {
        res = &datatypes.ReadResponse_Frame_Group{Group: &datatypes.ReadResponse_GroupFrame{}}
    }

    if cap(res.Group.TagKeys) < len(keys) {
        res.Group.TagKeys = make([][]byte, len(keys))
    } else if len(res.Group.TagKeys) != len(keys) {
        res.Group.TagKeys = res.Group.TagKeys[:len(keys)]
    }

    if cap(res.Group.PartitionKeyVals) < len(partitionKey) {
        res.Group.PartitionKeyVals = make([][]byte, len(partitionKey))
    } else if len(res.Group.PartitionKeyVals) != len(partitionKey) {
        res.Group.PartitionKeyVals = res.Group.PartitionKeyVals[:len(partitionKey)]
    }

    return res
}

func (w *ResponseWriter) putGroupFrame(f *datatypes.ReadResponse_Frame_Group) {
    for i := range f.Group.TagKeys {
        f.Group.TagKeys[i] = nil
    }
    for i := range f.Group.PartitionKeyVals {
        f.Group.PartitionKeyVals[i] = nil
    }
    w.buffer.Group = append(w.buffer.Group, f)
}

func (w *ResponseWriter) getSeriesFrame(next models.Tags) *datatypes.ReadResponse_Frame_Series {
    var res *datatypes.ReadResponse_Frame_Series
    if len(w.buffer.Series) > 0 {
        i := len(w.buffer.Series) - 1
        res = w.buffer.Series[i]
        w.buffer.Series[i] = nil
        w.buffer.Series = w.buffer.Series[:i]
    } else {
        res = &datatypes.ReadResponse_Frame_Series{Series: &datatypes.ReadResponse_SeriesFrame{}}
    }

    if cap(res.Series.Tags) < len(next) {
        res.Series.Tags = make([]datatypes.Tag, len(next))
    } else if len(res.Series.Tags) != len(next) {
        res.Series.Tags = res.Series.Tags[:len(next)]
    }

    return res
}

func (w *ResponseWriter) putSeriesFrame(f *datatypes.ReadResponse_Frame_Series) {
    tags := f.Series.Tags
    for i := range tags {
        tags[i].Key = nil
        tags[i].Value = nil
    }
    w.buffer.Series = append(w.buffer.Series, f)
}

func (w *ResponseWriter) startGroup(keys, partitionKey [][]byte) {
    f := w.getGroupFrame(keys, partitionKey)
    copy(f.Group.TagKeys, keys)
    copy(f.Group.PartitionKeyVals, partitionKey)
    w.res.Frames = append(w.res.Frames, datatypes.ReadResponse_Frame{Data: f})
    w.sz += f.Size()
}

func (w *ResponseWriter) startSeries(next models.Tags) {
    if w.hints.NoSeries() {
        return
    }

    w.ss = len(w.res.Frames)

    f := w.getSeriesFrame(next)
    w.sf = f.Series
    for i, t := range next {
        w.sf.Tags[i] = datatypes.Tag{
            Key:   t.Key,
            Value: t.Value,
        }
    }

    w.res.Frames = append(w.res.Frames, datatypes.ReadResponse_Frame{Data: f})
    w.sz += w.sf.Size()
}

func (w *ResponseWriter) streamCursor(cur cursors.Cursor) {
    switch {
    case w.hints.NoSeries():
        // skip
    case w.hints.NoPoints():
        switch cur := cur.(type) {
        case cursors.IntegerArrayCursor:
            w.streamIntegerArraySeries(cur)
        case cursors.FloatArrayCursor:
            w.streamFloatArraySeries(cur)
        case cursors.UnsignedArrayCursor:
            w.streamUnsignedArraySeries(cur)
        case cursors.BooleanArrayCursor:
            w.streamBooleanArraySeries(cur)
        case cursors.StringArrayCursor:
            w.streamStringArraySeries(cur)
        default:
            panic(fmt.Sprintf("unreachable: %T", cur))
        }

    default:
        switch cur := cur.(type) {
        case cursors.IntegerArrayCursor:
            w.streamIntegerArrayPoints(cur)
        case cursors.FloatArrayCursor:
            w.streamFloatArrayPoints(cur)
        case cursors.UnsignedArrayCursor:
            w.streamUnsignedArrayPoints(cur)
        case cursors.BooleanArrayCursor:
            w.streamBooleanArrayPoints(cur)
        case cursors.StringArrayCursor:
            w.streamStringArrayPoints(cur)
        default:
            panic(fmt.Sprintf("unreachable: %T", cur))
        }
    }
    cur.Close()
}

func (w *ResponseWriter) Flush() {
    if w.err != nil || w.sz == 0 {
        return
    }

    w.sz = 0

    if w.err = w.stream.Send(w.res); w.err != nil {
        return
    }

    for i := range w.res.Frames {
        d := w.res.Frames[i].Data
        w.res.Frames[i].Data = nil
        switch p := d.(type) {
        case *datatypes.ReadResponse_Frame_FloatPoints:
            w.putFloatPointsFrame(p)
        case *datatypes.ReadResponse_Frame_IntegerPoints:
            w.putIntegerPointsFrame(p)
        case *datatypes.ReadResponse_Frame_UnsignedPoints:
            w.putUnsignedPointsFrame(p)
        case *datatypes.ReadResponse_Frame_BooleanPoints:
            w.putBooleanPointsFrame(p)
        case *datatypes.ReadResponse_Frame_StringPoints:
            w.putStringPointsFrame(p)
        case *datatypes.ReadResponse_Frame_Series:
            w.putSeriesFrame(p)
        case *datatypes.ReadResponse_Frame_Group:
            w.putGroupFrame(p)
        }
    }
    w.res.Frames = w.res.Frames[:0]
}
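Putting the pieces together, a minimal sketch of the writer's lifecycle as the tests below exercise it: construct it with a stream and hint flags, write one or more result sets, then Flush to push any buffered frames. The mock stream behavior and printed output are illustrative:

package reads_test

import (
    "fmt"

    "github.com/influxdata/influxdb/mock"
    "github.com/influxdata/influxdb/storage/reads"
    "github.com/influxdata/influxdb/storage/reads/datatypes"
)

func ExampleResponseWriter() {
    stream := mock.NewResponseStream()
    stream.SendFunc = func(r *datatypes.ReadResponse) error {
        fmt.Println("frames sent:", len(r.Frames))
        return nil
    }

    w := reads.NewResponseWriter(stream, 0) // 0 = no hint flags

    rs := mock.NewResultSet() // defaults: no series, so nothing is buffered
    if err := w.WriteResultSet(rs); err != nil {
        fmt.Println("write failed:", err) // stream errors also surface via w.Err()
    }

    w.Flush() // Send fires only if buffered bytes remain
}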


@@ -1,534 +0,0 @@
package reads_test

import (
    "context"
    "errors"
    "fmt"
    "reflect"
    "strings"
    "testing"
    "time"

    "github.com/influxdata/influxdb"
    "github.com/influxdata/influxdb/mock"
    "github.com/influxdata/influxdb/pkg/data/gen"
    "github.com/influxdata/influxdb/pkg/testing/assert"
    "github.com/influxdata/influxdb/storage/reads"
    "github.com/influxdata/influxdb/storage/reads/datatypes"
    "github.com/influxdata/influxdb/tsdb"
    "github.com/influxdata/influxdb/tsdb/cursors"
    "google.golang.org/grpc/metadata"
)

func TestResponseWriter_WriteResultSet_Stats(t *testing.T) {
    scannedValues := 37
    scannedBytes := 41

    var gotTrailer metadata.MD = nil

    stream := mock.NewResponseStream()
    stream.SetTrailerFunc = func(trailer metadata.MD) {
        if gotTrailer != nil {
            t.Error("trailer expected to be set once, but SetTrailer was called more than once")
        } else {
            gotTrailer = trailer
        }
    }

    rs := mock.NewResultSet()
    rs.StatsFunc = func() cursors.CursorStats {
        return cursors.CursorStats{
            ScannedValues: scannedValues,
            ScannedBytes:  scannedBytes,
        }
    }
    nextHasBeenCalledOnce := false
    rs.NextFunc = func() bool { // Returns true exactly once
        if !nextHasBeenCalledOnce {
            nextHasBeenCalledOnce = true
            return true
        }
        return false
    }
    cursorHasBeenCalledOnce := false
    rs.CursorFunc = func() cursors.Cursor {
        if !cursorHasBeenCalledOnce {
            cursorHasBeenCalledOnce = true
            return mock.NewIntegerArrayCursor()
        }
        return nil
    }

    // This is what we're testing.
    rw := reads.NewResponseWriter(stream, 0)
    err := rw.WriteResultSet(rs)
    if err != nil {
        t.Fatal(err)
    }

    if !reflect.DeepEqual(gotTrailer.Get("scanned-values"), []string{fmt.Sprint(scannedValues)}) {
        t.Errorf("expected scanned-values '%v' but got '%v'", []string{fmt.Sprint(scannedValues)}, gotTrailer.Get("scanned-values"))
    }
    if !reflect.DeepEqual(gotTrailer.Get("scanned-bytes"), []string{fmt.Sprint(scannedBytes)}) {
        t.Errorf("expected scanned-bytes '%v' but got '%v'", []string{fmt.Sprint(scannedBytes)}, gotTrailer.Get("scanned-bytes"))
    }
}

func TestResponseWriter_WriteGroupResultSet_Stats(t *testing.T) {
    scannedValues := 37
    scannedBytes := 41

    var gotTrailer metadata.MD = nil

    stream := mock.NewResponseStream()
    stream.SetTrailerFunc = func(trailer metadata.MD) {
        if gotTrailer != nil {
            t.Error("trailer expected to be set once, but SetTrailer was called more than once")
        } else {
            gotTrailer = trailer
        }
    }

    gc := mock.NewGroupCursor()
    gc.StatsFunc = func() cursors.CursorStats {
        return cursors.CursorStats{
            ScannedValues: scannedValues,
            ScannedBytes:  scannedBytes,
        }
    }
    cNextHasBeenCalledOnce := false
    gc.NextFunc = func() bool {
        if !cNextHasBeenCalledOnce {
            cNextHasBeenCalledOnce = true
            return true
        }
        return false
    }
    gc.CursorFunc = func() cursors.Cursor {
        return mock.NewIntegerArrayCursor()
    }

    rs := mock.NewGroupResultSet()
    rsNextHasBeenCalledOnce := false
    rs.NextFunc = func() reads.GroupCursor {
        if !rsNextHasBeenCalledOnce {
            rsNextHasBeenCalledOnce = true
            return gc
        }
        return nil
    }

    // This is what we're testing.
    rw := reads.NewResponseWriter(stream, 0)
    err := rw.WriteGroupResultSet(rs)
    if err != nil {
        t.Fatal(err)
    }

    if !reflect.DeepEqual(gotTrailer.Get("scanned-values"), []string{fmt.Sprint(scannedValues)}) {
        t.Errorf("expected scanned-values '%v' but got '%v'", []string{fmt.Sprint(scannedValues)}, gotTrailer.Get("scanned-values"))
    }
    if !reflect.DeepEqual(gotTrailer.Get("scanned-bytes"), []string{fmt.Sprint(scannedBytes)}) {
        t.Errorf("expected scanned-bytes '%v' but got '%v'", []string{fmt.Sprint(scannedBytes)}, gotTrailer.Get("scanned-bytes"))
    }
}

var (
    org         = influxdb.ID(0xff00ff00)
    bucket      = influxdb.ID(0xcc00cc00)
    orgBucketID = tsdb.EncodeName(org, bucket)
)

func makeTypedSeries(m, prefix, field string, val interface{}, valueCount int, counts ...int) gen.SeriesGenerator {
    spec := gen.TimeSequenceSpec{Count: valueCount, Start: time.Unix(0, 0), Delta: time.Second}
    ts := gen.NewTimestampSequenceFromSpec(spec)
    var vg gen.TimeValuesSequence
    switch val := val.(type) {
    case float64:
        vg = gen.NewTimeFloatValuesSequence(spec.Count, ts, gen.NewFloatConstantValuesSequence(val))
    case int64:
        vg = gen.NewTimeIntegerValuesSequence(spec.Count, ts, gen.NewIntegerConstantValuesSequence(val))
    case int:
        vg = gen.NewTimeIntegerValuesSequence(spec.Count, ts, gen.NewIntegerConstantValuesSequence(int64(val)))
    case uint64:
        vg = gen.NewTimeUnsignedValuesSequence(spec.Count, ts, gen.NewUnsignedConstantValuesSequence(val))
    case string:
        vg = gen.NewTimeStringValuesSequence(spec.Count, ts, gen.NewStringConstantValuesSequence(val))
    case bool:
        vg = gen.NewTimeBooleanValuesSequence(spec.Count, ts, gen.NewBooleanConstantValuesSequence(val))
    default:
        panic(fmt.Sprintf("unexpected type %T", val))
    }

    return gen.NewSeriesGenerator(orgBucketID, []byte(field), vg, gen.NewTagsValuesSequenceCounts(m, field, prefix, counts))
}

type sendSummary struct {
    groupCount    int
    seriesCount   int
    floatCount    int
    integerCount  int
    unsignedCount int
    stringCount   int
    booleanCount  int
}

func (ss *sendSummary) makeSendFunc() func(*datatypes.ReadResponse) error {
    return func(r *datatypes.ReadResponse) error {
        for i := range r.Frames {
            d := r.Frames[i].Data
            switch p := d.(type) {
            case *datatypes.ReadResponse_Frame_FloatPoints:
                ss.floatCount += len(p.FloatPoints.Values)
            case *datatypes.ReadResponse_Frame_IntegerPoints:
                ss.integerCount += len(p.IntegerPoints.Values)
            case *datatypes.ReadResponse_Frame_UnsignedPoints:
                ss.unsignedCount += len(p.UnsignedPoints.Values)
            case *datatypes.ReadResponse_Frame_StringPoints:
                ss.stringCount += len(p.StringPoints.Values)
            case *datatypes.ReadResponse_Frame_BooleanPoints:
                ss.booleanCount += len(p.BooleanPoints.Values)
            case *datatypes.ReadResponse_Frame_Series:
                ss.seriesCount++
            case *datatypes.ReadResponse_Frame_Group:
                ss.groupCount++
            default:
                panic("unexpected")
            }
        }
        return nil
    }
}

func TestResponseWriter_WriteResultSet(t *testing.T) {
    t.Run("normal", func(t *testing.T) {
        t.Run("all types one series each", func(t *testing.T) {
            exp := sendSummary{
                seriesCount:   5,
                floatCount:    500,
                integerCount:  400,
                unsignedCount: 300,
                stringCount:   200,
                booleanCount:  100,
            }
            var ss sendSummary

            stream := mock.NewResponseStream()
            stream.SendFunc = ss.makeSendFunc()
            w := reads.NewResponseWriter(stream, 0)

            var gens []gen.SeriesGenerator
            gens = append(gens, makeTypedSeries("m0", "t", "ff", 3.3, exp.floatCount, 1))
            gens = append(gens, makeTypedSeries("m0", "t", "if", 100, exp.integerCount, 1))
            gens = append(gens, makeTypedSeries("m0", "t", "uf", uint64(25), exp.unsignedCount, 1))
            gens = append(gens, makeTypedSeries("m0", "t", "sf", "foo", exp.stringCount, 1))
            gens = append(gens, makeTypedSeries("m0", "t", "bf", false, exp.booleanCount, 1))
            cur := newSeriesGeneratorSeriesCursor(gen.NewMergedSeriesGenerator(gens))
            rs := reads.NewFilteredResultSet(context.Background(), &datatypes.ReadFilterRequest{}, cur)
            err := w.WriteResultSet(rs)
            if err != nil {
                t.Fatalf("unexpected err: %v", err)
            }
            w.Flush()

            assert.Equal(t, ss, exp)
        })

        t.Run("multi-series floats", func(t *testing.T) {
            exp := sendSummary{
                seriesCount: 5,
                floatCount:  8600,
            }
            var ss sendSummary

            stream := mock.NewResponseStream()
            stream.SendFunc = ss.makeSendFunc()
            w := reads.NewResponseWriter(stream, 0)

            var gens []gen.SeriesGenerator
            gens = append(gens, makeTypedSeries("m0", "t", "f0", 3.3, 2000, 1))
            gens = append(gens, makeTypedSeries("m0", "t", "f1", 5.3, 1500, 1))
            gens = append(gens, makeTypedSeries("m0", "t", "f2", 5.3, 2500, 1))
            gens = append(gens, makeTypedSeries("m0", "t", "f3", -2.2, 900, 1))
            gens = append(gens, makeTypedSeries("m0", "t", "f4", -9.2, 1700, 1))
            cur := newSeriesGeneratorSeriesCursor(gen.NewMergedSeriesGenerator(gens))
            rs := reads.NewFilteredResultSet(context.Background(), &datatypes.ReadFilterRequest{}, cur)
            err := w.WriteResultSet(rs)
            if err != nil {
                t.Fatalf("unexpected err: %v", err)
            }
            w.Flush()

            assert.Equal(t, ss, exp)
        })

        t.Run("multi-series strings", func(t *testing.T) {
            exp := sendSummary{
                seriesCount: 4,
                stringCount: 6900,
            }
            var ss sendSummary

            stream := mock.NewResponseStream()
            stream.SendFunc = ss.makeSendFunc()
            w := reads.NewResponseWriter(stream, 0)

            var gens []gen.SeriesGenerator
            gens = append(gens, makeTypedSeries("m0", "t", "s0", strings.Repeat("aaa", 100), 2000, 1))
            gens = append(gens, makeTypedSeries("m0", "t", "s1", strings.Repeat("bbb", 200), 1500, 1))
            gens = append(gens, makeTypedSeries("m0", "t", "s2", strings.Repeat("ccc", 300), 2500, 1))
            gens = append(gens, makeTypedSeries("m0", "t", "s3", strings.Repeat("ddd", 200), 900, 1))
            cur := newSeriesGeneratorSeriesCursor(gen.NewMergedSeriesGenerator(gens))
            rs := reads.NewFilteredResultSet(context.Background(), &datatypes.ReadFilterRequest{}, cur)
            err := w.WriteResultSet(rs)
            if err != nil {
                t.Fatalf("unexpected err: %v", err)
            }
            w.Flush()

            assert.Equal(t, ss, exp)
        })

        t.Run("writer doesn't send series with no values", func(t *testing.T) {
            exp := sendSummary{
                seriesCount: 2,
                stringCount: 3700,
            }
            var ss sendSummary

            stream := mock.NewResponseStream()
            stream.SendFunc = ss.makeSendFunc()
            w := reads.NewResponseWriter(stream, 0)

            var gens []gen.SeriesGenerator
            gens = append(gens, makeTypedSeries("m0", "t", "s0", strings.Repeat("aaa", 100), 2000, 1))
            gens = append(gens, makeTypedSeries("m0", "t", "s1", strings.Repeat("bbb", 200), 0, 1))
            gens = append(gens, makeTypedSeries("m0", "t", "s2", strings.Repeat("ccc", 100), 1700, 1))
            cur := newSeriesGeneratorSeriesCursor(gen.NewMergedSeriesGenerator(gens))
            rs := reads.NewFilteredResultSet(context.Background(), &datatypes.ReadFilterRequest{}, cur)
            err := w.WriteResultSet(rs)
            if err != nil {
                t.Fatalf("unexpected err: %v", err)
            }
            w.Flush()

            assert.Equal(t, ss, exp)
        })
    })

    t.Run("error conditions", func(t *testing.T) {
        t.Run("writer returns stream error", func(t *testing.T) {
            exp := errors.New("no write")

            stream := mock.NewResponseStream()
            stream.SendFunc = func(r *datatypes.ReadResponse) error { return exp }
            w := reads.NewResponseWriter(stream, 0)

            cur := newSeriesGeneratorSeriesCursor(makeTypedSeries("m0", "t", "f0", strings.Repeat("0", 1000), 2000, 1))
            rs := reads.NewFilteredResultSet(context.Background(), &datatypes.ReadFilterRequest{}, cur)
            _ = w.WriteResultSet(rs)

            assert.Equal(t, w.Err(), exp)
        })
    })

    t.Run("issues", func(t *testing.T) {
        t.Run("short write", func(t *testing.T) {
            t.Run("single string series", func(t *testing.T) {
                exp := sendSummary{seriesCount: 1, stringCount: 1020}
                var ss sendSummary

                stream := mock.NewResponseStream()
                stream.SendFunc = ss.makeSendFunc()
                w := reads.NewResponseWriter(stream, 0)

                cur := newSeriesGeneratorSeriesCursor(makeTypedSeries("m0", "t", "f0", strings.Repeat("0", 1000), exp.stringCount, 1))
                rs := reads.NewFilteredResultSet(context.Background(), &datatypes.ReadFilterRequest{}, cur)
                err := w.WriteResultSet(rs)
                if err != nil {
                    t.Fatalf("unexpected err: %v", err)
                }
                w.Flush()

                assert.Equal(t, ss, exp)
            })

            t.Run("single float series", func(t *testing.T) {
                exp := sendSummary{seriesCount: 1, floatCount: 50500}
                var ss sendSummary

                stream := mock.NewResponseStream()
                stream.SendFunc = ss.makeSendFunc()
                w := reads.NewResponseWriter(stream, 0)

                cur := newSeriesGeneratorSeriesCursor(makeTypedSeries("m0", "t", "f0", 5.5, exp.floatCount, 1))
                rs := reads.NewFilteredResultSet(context.Background(), &datatypes.ReadFilterRequest{}, cur)
                err := w.WriteResultSet(rs)
                if err != nil {
                    t.Fatalf("unexpected err: %v", err)
                }
                w.Flush()

                assert.Equal(t, ss, exp)
            })

            t.Run("multi series", func(t *testing.T) {
                exp := sendSummary{seriesCount: 2, stringCount: 3700}
                var ss sendSummary

                stream := mock.NewResponseStream()
                stream.SendFunc = ss.makeSendFunc()
                w := reads.NewResponseWriter(stream, 0)

                var gens []gen.SeriesGenerator
                gens = append(gens, makeTypedSeries("m0", "t", "s0", strings.Repeat("aaa", 1000), 2200, 1))
                gens = append(gens, makeTypedSeries("m0", "t", "s1", strings.Repeat("bbb", 1000), 1500, 1))
                cur := newSeriesGeneratorSeriesCursor(gen.NewMergedSeriesGenerator(gens))
                rs := reads.NewFilteredResultSet(context.Background(), &datatypes.ReadFilterRequest{}, cur)
                err := w.WriteResultSet(rs)
                if err != nil {
                    t.Fatalf("unexpected err: %v", err)
                }
                w.Flush()

                assert.Equal(t, ss, exp)
            })
        })
    })
}

func TestResponseWriter_WriteGroupResultSet(t *testing.T) {
    t.Run("normal", func(t *testing.T) {
        t.Run("all types one series each", func(t *testing.T) {
            exp := sendSummary{
                groupCount:    1,
                seriesCount:   5,
                floatCount:    500,
                integerCount:  400,
                unsignedCount: 300,
                stringCount:   200,
                booleanCount:  100,
            }
            var ss sendSummary

            stream := mock.NewResponseStream()
            stream.SendFunc = ss.makeSendFunc()
            w := reads.NewResponseWriter(stream, 0)

            newCursor := func() (cursor reads.SeriesCursor, e error) {
                var gens []gen.SeriesGenerator
                gens = append(gens, makeTypedSeries("m0", "t", "ff", 3.3, exp.floatCount, 1))
                gens = append(gens, makeTypedSeries("m0", "t", "if", 100, exp.integerCount, 1))
                gens = append(gens, makeTypedSeries("m0", "t", "uf", uint64(25), exp.unsignedCount, 1))
                gens = append(gens, makeTypedSeries("m0", "t", "sf", "foo", exp.stringCount, 1))
                gens = append(gens, makeTypedSeries("m0", "t", "bf", false, exp.booleanCount, 1))
                return newSeriesGeneratorSeriesCursor(gen.NewMergedSeriesGenerator(gens)), nil
            }
            rs := reads.NewGroupResultSet(context.Background(), &datatypes.ReadGroupRequest{Group: datatypes.GroupNone}, newCursor)
            err := w.WriteGroupResultSet(rs)
            if err != nil {
                t.Fatalf("unexpected err: %v", err)
            }
            w.Flush()

            assert.Equal(t, ss, exp)
        })

        t.Run("multi-series floats", func(t *testing.T) {
            exp := sendSummary{
                groupCount:  1,
                seriesCount: 5,
                floatCount:  8600,
            }
            var ss sendSummary

            stream := mock.NewResponseStream()
            stream.SendFunc = ss.makeSendFunc()
            w := reads.NewResponseWriter(stream, 0)

            newCursor := func() (cursor reads.SeriesCursor, e error) {
                var gens []gen.SeriesGenerator
                gens = append(gens, makeTypedSeries("m0", "t", "f0", 3.3, 2000, 1))
                gens = append(gens, makeTypedSeries("m0", "t", "f1", 5.3, 1500, 1))
                gens = append(gens, makeTypedSeries("m0", "t", "f2", 5.3, 2500, 1))
                gens = append(gens, makeTypedSeries("m0", "t", "f3", -2.2, 900, 1))
                gens = append(gens, makeTypedSeries("m0", "t", "f4", -9.2, 1700, 1))
                return newSeriesGeneratorSeriesCursor(gen.NewMergedSeriesGenerator(gens)), nil
            }
            rs := reads.NewGroupResultSet(context.Background(), &datatypes.ReadGroupRequest{Group: datatypes.GroupNone}, newCursor)
            err := w.WriteGroupResultSet(rs)
            if err != nil {
                t.Fatalf("unexpected err: %v", err)
            }
            w.Flush()

            assert.Equal(t, ss, exp)
        })

        t.Run("multi-series strings", func(t *testing.T) {
            exp := sendSummary{
                groupCount:  1,
                seriesCount: 4,
                stringCount: 6900,
            }
            var ss sendSummary

            stream := mock.NewResponseStream()
            stream.SendFunc = ss.makeSendFunc()
            w := reads.NewResponseWriter(stream, 0)

            newCursor := func() (cursor reads.SeriesCursor, e error) {
                var gens []gen.SeriesGenerator
                gens = append(gens, makeTypedSeries("m0", "t", "s0", strings.Repeat("aaa", 100), 2000, 1))
                gens = append(gens, makeTypedSeries("m0", "t", "s1", strings.Repeat("bbb", 200), 1500, 1))
                gens = append(gens, makeTypedSeries("m0", "t", "s2", strings.Repeat("ccc", 300), 2500, 1))
                gens = append(gens, makeTypedSeries("m0", "t", "s3", strings.Repeat("ddd", 200), 900, 1))
                return newSeriesGeneratorSeriesCursor(gen.NewMergedSeriesGenerator(gens)), nil
            }
            rs := reads.NewGroupResultSet(context.Background(), &datatypes.ReadGroupRequest{Group: datatypes.GroupNone}, newCursor)
            err := w.WriteGroupResultSet(rs)
            if err != nil {
                t.Fatalf("unexpected err: %v", err)
            }
            w.Flush()

            assert.Equal(t, ss, exp)
        })

        t.Run("writer doesn't send series with no values", func(t *testing.T) {
            exp := sendSummary{
                groupCount:  1,
                seriesCount: 2,
                stringCount: 3700,
            }
            var ss sendSummary

            stream := mock.NewResponseStream()
            stream.SendFunc = ss.makeSendFunc()
            w := reads.NewResponseWriter(stream, 0)

            newCursor := func() (cursor reads.SeriesCursor, e error) {
                var gens []gen.SeriesGenerator
                gens = append(gens, makeTypedSeries("m0", "t", "s0", strings.Repeat("aaa", 100), 2000, 1))
                gens = append(gens, makeTypedSeries("m0", "t", "s1", strings.Repeat("bbb", 200), 0, 1))
                gens = append(gens, makeTypedSeries("m0", "t", "s2", strings.Repeat("ccc", 100), 1700, 1))
                return newSeriesGeneratorSeriesCursor(gen.NewMergedSeriesGenerator(gens)), nil
            }
            rs := reads.NewGroupResultSet(context.Background(), &datatypes.ReadGroupRequest{Group: datatypes.GroupNone}, newCursor)
            err := w.WriteGroupResultSet(rs)
            if err != nil {
                t.Fatalf("unexpected err: %v", err)
            }
            w.Flush()

            assert.Equal(t, ss, exp)
        })
    })
}