fix(reads): ResponseWriter truncates values for last series

The ResponseWriter would truncate the last series if the byte size of
the points frames exceeded the writeSize constant, causing a Flush to
occur and the cumulative ResponseWriter.sz to reset to zero. Because
ResponseWriter.sz was not incremented for each frame, it remained at
zero, which resulted in the final Flush short circuiting.

This commit implements the Size method for the cursors.Array types
to be used to estimate the size of frame. This is in place of calling
the Protocol Buffer `Size` function, which can be very expensive.
pull/14674/head
Stuart Carnie 2019-08-16 10:36:40 -07:00
parent 0b20c227b4
commit 3ca751cfd6
No known key found for this signature in database
GPG Key ID: 848D9C9718D78B4F
4 changed files with 598 additions and 103 deletions

View File

@ -57,39 +57,53 @@ func (w *ResponseWriter) streamFloatArrayPoints(cur cursors.FloatArrayCursor) {
frame := p.FloatPoints frame := p.FloatPoints
w.res.Frames = append(w.res.Frames, datatypes.ReadResponse_Frame{Data: p}) w.res.Frames = append(w.res.Frames, datatypes.ReadResponse_Frame{Data: p})
var ( var seriesValueCount = 0
seriesValueCount = 0
b = 0
)
for { for {
// If the number of values produced by cur > 1000,
// cur.Next() will produce batches of values that are of
// length ≤ 1000.
// We attempt to limit the frame Timestamps / Values lengths
// the same to avoid allocations. These frames are recycled
// after flushing so that on repeated use there should be enough space
// to append values from a into frame without additional allocations.
a := cur.Next() a := cur.Next()
if len(a.Timestamps) == 0 { if len(a.Timestamps) == 0 {
break break
} }
seriesValueCount += a.Len()
// As specified in the struct definition, w.sz is an estimated
// size (in bytes) of the buffered data. It is therefore a
// deliberate choice to accumulate using the array Size, which is
// cheap to calculate. Calling frame.Size() can be expensive
// when using varint encoding for numbers.
w.sz += a.Size()
frame.Timestamps = append(frame.Timestamps, a.Timestamps...) frame.Timestamps = append(frame.Timestamps, a.Timestamps...)
frame.Values = append(frame.Values, a.Values...) frame.Values = append(frame.Values, a.Values...)
b = len(frame.Timestamps) // given the expectation of cur.Next, we attempt to limit
if b >= batchSize { // the number of values appended to the frame to batchSize (1000)
seriesValueCount += b needsFrame := len(frame.Timestamps) >= batchSize
b = 0
w.sz += frame.Size()
if w.sz >= writeSize {
w.Flush()
if w.err != nil {
break
}
}
if w.sz >= writeSize {
needsFrame = true
w.Flush()
if w.err != nil {
break
}
}
if needsFrame {
// new frames are returned with Timestamps and Values preallocated
// to a minimum of batchSize length to reduce further allocations.
p = w.getFloatPointsFrame() p = w.getFloatPointsFrame()
frame = p.FloatPoints frame = p.FloatPoints
w.res.Frames = append(w.res.Frames, datatypes.ReadResponse_Frame{Data: p}) w.res.Frames = append(w.res.Frames, datatypes.ReadResponse_Frame{Data: p})
} }
} }
seriesValueCount += b
w.vc += seriesValueCount w.vc += seriesValueCount
if seriesValueCount == 0 { if seriesValueCount == 0 {
w.sz -= w.sf.Size() w.sz -= w.sf.Size()
@ -146,39 +160,53 @@ func (w *ResponseWriter) streamIntegerArrayPoints(cur cursors.IntegerArrayCursor
frame := p.IntegerPoints frame := p.IntegerPoints
w.res.Frames = append(w.res.Frames, datatypes.ReadResponse_Frame{Data: p}) w.res.Frames = append(w.res.Frames, datatypes.ReadResponse_Frame{Data: p})
var ( var seriesValueCount = 0
seriesValueCount = 0
b = 0
)
for { for {
// If the number of values produced by cur > 1000,
// cur.Next() will produce batches of values that are of
// length ≤ 1000.
// We attempt to limit the frame Timestamps / Values lengths
// the same to avoid allocations. These frames are recycled
// after flushing so that on repeated use there should be enough space
// to append values from a into frame without additional allocations.
a := cur.Next() a := cur.Next()
if len(a.Timestamps) == 0 { if len(a.Timestamps) == 0 {
break break
} }
seriesValueCount += a.Len()
// As specified in the struct definition, w.sz is an estimated
// size (in bytes) of the buffered data. It is therefore a
// deliberate choice to accumulate using the array Size, which is
// cheap to calculate. Calling frame.Size() can be expensive
// when using varint encoding for numbers.
w.sz += a.Size()
frame.Timestamps = append(frame.Timestamps, a.Timestamps...) frame.Timestamps = append(frame.Timestamps, a.Timestamps...)
frame.Values = append(frame.Values, a.Values...) frame.Values = append(frame.Values, a.Values...)
b = len(frame.Timestamps) // given the expectation of cur.Next, we attempt to limit
if b >= batchSize { // the number of values appended to the frame to batchSize (1000)
seriesValueCount += b needsFrame := len(frame.Timestamps) >= batchSize
b = 0
w.sz += frame.Size()
if w.sz >= writeSize {
w.Flush()
if w.err != nil {
break
}
}
if w.sz >= writeSize {
needsFrame = true
w.Flush()
if w.err != nil {
break
}
}
if needsFrame {
// new frames are returned with Timestamps and Values preallocated
// to a minimum of batchSize length to reduce further allocations.
p = w.getIntegerPointsFrame() p = w.getIntegerPointsFrame()
frame = p.IntegerPoints frame = p.IntegerPoints
w.res.Frames = append(w.res.Frames, datatypes.ReadResponse_Frame{Data: p}) w.res.Frames = append(w.res.Frames, datatypes.ReadResponse_Frame{Data: p})
} }
} }
seriesValueCount += b
w.vc += seriesValueCount w.vc += seriesValueCount
if seriesValueCount == 0 { if seriesValueCount == 0 {
w.sz -= w.sf.Size() w.sz -= w.sf.Size()
@ -235,39 +263,53 @@ func (w *ResponseWriter) streamUnsignedArrayPoints(cur cursors.UnsignedArrayCurs
frame := p.UnsignedPoints frame := p.UnsignedPoints
w.res.Frames = append(w.res.Frames, datatypes.ReadResponse_Frame{Data: p}) w.res.Frames = append(w.res.Frames, datatypes.ReadResponse_Frame{Data: p})
var ( var seriesValueCount = 0
seriesValueCount = 0
b = 0
)
for { for {
// If the number of values produced by cur > 1000,
// cur.Next() will produce batches of values that are of
// length ≤ 1000.
// We attempt to limit the frame Timestamps / Values lengths
// the same to avoid allocations. These frames are recycled
// after flushing so that on repeated use there should be enough space
// to append values from a into frame without additional allocations.
a := cur.Next() a := cur.Next()
if len(a.Timestamps) == 0 { if len(a.Timestamps) == 0 {
break break
} }
seriesValueCount += a.Len()
// As specified in the struct definition, w.sz is an estimated
// size (in bytes) of the buffered data. It is therefore a
// deliberate choice to accumulate using the array Size, which is
// cheap to calculate. Calling frame.Size() can be expensive
// when using varint encoding for numbers.
w.sz += a.Size()
frame.Timestamps = append(frame.Timestamps, a.Timestamps...) frame.Timestamps = append(frame.Timestamps, a.Timestamps...)
frame.Values = append(frame.Values, a.Values...) frame.Values = append(frame.Values, a.Values...)
b = len(frame.Timestamps) // given the expectation of cur.Next, we attempt to limit
if b >= batchSize { // the number of values appended to the frame to batchSize (1000)
seriesValueCount += b needsFrame := len(frame.Timestamps) >= batchSize
b = 0
w.sz += frame.Size()
if w.sz >= writeSize {
w.Flush()
if w.err != nil {
break
}
}
if w.sz >= writeSize {
needsFrame = true
w.Flush()
if w.err != nil {
break
}
}
if needsFrame {
// new frames are returned with Timestamps and Values preallocated
// to a minimum of batchSize length to reduce further allocations.
p = w.getUnsignedPointsFrame() p = w.getUnsignedPointsFrame()
frame = p.UnsignedPoints frame = p.UnsignedPoints
w.res.Frames = append(w.res.Frames, datatypes.ReadResponse_Frame{Data: p}) w.res.Frames = append(w.res.Frames, datatypes.ReadResponse_Frame{Data: p})
} }
} }
seriesValueCount += b
w.vc += seriesValueCount w.vc += seriesValueCount
if seriesValueCount == 0 { if seriesValueCount == 0 {
w.sz -= w.sf.Size() w.sz -= w.sf.Size()
@ -324,39 +366,53 @@ func (w *ResponseWriter) streamStringArrayPoints(cur cursors.StringArrayCursor)
frame := p.StringPoints frame := p.StringPoints
w.res.Frames = append(w.res.Frames, datatypes.ReadResponse_Frame{Data: p}) w.res.Frames = append(w.res.Frames, datatypes.ReadResponse_Frame{Data: p})
var ( var seriesValueCount = 0
seriesValueCount = 0
b = 0
)
for { for {
// If the number of values produced by cur > 1000,
// cur.Next() will produce batches of values that are of
// length ≤ 1000.
// We attempt to limit the frame Timestamps / Values lengths
// the same to avoid allocations. These frames are recycled
// after flushing so that on repeated use there should be enough space
// to append values from a into frame without additional allocations.
a := cur.Next() a := cur.Next()
if len(a.Timestamps) == 0 { if len(a.Timestamps) == 0 {
break break
} }
seriesValueCount += a.Len()
// As specified in the struct definition, w.sz is an estimated
// size (in bytes) of the buffered data. It is therefore a
// deliberate choice to accumulate using the array Size, which is
// cheap to calculate. Calling frame.Size() can be expensive
// when using varint encoding for numbers.
w.sz += a.Size()
frame.Timestamps = append(frame.Timestamps, a.Timestamps...) frame.Timestamps = append(frame.Timestamps, a.Timestamps...)
frame.Values = append(frame.Values, a.Values...) frame.Values = append(frame.Values, a.Values...)
b = len(frame.Timestamps) // given the expectation of cur.Next, we attempt to limit
if b >= batchSize { // the number of values appended to the frame to batchSize (1000)
seriesValueCount += b needsFrame := len(frame.Timestamps) >= batchSize
b = 0
w.sz += frame.Size()
if w.sz >= writeSize {
w.Flush()
if w.err != nil {
break
}
}
if w.sz >= writeSize {
needsFrame = true
w.Flush()
if w.err != nil {
break
}
}
if needsFrame {
// new frames are returned with Timestamps and Values preallocated
// to a minimum of batchSize length to reduce further allocations.
p = w.getStringPointsFrame() p = w.getStringPointsFrame()
frame = p.StringPoints frame = p.StringPoints
w.res.Frames = append(w.res.Frames, datatypes.ReadResponse_Frame{Data: p}) w.res.Frames = append(w.res.Frames, datatypes.ReadResponse_Frame{Data: p})
} }
} }
seriesValueCount += b
w.vc += seriesValueCount w.vc += seriesValueCount
if seriesValueCount == 0 { if seriesValueCount == 0 {
w.sz -= w.sf.Size() w.sz -= w.sf.Size()
@ -413,39 +469,53 @@ func (w *ResponseWriter) streamBooleanArrayPoints(cur cursors.BooleanArrayCursor
frame := p.BooleanPoints frame := p.BooleanPoints
w.res.Frames = append(w.res.Frames, datatypes.ReadResponse_Frame{Data: p}) w.res.Frames = append(w.res.Frames, datatypes.ReadResponse_Frame{Data: p})
var ( var seriesValueCount = 0
seriesValueCount = 0
b = 0
)
for { for {
// If the number of values produced by cur > 1000,
// cur.Next() will produce batches of values that are of
// length ≤ 1000.
// We attempt to limit the frame Timestamps / Values lengths
// the same to avoid allocations. These frames are recycled
// after flushing so that on repeated use there should be enough space
// to append values from a into frame without additional allocations.
a := cur.Next() a := cur.Next()
if len(a.Timestamps) == 0 { if len(a.Timestamps) == 0 {
break break
} }
seriesValueCount += a.Len()
// As specified in the struct definition, w.sz is an estimated
// size (in bytes) of the buffered data. It is therefore a
// deliberate choice to accumulate using the array Size, which is
// cheap to calculate. Calling frame.Size() can be expensive
// when using varint encoding for numbers.
w.sz += a.Size()
frame.Timestamps = append(frame.Timestamps, a.Timestamps...) frame.Timestamps = append(frame.Timestamps, a.Timestamps...)
frame.Values = append(frame.Values, a.Values...) frame.Values = append(frame.Values, a.Values...)
b = len(frame.Timestamps) // given the expectation of cur.Next, we attempt to limit
if b >= batchSize { // the number of values appended to the frame to batchSize (1000)
seriesValueCount += b needsFrame := len(frame.Timestamps) >= batchSize
b = 0
w.sz += frame.Size()
if w.sz >= writeSize {
w.Flush()
if w.err != nil {
break
}
}
if w.sz >= writeSize {
needsFrame = true
w.Flush()
if w.err != nil {
break
}
}
if needsFrame {
// new frames are returned with Timestamps and Values preallocated
// to a minimum of batchSize length to reduce further allocations.
p = w.getBooleanPointsFrame() p = w.getBooleanPointsFrame()
frame = p.BooleanPoints frame = p.BooleanPoints
w.res.Frames = append(w.res.Frames, datatypes.ReadResponse_Frame{Data: p}) w.res.Frames = append(w.res.Frames, datatypes.ReadResponse_Frame{Data: p})
} }
} }
seriesValueCount += b
w.vc += seriesValueCount w.vc += seriesValueCount
if seriesValueCount == 0 { if seriesValueCount == 0 {
w.sz -= w.sf.Size() w.sz -= w.sf.Size()

View File

@ -53,39 +53,53 @@ func (w *ResponseWriter) stream{{.Name}}ArrayPoints(cur cursors.{{.Name}}ArrayCu
frame := p.{{.Name}}Points frame := p.{{.Name}}Points
w.res.Frames = append(w.res.Frames, datatypes.ReadResponse_Frame{Data: p}) w.res.Frames = append(w.res.Frames, datatypes.ReadResponse_Frame{Data: p})
var ( var seriesValueCount = 0
seriesValueCount = 0
b = 0
)
for { for {
// If the number of values produced by cur > 1000,
// cur.Next() will produce batches of values that are of
// length ≤ 1000.
// We attempt to limit the frame Timestamps / Values lengths
// the same to avoid allocations. These frames are recycled
// after flushing so that on repeated use there should be enough space
// to append values from a into frame without additional allocations.
a := cur.Next() a := cur.Next()
if len(a.Timestamps) == 0 { if len(a.Timestamps) == 0 {
break break
} }
seriesValueCount += a.Len()
// As specified in the struct definition, w.sz is an estimated
// size (in bytes) of the buffered data. It is therefore a
// deliberate choice to accumulate using the array Size, which is
// cheap to calculate. Calling frame.Size() can be expensive
// when using varint encoding for numbers.
w.sz += a.Size()
frame.Timestamps = append(frame.Timestamps, a.Timestamps...) frame.Timestamps = append(frame.Timestamps, a.Timestamps...)
frame.Values = append(frame.Values, a.Values...) frame.Values = append(frame.Values, a.Values...)
b = len(frame.Timestamps) // given the expectation of cur.Next, we attempt to limit
if b >= batchSize { // the number of values appended to the frame to batchSize (1000)
seriesValueCount += b needsFrame := len(frame.Timestamps) >= batchSize
b = 0
w.sz += frame.Size()
if w.sz >= writeSize {
w.Flush()
if w.err != nil {
break
}
}
if w.sz >= writeSize {
needsFrame = true
w.Flush()
if w.err != nil {
break
}
}
if needsFrame {
// new frames are returned with Timestamps and Values preallocated
// to a minimum of batchSize length to reduce further allocations.
p = w.get{{.Name}}PointsFrame() p = w.get{{.Name}}PointsFrame()
frame = p.{{.Name}}Points frame = p.{{.Name}}Points
w.res.Frames = append(w.res.Frames, datatypes.ReadResponse_Frame{Data: p}) w.res.Frames = append(w.res.Frames, datatypes.ReadResponse_Frame{Data: p})
} }
} }
seriesValueCount += b
w.vc += seriesValueCount w.vc += seriesValueCount
if seriesValueCount == 0 { if seriesValueCount == 0 {
w.sz -= w.sf.Size() w.sz -= w.sf.Size()

View File

@ -31,7 +31,9 @@ type ResponseWriter struct {
// current series // current series
sf *datatypes.ReadResponse_SeriesFrame sf *datatypes.ReadResponse_SeriesFrame
ss int // pointer to current series frame; used to skip writing if no points ss int // pointer to current series frame; used to skip writing if no points
sz int // estimated size in bytes for pending write // sz is an estimated size in bytes for pending writes to flush periodically
// when the size exceeds writeSize.
sz int
vc int // total value count vc int // total value count

View File

@ -1,12 +1,21 @@
package reads_test package reads_test
import ( import (
"context"
"errors"
"fmt" "fmt"
"reflect" "reflect"
"strings"
"testing" "testing"
"time"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/mock" "github.com/influxdata/influxdb/mock"
"github.com/influxdata/influxdb/pkg/data/gen"
"github.com/influxdata/influxdb/pkg/testing/assert"
"github.com/influxdata/influxdb/storage/reads" "github.com/influxdata/influxdb/storage/reads"
"github.com/influxdata/influxdb/storage/reads/datatypes"
"github.com/influxdata/influxdb/tsdb"
"github.com/influxdata/influxdb/tsdb/cursors" "github.com/influxdata/influxdb/tsdb/cursors"
"google.golang.org/grpc/metadata" "google.golang.org/grpc/metadata"
) )
@ -123,3 +132,403 @@ func TestResponseWriter_WriteGroupResultSet_Stats(t *testing.T) {
t.Errorf("expected scanned-bytes '%v' but got '%v'", []string{fmt.Sprint(scannedBytes)}, gotTrailer.Get("scanned-bytes")) t.Errorf("expected scanned-bytes '%v' but got '%v'", []string{fmt.Sprint(scannedBytes)}, gotTrailer.Get("scanned-bytes"))
} }
} }
var (
org = influxdb.ID(0xff00ff00)
bucket = influxdb.ID(0xcc00cc00)
orgBucketID = tsdb.EncodeName(org, bucket)
)
func makeTypedSeries(m, prefix, field string, val interface{}, valueCount int, counts ...int) gen.SeriesGenerator {
spec := gen.TimeSequenceSpec{Count: valueCount, Start: time.Unix(0, 0), Delta: time.Second}
ts := gen.NewTimestampSequenceFromSpec(spec)
var vg gen.TimeValuesSequence
switch val := val.(type) {
case float64:
vg = gen.NewTimeFloatValuesSequence(spec.Count, ts, gen.NewFloatConstantValuesSequence(val))
case int64:
vg = gen.NewTimeIntegerValuesSequence(spec.Count, ts, gen.NewIntegerConstantValuesSequence(val))
case int:
vg = gen.NewTimeIntegerValuesSequence(spec.Count, ts, gen.NewIntegerConstantValuesSequence(int64(val)))
case uint64:
vg = gen.NewTimeUnsignedValuesSequence(spec.Count, ts, gen.NewUnsignedConstantValuesSequence(val))
case string:
vg = gen.NewTimeStringValuesSequence(spec.Count, ts, gen.NewStringConstantValuesSequence(val))
case bool:
vg = gen.NewTimeBooleanValuesSequence(spec.Count, ts, gen.NewBooleanConstantValuesSequence(val))
default:
panic(fmt.Sprintf("unexpected type %T", val))
}
return gen.NewSeriesGenerator(orgBucketID, []byte(field), vg, gen.NewTagsValuesSequenceCounts(m, field, prefix, counts))
}
type sendSummary struct {
groupCount int
seriesCount int
floatCount int
integerCount int
unsignedCount int
stringCount int
booleanCount int
}
func (ss *sendSummary) makeSendFunc() func(*datatypes.ReadResponse) error {
return func(r *datatypes.ReadResponse) error {
for i := range r.Frames {
d := r.Frames[i].Data
switch p := d.(type) {
case *datatypes.ReadResponse_Frame_FloatPoints:
ss.floatCount += len(p.FloatPoints.Values)
case *datatypes.ReadResponse_Frame_IntegerPoints:
ss.integerCount += len(p.IntegerPoints.Values)
case *datatypes.ReadResponse_Frame_UnsignedPoints:
ss.unsignedCount += len(p.UnsignedPoints.Values)
case *datatypes.ReadResponse_Frame_StringPoints:
ss.stringCount += len(p.StringPoints.Values)
case *datatypes.ReadResponse_Frame_BooleanPoints:
ss.booleanCount += len(p.BooleanPoints.Values)
case *datatypes.ReadResponse_Frame_Series:
ss.seriesCount++
case *datatypes.ReadResponse_Frame_Group:
ss.groupCount++
default:
panic("unexpected")
}
}
return nil
}
}
func TestResponseWriter_WriteResultSet(t *testing.T) {
t.Run("normal", func(t *testing.T) {
t.Run("all types one series each", func(t *testing.T) {
exp := sendSummary{
seriesCount: 5,
floatCount: 500,
integerCount: 400,
unsignedCount: 300,
stringCount: 200,
booleanCount: 100,
}
var ss sendSummary
stream := mock.NewResponseStream()
stream.SendFunc = ss.makeSendFunc()
w := reads.NewResponseWriter(stream, 0)
var gens []gen.SeriesGenerator
gens = append(gens, makeTypedSeries("m0", "t", "ff", 3.3, exp.floatCount, 1))
gens = append(gens, makeTypedSeries("m0", "t", "if", 100, exp.integerCount, 1))
gens = append(gens, makeTypedSeries("m0", "t", "uf", uint64(25), exp.unsignedCount, 1))
gens = append(gens, makeTypedSeries("m0", "t", "sf", "foo", exp.stringCount, 1))
gens = append(gens, makeTypedSeries("m0", "t", "bf", false, exp.booleanCount, 1))
cur := newSeriesGeneratorSeriesCursor(gen.NewMergedSeriesGenerator(gens))
rs := reads.NewFilteredResultSet(context.Background(), &datatypes.ReadFilterRequest{}, cur)
err := w.WriteResultSet(rs)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
w.Flush()
assert.Equal(t, ss, exp)
})
t.Run("multi-series floats", func(t *testing.T) {
exp := sendSummary{
seriesCount: 5,
floatCount: 8600,
}
var ss sendSummary
stream := mock.NewResponseStream()
stream.SendFunc = ss.makeSendFunc()
w := reads.NewResponseWriter(stream, 0)
var gens []gen.SeriesGenerator
gens = append(gens, makeTypedSeries("m0", "t", "f0", 3.3, 2000, 1))
gens = append(gens, makeTypedSeries("m0", "t", "f1", 5.3, 1500, 1))
gens = append(gens, makeTypedSeries("m0", "t", "f2", 5.3, 2500, 1))
gens = append(gens, makeTypedSeries("m0", "t", "f3", -2.2, 900, 1))
gens = append(gens, makeTypedSeries("m0", "t", "f4", -9.2, 1700, 1))
cur := newSeriesGeneratorSeriesCursor(gen.NewMergedSeriesGenerator(gens))
rs := reads.NewFilteredResultSet(context.Background(), &datatypes.ReadFilterRequest{}, cur)
err := w.WriteResultSet(rs)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
w.Flush()
assert.Equal(t, ss, exp)
})
t.Run("multi-series strings", func(t *testing.T) {
exp := sendSummary{
seriesCount: 4,
stringCount: 6900,
}
var ss sendSummary
stream := mock.NewResponseStream()
stream.SendFunc = ss.makeSendFunc()
w := reads.NewResponseWriter(stream, 0)
var gens []gen.SeriesGenerator
gens = append(gens, makeTypedSeries("m0", "t", "s0", strings.Repeat("aaa", 100), 2000, 1))
gens = append(gens, makeTypedSeries("m0", "t", "s1", strings.Repeat("bbb", 200), 1500, 1))
gens = append(gens, makeTypedSeries("m0", "t", "s2", strings.Repeat("ccc", 300), 2500, 1))
gens = append(gens, makeTypedSeries("m0", "t", "s3", strings.Repeat("ddd", 200), 900, 1))
cur := newSeriesGeneratorSeriesCursor(gen.NewMergedSeriesGenerator(gens))
rs := reads.NewFilteredResultSet(context.Background(), &datatypes.ReadFilterRequest{}, cur)
err := w.WriteResultSet(rs)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
w.Flush()
assert.Equal(t, ss, exp)
})
t.Run("writer doesn't send series with no values", func(t *testing.T) {
exp := sendSummary{
seriesCount: 2,
stringCount: 3700,
}
var ss sendSummary
stream := mock.NewResponseStream()
stream.SendFunc = ss.makeSendFunc()
w := reads.NewResponseWriter(stream, 0)
var gens []gen.SeriesGenerator
gens = append(gens, makeTypedSeries("m0", "t", "s0", strings.Repeat("aaa", 100), 2000, 1))
gens = append(gens, makeTypedSeries("m0", "t", "s1", strings.Repeat("bbb", 200), 0, 1))
gens = append(gens, makeTypedSeries("m0", "t", "s2", strings.Repeat("ccc", 100), 1700, 1))
cur := newSeriesGeneratorSeriesCursor(gen.NewMergedSeriesGenerator(gens))
rs := reads.NewFilteredResultSet(context.Background(), &datatypes.ReadFilterRequest{}, cur)
err := w.WriteResultSet(rs)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
w.Flush()
assert.Equal(t, ss, exp)
})
})
t.Run("error conditions", func(t *testing.T) {
t.Run("writer returns stream error", func(t *testing.T) {
exp := errors.New("no write")
stream := mock.NewResponseStream()
stream.SendFunc = func(r *datatypes.ReadResponse) error { return exp }
w := reads.NewResponseWriter(stream, 0)
cur := newSeriesGeneratorSeriesCursor(makeTypedSeries("m0", "t", "f0", strings.Repeat("0", 1000), 2000, 1))
rs := reads.NewFilteredResultSet(context.Background(), &datatypes.ReadFilterRequest{}, cur)
_ = w.WriteResultSet(rs)
assert.Equal(t, w.Err(), exp)
})
})
t.Run("issues", func(t *testing.T) {
t.Run("#4321 short write", func(t *testing.T) {
t.Run("single string series", func(t *testing.T) {
exp := sendSummary{seriesCount: 1, stringCount: 1020}
var ss sendSummary
stream := mock.NewResponseStream()
stream.SendFunc = ss.makeSendFunc()
w := reads.NewResponseWriter(stream, 0)
cur := newSeriesGeneratorSeriesCursor(makeTypedSeries("m0", "t", "f0", strings.Repeat("0", 1000), exp.stringCount, 1))
rs := reads.NewFilteredResultSet(context.Background(), &datatypes.ReadFilterRequest{}, cur)
err := w.WriteResultSet(rs)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
w.Flush()
assert.Equal(t, ss, exp)
})
t.Run("single float series", func(t *testing.T) {
exp := sendSummary{seriesCount: 1, floatCount: 50500}
var ss sendSummary
stream := mock.NewResponseStream()
stream.SendFunc = ss.makeSendFunc()
w := reads.NewResponseWriter(stream, 0)
cur := newSeriesGeneratorSeriesCursor(makeTypedSeries("m0", "t", "f0", 5.5, exp.floatCount, 1))
rs := reads.NewFilteredResultSet(context.Background(), &datatypes.ReadFilterRequest{}, cur)
err := w.WriteResultSet(rs)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
w.Flush()
assert.Equal(t, ss, exp)
})
t.Run("multi series", func(t *testing.T) {
exp := sendSummary{seriesCount: 2, stringCount: 3700}
var ss sendSummary
stream := mock.NewResponseStream()
stream.SendFunc = ss.makeSendFunc()
w := reads.NewResponseWriter(stream, 0)
var gens []gen.SeriesGenerator
gens = append(gens, makeTypedSeries("m0", "t", "s0", strings.Repeat("aaa", 1000), 2200, 1))
gens = append(gens, makeTypedSeries("m0", "t", "s1", strings.Repeat("bbb", 1000), 1500, 1))
cur := newSeriesGeneratorSeriesCursor(gen.NewMergedSeriesGenerator(gens))
rs := reads.NewFilteredResultSet(context.Background(), &datatypes.ReadFilterRequest{}, cur)
err := w.WriteResultSet(rs)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
w.Flush()
assert.Equal(t, ss, exp)
})
})
})
}
func TestResponseWriter_WriteGroupResultSet(t *testing.T) {
t.Run("normal", func(t *testing.T) {
t.Run("all types one series each", func(t *testing.T) {
exp := sendSummary{
groupCount: 1,
seriesCount: 5,
floatCount: 500,
integerCount: 400,
unsignedCount: 300,
stringCount: 200,
booleanCount: 100,
}
var ss sendSummary
stream := mock.NewResponseStream()
stream.SendFunc = ss.makeSendFunc()
w := reads.NewResponseWriter(stream, 0)
newCursor := func() (cursor reads.SeriesCursor, e error) {
var gens []gen.SeriesGenerator
gens = append(gens, makeTypedSeries("m0", "t", "ff", 3.3, exp.floatCount, 1))
gens = append(gens, makeTypedSeries("m0", "t", "if", 100, exp.integerCount, 1))
gens = append(gens, makeTypedSeries("m0", "t", "uf", uint64(25), exp.unsignedCount, 1))
gens = append(gens, makeTypedSeries("m0", "t", "sf", "foo", exp.stringCount, 1))
gens = append(gens, makeTypedSeries("m0", "t", "bf", false, exp.booleanCount, 1))
return newSeriesGeneratorSeriesCursor(gen.NewMergedSeriesGenerator(gens)), nil
}
rs := reads.NewGroupResultSet(context.Background(), &datatypes.ReadGroupRequest{Group: datatypes.GroupNone}, newCursor)
err := w.WriteGroupResultSet(rs)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
w.Flush()
assert.Equal(t, ss, exp)
})
t.Run("multi-series floats", func(t *testing.T) {
exp := sendSummary{
groupCount: 1,
seriesCount: 5,
floatCount: 8600,
}
var ss sendSummary
stream := mock.NewResponseStream()
stream.SendFunc = ss.makeSendFunc()
w := reads.NewResponseWriter(stream, 0)
newCursor := func() (cursor reads.SeriesCursor, e error) {
var gens []gen.SeriesGenerator
gens = append(gens, makeTypedSeries("m0", "t", "f0", 3.3, 2000, 1))
gens = append(gens, makeTypedSeries("m0", "t", "f1", 5.3, 1500, 1))
gens = append(gens, makeTypedSeries("m0", "t", "f2", 5.3, 2500, 1))
gens = append(gens, makeTypedSeries("m0", "t", "f3", -2.2, 900, 1))
gens = append(gens, makeTypedSeries("m0", "t", "f4", -9.2, 1700, 1))
return newSeriesGeneratorSeriesCursor(gen.NewMergedSeriesGenerator(gens)), nil
}
rs := reads.NewGroupResultSet(context.Background(), &datatypes.ReadGroupRequest{Group: datatypes.GroupNone}, newCursor)
err := w.WriteGroupResultSet(rs)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
w.Flush()
assert.Equal(t, ss, exp)
})
t.Run("multi-series strings", func(t *testing.T) {
exp := sendSummary{
groupCount: 1,
seriesCount: 4,
stringCount: 6900,
}
var ss sendSummary
stream := mock.NewResponseStream()
stream.SendFunc = ss.makeSendFunc()
w := reads.NewResponseWriter(stream, 0)
newCursor := func() (cursor reads.SeriesCursor, e error) {
var gens []gen.SeriesGenerator
gens = append(gens, makeTypedSeries("m0", "t", "s0", strings.Repeat("aaa", 100), 2000, 1))
gens = append(gens, makeTypedSeries("m0", "t", "s1", strings.Repeat("bbb", 200), 1500, 1))
gens = append(gens, makeTypedSeries("m0", "t", "s2", strings.Repeat("ccc", 300), 2500, 1))
gens = append(gens, makeTypedSeries("m0", "t", "s3", strings.Repeat("ddd", 200), 900, 1))
return newSeriesGeneratorSeriesCursor(gen.NewMergedSeriesGenerator(gens)), nil
}
rs := reads.NewGroupResultSet(context.Background(), &datatypes.ReadGroupRequest{Group: datatypes.GroupNone}, newCursor)
err := w.WriteGroupResultSet(rs)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
w.Flush()
assert.Equal(t, ss, exp)
})
t.Run("writer doesn't send series with no values", func(t *testing.T) {
exp := sendSummary{
groupCount: 1,
seriesCount: 2,
stringCount: 3700,
}
var ss sendSummary
stream := mock.NewResponseStream()
stream.SendFunc = ss.makeSendFunc()
w := reads.NewResponseWriter(stream, 0)
newCursor := func() (cursor reads.SeriesCursor, e error) {
var gens []gen.SeriesGenerator
gens = append(gens, makeTypedSeries("m0", "t", "s0", strings.Repeat("aaa", 100), 2000, 1))
gens = append(gens, makeTypedSeries("m0", "t", "s1", strings.Repeat("bbb", 200), 0, 1))
gens = append(gens, makeTypedSeries("m0", "t", "s2", strings.Repeat("ccc", 100), 1700, 1))
return newSeriesGeneratorSeriesCursor(gen.NewMergedSeriesGenerator(gens)), nil
}
rs := reads.NewGroupResultSet(context.Background(), &datatypes.ReadGroupRequest{Group: datatypes.GroupNone}, newCursor)
err := w.WriteGroupResultSet(rs)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
w.Flush()
assert.Equal(t, ss, exp)
})
})
}