From bf1ad403f719203989e05b267d3b582dbf91d776 Mon Sep 17 00:00:00 2001 From: Stuart Carnie Date: Fri, 8 May 2020 12:39:54 -0700 Subject: [PATCH] fix(reads): Fix ResultSetToLineProtocol to generate correct output The ResultSetToLineProtocol test class was not generating correct line protocol for string output (appending `i`) In addition, the PR improves the mock.NewResultSetFromSeriesGenerator type with options. The one option added is `WithGeneratorMaxValues`, to limit the total number of values produced by the SeriesGenerator. --- mock/reads_resultset.go | 92 ++++++++++++++++++++----- mock/reads_resultset_test.go | 32 ++++++++- storage/reads/resultset_lineprotocol.go | 2 +- 3 files changed, 106 insertions(+), 20 deletions(-) diff --git a/mock/reads_resultset.go b/mock/reads_resultset.go index 4803b8c897..ebc20f0c0e 100644 --- a/mock/reads_resultset.go +++ b/mock/reads_resultset.go @@ -8,25 +8,55 @@ import ( ) type GeneratorResultSet struct { - sg gen.SeriesGenerator - f floatTimeValuesGeneratorCursor - i integerTimeValuesGeneratorCursor - u unsignedTimeValuesGeneratorCursor - s stringTimeValuesGeneratorCursor - b booleanTimeValuesGeneratorCursor - cur cursors.Cursor + sg gen.SeriesGenerator + max int + count int + f floatTimeValuesGeneratorCursor + i integerTimeValuesGeneratorCursor + u unsignedTimeValuesGeneratorCursor + s stringTimeValuesGeneratorCursor + b booleanTimeValuesGeneratorCursor + cur cursors.Cursor } var _ reads.ResultSet = (*GeneratorResultSet)(nil) +type GeneratorOptionFn func(*GeneratorResultSet) + +// WithGeneratorMaxValues limits the number of values +// produced by GeneratorResultSet to n. +func WithGeneratorMaxValues(n int) GeneratorOptionFn { + return func(g *GeneratorResultSet) { + g.max = n + } +} + // NewResultSetFromSeriesGenerator transforms a SeriesGenerator into a ResultSet, -// which is useful for mocking data when a client requires a ResultSet. -func NewResultSetFromSeriesGenerator(sg gen.SeriesGenerator) *GeneratorResultSet { - return &GeneratorResultSet{sg: sg} +// and therefore may be used anywhere a ResultSet is required. +func NewResultSetFromSeriesGenerator(sg gen.SeriesGenerator, opts ...GeneratorOptionFn) *GeneratorResultSet { + s := &GeneratorResultSet{sg: sg} + + for _, opt := range opts { + opt(s) + } + + s.f.max = s.max + s.i.max = s.max + s.u.max = s.max + s.s.max = s.max + s.b.max = s.max + s.f.count = &s.count + s.i.count = &s.count + s.u.count = &s.count + s.s.count = &s.count + s.b.count = &s.count + + return s } func (g *GeneratorResultSet) Next() bool { - return g.sg.Next() + remain := g.max - g.count + return g.sg.Next() && (g.max == 0 || remain > 0) } func (g *GeneratorResultSet) Cursor() cursors.Cursor { @@ -71,12 +101,17 @@ func (g *GeneratorResultSet) Stats() cursors.CursorStats { type timeValuesGeneratorCursor struct { tv gen.TimeValuesSequence + max int + count *int stats cursors.CursorStats } func (t timeValuesGeneratorCursor) Close() {} func (t timeValuesGeneratorCursor) Err() error { return nil } func (t timeValuesGeneratorCursor) Stats() cursors.CursorStats { return t.stats } +func (t *timeValuesGeneratorCursor) add(n int) { *t.count += n } +func (t *timeValuesGeneratorCursor) checkCount() bool { return t.max == 0 || *t.count < t.max } +func (t *timeValuesGeneratorCursor) remain() int { return t.max - *t.count } type floatTimeValuesGeneratorCursor struct { timeValuesGeneratorCursor @@ -84,10 +119,15 @@ type floatTimeValuesGeneratorCursor struct { } func (c *floatTimeValuesGeneratorCursor) Next() *cursors.FloatArray { - if c.tv.Next() { + if c.checkCount() && c.tv.Next() { c.tv.Values().(gen.FloatValues).Copy(&c.a) + if remain := c.remain(); c.max > 0 && remain < c.a.Len() { + c.a.Timestamps = c.a.Timestamps[:remain] + c.a.Values = c.a.Values[:remain] + } c.stats.ScannedBytes += len(c.a.Values) * 8 c.stats.ScannedValues += c.a.Len() + c.add(c.a.Len()) } else { c.a.Timestamps = c.a.Timestamps[:0] c.a.Values = c.a.Values[:0] @@ -101,10 +141,15 @@ type integerTimeValuesGeneratorCursor struct { } func (c *integerTimeValuesGeneratorCursor) Next() *cursors.IntegerArray { - if c.tv.Next() { + if c.checkCount() && c.tv.Next() { c.tv.Values().(gen.IntegerValues).Copy(&c.a) + if remain := c.remain(); c.max > 0 && remain < c.a.Len() { + c.a.Timestamps = c.a.Timestamps[:remain] + c.a.Values = c.a.Values[:remain] + } c.stats.ScannedBytes += len(c.a.Values) * 8 c.stats.ScannedValues += c.a.Len() + c.add(c.a.Len()) } else { c.a.Timestamps = c.a.Timestamps[:0] c.a.Values = c.a.Values[:0] @@ -118,10 +163,15 @@ type unsignedTimeValuesGeneratorCursor struct { } func (c *unsignedTimeValuesGeneratorCursor) Next() *cursors.UnsignedArray { - if c.tv.Next() { + if c.checkCount() && c.tv.Next() { c.tv.Values().(gen.UnsignedValues).Copy(&c.a) + if remain := c.remain(); c.max > 0 && remain < c.a.Len() { + c.a.Timestamps = c.a.Timestamps[:remain] + c.a.Values = c.a.Values[:remain] + } c.stats.ScannedBytes += len(c.a.Values) * 8 c.stats.ScannedValues += c.a.Len() + c.add(c.a.Len()) } else { c.a.Timestamps = c.a.Timestamps[:0] c.a.Values = c.a.Values[:0] @@ -135,12 +185,17 @@ type stringTimeValuesGeneratorCursor struct { } func (c *stringTimeValuesGeneratorCursor) Next() *cursors.StringArray { - if c.tv.Next() { + if c.checkCount() && c.tv.Next() { c.tv.Values().(gen.StringValues).Copy(&c.a) + if remain := c.remain(); c.max > 0 && remain < c.a.Len() { + c.a.Timestamps = c.a.Timestamps[:remain] + c.a.Values = c.a.Values[:remain] + } for _, v := range c.a.Values { c.stats.ScannedBytes += len(v) } c.stats.ScannedValues += c.a.Len() + c.add(c.a.Len()) } else { c.a.Timestamps = c.a.Timestamps[:0] c.a.Values = c.a.Values[:0] @@ -154,10 +209,15 @@ type booleanTimeValuesGeneratorCursor struct { } func (c *booleanTimeValuesGeneratorCursor) Next() *cursors.BooleanArray { - if c.tv.Next() { + if c.checkCount() && c.tv.Next() { c.tv.Values().(gen.BooleanValues).Copy(&c.a) + if remain := c.remain(); c.max > 0 && remain < c.a.Len() { + c.a.Timestamps = c.a.Timestamps[:remain] + c.a.Values = c.a.Values[:remain] + } c.stats.ScannedBytes += len(c.a.Values) c.stats.ScannedValues += c.a.Len() + c.add(c.a.Len()) } else { c.a.Timestamps = c.a.Timestamps[:0] c.a.Values = c.a.Values[:0] diff --git a/mock/reads_resultset_test.go b/mock/reads_resultset_test.go index cb0cc03ec3..1d385b4ee2 100644 --- a/mock/reads_resultset_test.go +++ b/mock/reads_resultset_test.go @@ -24,10 +24,9 @@ func mustNewSpecFromToml(tb testing.TB, toml string) *gen.Spec { } func TestNewResultSetFromSeriesGenerator(t *testing.T) { - checkResult := func(t *testing.T, sg gen.SeriesGenerator, expData string, expStats cursors.CursorStats) { + checkResult := func(t *testing.T, rs reads.ResultSet, expData string, expStats cursors.CursorStats) { t.Helper() - rs := mock.NewResultSetFromSeriesGenerator(sg) var sb strings.Builder err := reads.ResultSetToLineProtocol(&sb, rs) if err != nil { @@ -80,7 +79,34 @@ m0,tag0=value2,tag1=value1 v0=1 1333333000000 m0,tag0=value2,tag1=value1 v0=1 1666666000000 ` expStats := cursors.CursorStats{ScannedValues: 18, ScannedBytes: 18 * 8} - checkResult(t, sg, expData, expStats) + checkResult(t, mock.NewResultSetFromSeriesGenerator(sg), expData, expStats) + }) + + t.Run("max", func(t *testing.T) { + spec := mustNewSpecFromToml(t, ` +[[measurements]] +name = "m0" +sample = 1.0 +tags = [ + { name = "tag0", source = { type = "sequence", start = 0, count = 3 } }, + { name = "tag1", source = { type = "sequence", start = 0, count = 2 } }, +] +fields = [ + { name = "v0", count = 3, source = 1.0 }, +]`) + + sg := gen.NewSeriesGeneratorFromSpec(spec, gen.TimeRange{ + Start: time.Unix(1000, 0), + End: time.Unix(2000, 0), + }) + const expData = `m0,tag0=value0,tag1=value0 v0=1 1000000000000 +m0,tag0=value0,tag1=value0 v0=1 1333333000000 +m0,tag0=value0,tag1=value0 v0=1 1666666000000 +m0,tag0=value0,tag1=value1 v0=1 1000000000000 +m0,tag0=value0,tag1=value1 v0=1 1333333000000 +` + expStats := cursors.CursorStats{ScannedValues: 5, ScannedBytes: 5 * 8} + checkResult(t, mock.NewResultSetFromSeriesGenerator(sg, mock.WithGeneratorMaxValues(5)), expData, expStats) }) } diff --git a/storage/reads/resultset_lineprotocol.go b/storage/reads/resultset_lineprotocol.go index 7bfc938d8f..3de073b522 100644 --- a/storage/reads/resultset_lineprotocol.go +++ b/storage/reads/resultset_lineprotocol.go @@ -111,7 +111,7 @@ func cursorToLineProtocol(wr io.Writer, line []byte, cur cursors.Cursor) error { if a.Len() > 0 { for i := range a.Timestamps { buf := strconv.AppendQuote(line, a.Values[i]) - buf = append(buf, 'i', ' ') + buf = append(buf, ' ') buf = strconv.AppendInt(buf, a.Timestamps[i], 10) wr.Write(buf) wr.Write(newLine)