fix(reads): Fix ResultSetToLineProtocol to generate correct output
The ResultSetToLineProtocol test class was not generating correct line protocol for string output (appending `i`) In addition, the PR improves the mock.NewResultSetFromSeriesGenerator type with options. The one option added is `WithGeneratorMaxValues`, to limit the total number of values produced by the SeriesGenerator.pull/18018/head
parent
3ad00934c4
commit
bf1ad403f7
|
@ -9,6 +9,8 @@ import (
|
||||||
|
|
||||||
type GeneratorResultSet struct {
|
type GeneratorResultSet struct {
|
||||||
sg gen.SeriesGenerator
|
sg gen.SeriesGenerator
|
||||||
|
max int
|
||||||
|
count int
|
||||||
f floatTimeValuesGeneratorCursor
|
f floatTimeValuesGeneratorCursor
|
||||||
i integerTimeValuesGeneratorCursor
|
i integerTimeValuesGeneratorCursor
|
||||||
u unsignedTimeValuesGeneratorCursor
|
u unsignedTimeValuesGeneratorCursor
|
||||||
|
@ -19,14 +21,42 @@ type GeneratorResultSet struct {
|
||||||
|
|
||||||
var _ reads.ResultSet = (*GeneratorResultSet)(nil)
|
var _ reads.ResultSet = (*GeneratorResultSet)(nil)
|
||||||
|
|
||||||
|
type GeneratorOptionFn func(*GeneratorResultSet)
|
||||||
|
|
||||||
|
// WithGeneratorMaxValues limits the number of values
|
||||||
|
// produced by GeneratorResultSet to n.
|
||||||
|
func WithGeneratorMaxValues(n int) GeneratorOptionFn {
|
||||||
|
return func(g *GeneratorResultSet) {
|
||||||
|
g.max = n
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// NewResultSetFromSeriesGenerator transforms a SeriesGenerator into a ResultSet,
|
// NewResultSetFromSeriesGenerator transforms a SeriesGenerator into a ResultSet,
|
||||||
// which is useful for mocking data when a client requires a ResultSet.
|
// and therefore may be used anywhere a ResultSet is required.
|
||||||
func NewResultSetFromSeriesGenerator(sg gen.SeriesGenerator) *GeneratorResultSet {
|
func NewResultSetFromSeriesGenerator(sg gen.SeriesGenerator, opts ...GeneratorOptionFn) *GeneratorResultSet {
|
||||||
return &GeneratorResultSet{sg: sg}
|
s := &GeneratorResultSet{sg: sg}
|
||||||
|
|
||||||
|
for _, opt := range opts {
|
||||||
|
opt(s)
|
||||||
|
}
|
||||||
|
|
||||||
|
s.f.max = s.max
|
||||||
|
s.i.max = s.max
|
||||||
|
s.u.max = s.max
|
||||||
|
s.s.max = s.max
|
||||||
|
s.b.max = s.max
|
||||||
|
s.f.count = &s.count
|
||||||
|
s.i.count = &s.count
|
||||||
|
s.u.count = &s.count
|
||||||
|
s.s.count = &s.count
|
||||||
|
s.b.count = &s.count
|
||||||
|
|
||||||
|
return s
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *GeneratorResultSet) Next() bool {
|
func (g *GeneratorResultSet) Next() bool {
|
||||||
return g.sg.Next()
|
remain := g.max - g.count
|
||||||
|
return g.sg.Next() && (g.max == 0 || remain > 0)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *GeneratorResultSet) Cursor() cursors.Cursor {
|
func (g *GeneratorResultSet) Cursor() cursors.Cursor {
|
||||||
|
@ -71,12 +101,17 @@ func (g *GeneratorResultSet) Stats() cursors.CursorStats {
|
||||||
|
|
||||||
type timeValuesGeneratorCursor struct {
|
type timeValuesGeneratorCursor struct {
|
||||||
tv gen.TimeValuesSequence
|
tv gen.TimeValuesSequence
|
||||||
|
max int
|
||||||
|
count *int
|
||||||
stats cursors.CursorStats
|
stats cursors.CursorStats
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t timeValuesGeneratorCursor) Close() {}
|
func (t timeValuesGeneratorCursor) Close() {}
|
||||||
func (t timeValuesGeneratorCursor) Err() error { return nil }
|
func (t timeValuesGeneratorCursor) Err() error { return nil }
|
||||||
func (t timeValuesGeneratorCursor) Stats() cursors.CursorStats { return t.stats }
|
func (t timeValuesGeneratorCursor) Stats() cursors.CursorStats { return t.stats }
|
||||||
|
func (t *timeValuesGeneratorCursor) add(n int) { *t.count += n }
|
||||||
|
func (t *timeValuesGeneratorCursor) checkCount() bool { return t.max == 0 || *t.count < t.max }
|
||||||
|
func (t *timeValuesGeneratorCursor) remain() int { return t.max - *t.count }
|
||||||
|
|
||||||
type floatTimeValuesGeneratorCursor struct {
|
type floatTimeValuesGeneratorCursor struct {
|
||||||
timeValuesGeneratorCursor
|
timeValuesGeneratorCursor
|
||||||
|
@ -84,10 +119,15 @@ type floatTimeValuesGeneratorCursor struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *floatTimeValuesGeneratorCursor) Next() *cursors.FloatArray {
|
func (c *floatTimeValuesGeneratorCursor) Next() *cursors.FloatArray {
|
||||||
if c.tv.Next() {
|
if c.checkCount() && c.tv.Next() {
|
||||||
c.tv.Values().(gen.FloatValues).Copy(&c.a)
|
c.tv.Values().(gen.FloatValues).Copy(&c.a)
|
||||||
|
if remain := c.remain(); c.max > 0 && remain < c.a.Len() {
|
||||||
|
c.a.Timestamps = c.a.Timestamps[:remain]
|
||||||
|
c.a.Values = c.a.Values[:remain]
|
||||||
|
}
|
||||||
c.stats.ScannedBytes += len(c.a.Values) * 8
|
c.stats.ScannedBytes += len(c.a.Values) * 8
|
||||||
c.stats.ScannedValues += c.a.Len()
|
c.stats.ScannedValues += c.a.Len()
|
||||||
|
c.add(c.a.Len())
|
||||||
} else {
|
} else {
|
||||||
c.a.Timestamps = c.a.Timestamps[:0]
|
c.a.Timestamps = c.a.Timestamps[:0]
|
||||||
c.a.Values = c.a.Values[:0]
|
c.a.Values = c.a.Values[:0]
|
||||||
|
@ -101,10 +141,15 @@ type integerTimeValuesGeneratorCursor struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *integerTimeValuesGeneratorCursor) Next() *cursors.IntegerArray {
|
func (c *integerTimeValuesGeneratorCursor) Next() *cursors.IntegerArray {
|
||||||
if c.tv.Next() {
|
if c.checkCount() && c.tv.Next() {
|
||||||
c.tv.Values().(gen.IntegerValues).Copy(&c.a)
|
c.tv.Values().(gen.IntegerValues).Copy(&c.a)
|
||||||
|
if remain := c.remain(); c.max > 0 && remain < c.a.Len() {
|
||||||
|
c.a.Timestamps = c.a.Timestamps[:remain]
|
||||||
|
c.a.Values = c.a.Values[:remain]
|
||||||
|
}
|
||||||
c.stats.ScannedBytes += len(c.a.Values) * 8
|
c.stats.ScannedBytes += len(c.a.Values) * 8
|
||||||
c.stats.ScannedValues += c.a.Len()
|
c.stats.ScannedValues += c.a.Len()
|
||||||
|
c.add(c.a.Len())
|
||||||
} else {
|
} else {
|
||||||
c.a.Timestamps = c.a.Timestamps[:0]
|
c.a.Timestamps = c.a.Timestamps[:0]
|
||||||
c.a.Values = c.a.Values[:0]
|
c.a.Values = c.a.Values[:0]
|
||||||
|
@ -118,10 +163,15 @@ type unsignedTimeValuesGeneratorCursor struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *unsignedTimeValuesGeneratorCursor) Next() *cursors.UnsignedArray {
|
func (c *unsignedTimeValuesGeneratorCursor) Next() *cursors.UnsignedArray {
|
||||||
if c.tv.Next() {
|
if c.checkCount() && c.tv.Next() {
|
||||||
c.tv.Values().(gen.UnsignedValues).Copy(&c.a)
|
c.tv.Values().(gen.UnsignedValues).Copy(&c.a)
|
||||||
|
if remain := c.remain(); c.max > 0 && remain < c.a.Len() {
|
||||||
|
c.a.Timestamps = c.a.Timestamps[:remain]
|
||||||
|
c.a.Values = c.a.Values[:remain]
|
||||||
|
}
|
||||||
c.stats.ScannedBytes += len(c.a.Values) * 8
|
c.stats.ScannedBytes += len(c.a.Values) * 8
|
||||||
c.stats.ScannedValues += c.a.Len()
|
c.stats.ScannedValues += c.a.Len()
|
||||||
|
c.add(c.a.Len())
|
||||||
} else {
|
} else {
|
||||||
c.a.Timestamps = c.a.Timestamps[:0]
|
c.a.Timestamps = c.a.Timestamps[:0]
|
||||||
c.a.Values = c.a.Values[:0]
|
c.a.Values = c.a.Values[:0]
|
||||||
|
@ -135,12 +185,17 @@ type stringTimeValuesGeneratorCursor struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *stringTimeValuesGeneratorCursor) Next() *cursors.StringArray {
|
func (c *stringTimeValuesGeneratorCursor) Next() *cursors.StringArray {
|
||||||
if c.tv.Next() {
|
if c.checkCount() && c.tv.Next() {
|
||||||
c.tv.Values().(gen.StringValues).Copy(&c.a)
|
c.tv.Values().(gen.StringValues).Copy(&c.a)
|
||||||
|
if remain := c.remain(); c.max > 0 && remain < c.a.Len() {
|
||||||
|
c.a.Timestamps = c.a.Timestamps[:remain]
|
||||||
|
c.a.Values = c.a.Values[:remain]
|
||||||
|
}
|
||||||
for _, v := range c.a.Values {
|
for _, v := range c.a.Values {
|
||||||
c.stats.ScannedBytes += len(v)
|
c.stats.ScannedBytes += len(v)
|
||||||
}
|
}
|
||||||
c.stats.ScannedValues += c.a.Len()
|
c.stats.ScannedValues += c.a.Len()
|
||||||
|
c.add(c.a.Len())
|
||||||
} else {
|
} else {
|
||||||
c.a.Timestamps = c.a.Timestamps[:0]
|
c.a.Timestamps = c.a.Timestamps[:0]
|
||||||
c.a.Values = c.a.Values[:0]
|
c.a.Values = c.a.Values[:0]
|
||||||
|
@ -154,10 +209,15 @@ type booleanTimeValuesGeneratorCursor struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *booleanTimeValuesGeneratorCursor) Next() *cursors.BooleanArray {
|
func (c *booleanTimeValuesGeneratorCursor) Next() *cursors.BooleanArray {
|
||||||
if c.tv.Next() {
|
if c.checkCount() && c.tv.Next() {
|
||||||
c.tv.Values().(gen.BooleanValues).Copy(&c.a)
|
c.tv.Values().(gen.BooleanValues).Copy(&c.a)
|
||||||
|
if remain := c.remain(); c.max > 0 && remain < c.a.Len() {
|
||||||
|
c.a.Timestamps = c.a.Timestamps[:remain]
|
||||||
|
c.a.Values = c.a.Values[:remain]
|
||||||
|
}
|
||||||
c.stats.ScannedBytes += len(c.a.Values)
|
c.stats.ScannedBytes += len(c.a.Values)
|
||||||
c.stats.ScannedValues += c.a.Len()
|
c.stats.ScannedValues += c.a.Len()
|
||||||
|
c.add(c.a.Len())
|
||||||
} else {
|
} else {
|
||||||
c.a.Timestamps = c.a.Timestamps[:0]
|
c.a.Timestamps = c.a.Timestamps[:0]
|
||||||
c.a.Values = c.a.Values[:0]
|
c.a.Values = c.a.Values[:0]
|
||||||
|
|
|
@ -24,10 +24,9 @@ func mustNewSpecFromToml(tb testing.TB, toml string) *gen.Spec {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestNewResultSetFromSeriesGenerator(t *testing.T) {
|
func TestNewResultSetFromSeriesGenerator(t *testing.T) {
|
||||||
checkResult := func(t *testing.T, sg gen.SeriesGenerator, expData string, expStats cursors.CursorStats) {
|
checkResult := func(t *testing.T, rs reads.ResultSet, expData string, expStats cursors.CursorStats) {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
|
|
||||||
rs := mock.NewResultSetFromSeriesGenerator(sg)
|
|
||||||
var sb strings.Builder
|
var sb strings.Builder
|
||||||
err := reads.ResultSetToLineProtocol(&sb, rs)
|
err := reads.ResultSetToLineProtocol(&sb, rs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -80,7 +79,34 @@ m0,tag0=value2,tag1=value1 v0=1 1333333000000
|
||||||
m0,tag0=value2,tag1=value1 v0=1 1666666000000
|
m0,tag0=value2,tag1=value1 v0=1 1666666000000
|
||||||
`
|
`
|
||||||
expStats := cursors.CursorStats{ScannedValues: 18, ScannedBytes: 18 * 8}
|
expStats := cursors.CursorStats{ScannedValues: 18, ScannedBytes: 18 * 8}
|
||||||
checkResult(t, sg, expData, expStats)
|
checkResult(t, mock.NewResultSetFromSeriesGenerator(sg), expData, expStats)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("max", func(t *testing.T) {
|
||||||
|
spec := mustNewSpecFromToml(t, `
|
||||||
|
[[measurements]]
|
||||||
|
name = "m0"
|
||||||
|
sample = 1.0
|
||||||
|
tags = [
|
||||||
|
{ name = "tag0", source = { type = "sequence", start = 0, count = 3 } },
|
||||||
|
{ name = "tag1", source = { type = "sequence", start = 0, count = 2 } },
|
||||||
|
]
|
||||||
|
fields = [
|
||||||
|
{ name = "v0", count = 3, source = 1.0 },
|
||||||
|
]`)
|
||||||
|
|
||||||
|
sg := gen.NewSeriesGeneratorFromSpec(spec, gen.TimeRange{
|
||||||
|
Start: time.Unix(1000, 0),
|
||||||
|
End: time.Unix(2000, 0),
|
||||||
|
})
|
||||||
|
const expData = `m0,tag0=value0,tag1=value0 v0=1 1000000000000
|
||||||
|
m0,tag0=value0,tag1=value0 v0=1 1333333000000
|
||||||
|
m0,tag0=value0,tag1=value0 v0=1 1666666000000
|
||||||
|
m0,tag0=value0,tag1=value1 v0=1 1000000000000
|
||||||
|
m0,tag0=value0,tag1=value1 v0=1 1333333000000
|
||||||
|
`
|
||||||
|
expStats := cursors.CursorStats{ScannedValues: 5, ScannedBytes: 5 * 8}
|
||||||
|
checkResult(t, mock.NewResultSetFromSeriesGenerator(sg, mock.WithGeneratorMaxValues(5)), expData, expStats)
|
||||||
})
|
})
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -111,7 +111,7 @@ func cursorToLineProtocol(wr io.Writer, line []byte, cur cursors.Cursor) error {
|
||||||
if a.Len() > 0 {
|
if a.Len() > 0 {
|
||||||
for i := range a.Timestamps {
|
for i := range a.Timestamps {
|
||||||
buf := strconv.AppendQuote(line, a.Values[i])
|
buf := strconv.AppendQuote(line, a.Values[i])
|
||||||
buf = append(buf, 'i', ' ')
|
buf = append(buf, ' ')
|
||||||
buf = strconv.AppendInt(buf, a.Timestamps[i], 10)
|
buf = strconv.AppendInt(buf, a.Timestamps[i], 10)
|
||||||
wr.Write(buf)
|
wr.Write(buf)
|
||||||
wr.Write(newLine)
|
wr.Write(newLine)
|
||||||
|
|
Loading…
Reference in New Issue