refactor(storage): move unused code to repo that needs it (#17090)

* refactor(storage): move unused code to repo that needs it

Turns out that a bunch of code is only needed in IDPE. This change
removes that code, and another PR adds it to IDPE.

* refactor(storage): export KeyMerger

* refactor(storage): export NilSortHi and NilSortLo

* refactor(storage): move StringIterator & friends to IDPE

* refactor(storage): unexport a few test helper funcs
pull/17108/head
Jacob Marble 2020-03-05 14:15:51 -08:00 committed by GitHub
parent f897c15187
commit 1facad82dd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 38 additions and 3270 deletions

View File

@ -1,8 +1,6 @@
# List any generated files here
TARGETS = array_cursor.gen.go \
response_writer.gen.go \
stream_reader.gen.go \
stream_reader_gen_test.go \
table.gen.go
# List any source files used to generate the targets here
@ -10,8 +8,6 @@ SOURCES = gen.go \
array_cursor.gen.go.tmpl \
array_cursor.gen.go.tmpldata \
response_writer.gen.go.tmpl \
stream_reader.gen.go.tmpl \
stream_reader_gen_test.go.tmpl \
table.gen.go.tmpl \
types.tmpldata

View File

@ -2,8 +2,3 @@ package reads
//go:generate env GO111MODULE=on go run github.com/benbjohnson/tmpl -data=@array_cursor.gen.go.tmpldata array_cursor.gen.go.tmpl
//go:generate env GO111MODULE=on go run github.com/benbjohnson/tmpl -data=@array_cursor.gen.go.tmpldata response_writer.gen.go.tmpl
//go:generate env GO111MODULE=on go run github.com/benbjohnson/tmpl -data=@array_cursor.gen.go.tmpldata stream_reader.gen.go.tmpl
//go:generate env GO111MODULE=on go run github.com/benbjohnson/tmpl -data=@array_cursor.gen.go.tmpldata stream_reader_gen_test.go.tmpl
// Stringer is not compatible with Go modules, see https://github.com/influxdata/platform/issues/2017
// For now we are disabling the command. If it needs to be regenerated you must do so manually.
////go:generate stringer -type=readState -trimprefix=state

View File

@ -24,7 +24,7 @@ type groupResultSet struct {
keys [][]byte
nilSort []byte
rgc groupByCursor
km keyMerger
km KeyMerger
newCursorFn func() (SeriesCursor, error)
nextGroupFn func(c *groupResultSet) GroupCursor
@ -39,7 +39,7 @@ type GroupOption func(g *groupResultSet)
// other value
func GroupOptionNilSortLo() GroupOption {
return func(g *groupResultSet) {
g.nilSort = nilSortLo
g.nilSort = NilSortLo
}
}
@ -49,7 +49,7 @@ func NewGroupResultSet(ctx context.Context, req *datatypes.ReadGroupRequest, new
req: req,
agg: req.Aggregate,
keys: make([][]byte, len(req.GroupKeys)),
nilSort: nilSortHi,
nilSort: NilSortHi,
newCursorFn: newCursorFn,
}
@ -90,13 +90,13 @@ func NewGroupResultSet(ctx context.Context, req *datatypes.ReadGroupRequest, new
return g
}
// nilSort values determine the lexicographical order of nil values in the
// NilSort values determine the lexicographical order of nil values in the
// partition key
var (
// nil sorts lowest
nilSortLo = []byte{0x00}
NilSortLo = []byte{0x00}
// nil sorts highest
nilSortHi = []byte{0xff} // sort nil values
NilSortHi = []byte{0xff}
)
func (g *groupResultSet) Err() error { return nil }
@ -171,7 +171,7 @@ func groupNoneNextGroup(g *groupResultSet) GroupCursor {
mb: g.mb,
agg: g.agg,
cur: cur,
keys: g.km.get(),
keys: g.km.Get(),
}
}
@ -184,13 +184,13 @@ func groupNoneSort(g *groupResultSet) (int, error) {
}
allTime := g.req.Hints.HintSchemaAllTime()
g.km.clear()
g.km.Clear()
n := 0
row := cur.Next()
for row != nil {
if allTime || g.seriesHasPoints(row) {
n++
g.km.mergeTagKeys(row.Tags)
g.km.MergeTagKeys(row.Tags)
}
row = cur.Next()
}
@ -205,16 +205,16 @@ func groupByNextGroup(g *groupResultSet) GroupCursor {
g.rgc.vals[i] = row.Tags.Get(g.keys[i])
}
g.km.clear()
g.km.Clear()
rowKey := row.SortKey
j := g.i
for j < len(g.rows) && bytes.Equal(rowKey, g.rows[j].SortKey) {
g.km.mergeTagKeys(g.rows[j].Tags)
g.km.MergeTagKeys(g.rows[j].Tags)
j++
}
g.rgc.reset(g.rows[g.i:j])
g.rgc.keys = g.km.get()
g.rgc.keys = g.km.Get()
g.i = j
if j == len(g.rows) {

View File

@ -8,13 +8,13 @@ import (
)
// tagsKeyMerger is responsible for determining a merged set of tag keys
type keyMerger struct {
type KeyMerger struct {
i int
tmp [][]byte
keys [2][][]byte
}
func (km *keyMerger) clear() {
func (km *KeyMerger) Clear() {
km.i = 0
km.keys[0] = km.keys[0][:0]
if km.tmp != nil {
@ -25,17 +25,17 @@ func (km *keyMerger) clear() {
}
}
func (km *keyMerger) get() [][]byte { return km.keys[km.i&1] }
func (km *KeyMerger) Get() [][]byte { return km.keys[km.i&1] }
func (km *keyMerger) String() string {
func (km *KeyMerger) String() string {
var s []string
for _, k := range km.get() {
for _, k := range km.Get() {
s = append(s, string(k))
}
return strings.Join(s, ",")
}
func (km *keyMerger) mergeTagKeys(tags models.Tags) {
func (km *KeyMerger) MergeTagKeys(tags models.Tags) {
if cap(km.tmp) < len(tags) {
km.tmp = make([][]byte, len(tags))
} else {
@ -46,10 +46,10 @@ func (km *keyMerger) mergeTagKeys(tags models.Tags) {
km.tmp[i] = tags[i].Key
}
km.mergeKeys(km.tmp)
km.MergeKeys(km.tmp)
}
func (km *keyMerger) mergeKeys(in [][]byte) {
func (km *KeyMerger) MergeKeys(in [][]byte) {
keys := km.keys[km.i&1]
i, j := 0, 0
for i < len(keys) && j < len(in) && bytes.Equal(keys[i], in[j]) {

View File

@ -56,12 +56,12 @@ func TestKeyMerger_MergeTagKeys(t *testing.T) {
},
}
var km keyMerger
var km KeyMerger
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
km.clear()
km.Clear()
for _, tags := range tt.tags {
km.mergeTagKeys(tags)
km.MergeTagKeys(tags)
}
if got := km.String(); !cmp.Equal(got, tt.exp) {
@ -120,12 +120,12 @@ func TestKeyMerger_MergeKeys(t *testing.T) {
},
}
var km keyMerger
var km KeyMerger
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
km.clear()
km.Clear()
for _, keys := range tt.keys {
km.mergeKeys(keys)
km.MergeKeys(keys)
}
if got := km.String(); !cmp.Equal(got, tt.exp) {
@ -158,12 +158,12 @@ func BenchmarkKeyMerger_MergeKeys(b *testing.B) {
b.Run(strconv.Itoa(n), func(b *testing.B) {
b.ResetTimer()
var km keyMerger
var km KeyMerger
for i := 0; i < b.N; i++ {
for j := 0; j < n; j++ {
km.mergeKeys(keys[rand.Int()%len(keys)])
km.MergeKeys(keys[rand.Int()%len(keys)])
}
km.clear()
km.Clear()
}
})
}
@ -192,12 +192,12 @@ func BenchmarkKeyMerger_MergeTagKeys(b *testing.B) {
b.Run(strconv.Itoa(n), func(b *testing.B) {
b.ResetTimer()
var km keyMerger
var km KeyMerger
for i := 0; i < b.N; i++ {
for j := 0; j < n; j++ {
km.mergeTagKeys(tags[rand.Int()%len(tags)])
km.MergeTagKeys(tags[rand.Int()%len(tags)])
}
km.clear()
km.Clear()
}
})
}

View File

@ -1,311 +0,0 @@
package reads
import (
"container/heap"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/tsdb/cursors"
)
// sequenceResultSet concatenates several ResultSets into one, draining each
// source completely before moving on to the next. No merging or sorting is
// performed; output order is the order the sources were supplied in.
type sequenceResultSet struct {
	items []ResultSet         // sources not yet started, consumed front-to-back
	rs    ResultSet           // source currently being drained; nil when none
	err   error               // first error reported by a source; sticky
	stats cursors.CursorStats // stats accumulated from fully drained sources
}

// NewSequenceResultSet combines results into a single ResultSet,
// draining each ResultSet in order before moving to the next.
// It returns nil for an empty input and the input itself when there is
// exactly one, avoiding wrapper overhead in the trivial cases.
func NewSequenceResultSet(results []ResultSet) ResultSet {
	if len(results) == 0 {
		return nil
	} else if len(results) == 1 {
		return results[0]
	}
	rs := &sequenceResultSet{items: results}
	// Prime the first source so Next can start iterating immediately.
	rs.pop()
	return rs
}

// Err returns the first error encountered while iterating, if any.
func (r *sequenceResultSet) Err() error { return r.err }

// Close closes the active source and every source that was never started.
func (r *sequenceResultSet) Close() {
	if r.rs != nil {
		r.rs.Close()
		r.rs = nil
	}
	for _, rs := range r.items {
		rs.Close()
	}
	r.items = nil
}

// pop closes the current source (if any) and promotes the next one,
// reporting whether another source was available.
func (r *sequenceResultSet) pop() bool {
	if r.rs != nil {
		r.rs.Close()
		r.rs = nil
	}
	if len(r.items) > 0 {
		r.rs = r.items[0]
		r.items[0] = nil // clear the slot so the ResultSet can be collected
		r.items = r.items[1:]
		return true
	}
	return false
}

// Next advances to the next series, transparently switching to the next
// source when the current one is exhausted. A source error stops iteration,
// closes everything, and is surfaced via Err.
func (r *sequenceResultSet) Next() bool {
RETRY:
	if r.rs != nil {
		if r.rs.Next() {
			return true
		}
		err := r.rs.Err()
		stats := r.rs.Stats()
		if err != nil {
			r.err = err
			r.Close()
			return false
		}
		// Source drained cleanly: fold its stats in and try the next one.
		r.stats.Add(stats)
		if r.pop() {
			goto RETRY
		}
	}
	return false
}

// Cursor returns the cursor for the current series of the active source.
func (r *sequenceResultSet) Cursor() cursors.Cursor {
	return r.rs.Cursor()
}

// Tags returns the tags for the current series of the active source.
func (r *sequenceResultSet) Tags() models.Tags {
	return r.rs.Tags()
}

// Stats returns the cumulative stats of all sources drained so far.
func (r *sequenceResultSet) Stats() cursors.CursorStats {
	return r.stats
}
// mergedResultSet merges several ordered ResultSets into one, yielding
// series in ascending lexicographical tag order by keeping the sources in
// a min-heap keyed on each source's current series tags.
type mergedResultSet struct {
	heap  resultSetHeap
	err   error // first error reported by a source; sticky
	first bool  // true until the first Next call; the heap is pre-primed
	stats cursors.CursorStats
}

// NewMergedResultSet combines the results into a single ResultSet,
// producing keys in ascending lexicographical order. It requires
// all input results are ordered.
// It returns nil for an empty input and the input itself when there is
// exactly one.
func NewMergedResultSet(results []ResultSet) ResultSet {
	if len(results) == 0 {
		return nil
	} else if len(results) == 1 {
		return results[0]
	}
	mrs := &mergedResultSet{first: true}
	mrs.heap.init(results)
	return mrs
}

// Err returns the first error reported by a source, if any.
func (r *mergedResultSet) Err() error { return r.err }

// Close closes every source still in the heap.
func (r *mergedResultSet) Close() {
	for _, rs := range r.heap.items {
		rs.Close()
	}
	r.heap.items = nil
}

// Next advances the top source and re-orders the heap; a drained source has
// its stats folded in and is popped. The very first call returns the
// already-primed heap top without advancing (heap.init called Next once on
// each source).
func (r *mergedResultSet) Next() bool {
	if len(r.heap.items) == 0 {
		return false
	}
	if !r.first {
		top := r.heap.items[0]
		if top.Next() {
			heap.Fix(&r.heap, 0)
			return true
		}
		err := top.Err()
		stats := top.Stats()
		top.Close()
		heap.Pop(&r.heap)
		if err != nil {
			r.err = err
			r.Close()
			return false
		}
		r.stats.Add(stats)
		return len(r.heap.items) > 0
	}
	r.first = false
	return true
}

// Cursor returns the cursor of the series currently at the heap top.
func (r *mergedResultSet) Cursor() cursors.Cursor {
	return r.heap.items[0].Cursor()
}

// Tags returns the tags of the series currently at the heap top.
func (r *mergedResultSet) Tags() models.Tags {
	return r.heap.items[0].Tags()
}

// Stats returns the cumulative stats of all fully drained sources.
func (r *mergedResultSet) Stats() cursors.CursorStats {
	return r.stats
}
// resultSetHeap is a min-heap of ResultSets ordered by the tags of each
// source's current series (see Less).
type resultSetHeap struct {
	items []ResultSet
}

// init primes every source with one Next call, keeping only those that
// produced a series (closing the rest), then establishes the heap invariant.
func (h *resultSetHeap) init(results []ResultSet) {
	if cap(h.items) < len(results) {
		h.items = make([]ResultSet, 0, len(results))
	} else {
		h.items = h.items[:0]
	}
	for _, rs := range results {
		if rs.Next() {
			h.items = append(h.items, rs)
		} else {
			rs.Close()
		}
	}
	heap.Init(h)
}

// Less orders sources by their current series tags, ascending.
func (h *resultSetHeap) Less(i, j int) bool {
	return models.CompareTags(h.items[i].Tags(), h.items[j].Tags()) == -1
}

func (h *resultSetHeap) Len() int {
	return len(h.items)
}

func (h *resultSetHeap) Swap(i, j int) {
	h.items[i], h.items[j] = h.items[j], h.items[i]
}

// Push is required by heap.Interface but intentionally unused: the heap is
// only ever built via init and shrunk via Pop.
func (h *resultSetHeap) Push(x interface{}) {
	panic("not implemented")
}

// Pop removes and returns the last element (heap.Pop has already swapped
// the minimum there), clearing the slot so the ResultSet can be collected.
func (h *resultSetHeap) Pop() interface{} {
	n := len(h.items)
	item := h.items[n-1]
	h.items[n-1] = nil
	h.items = h.items[:n-1]
	return item
}
// MergedStringIterator merges multiple storage.StringIterators into one.
// It sorts and deduplicates adjacent values, so the output is sorted iff all inputs are sorted.
// If all inputs are not sorted, then output order and deduplication are undefined and unpleasant.
type MergedStringIterator struct {
	heap      stringIteratorHeap
	nextValue string // value most recently produced; also used to skip duplicates
	stats     cursors.CursorStats
}

// API compatibility
var _ cursors.StringIterator = (*MergedStringIterator)(nil)

// NewMergedStringIterator primes each input with one Next call, heapifies
// the non-empty ones, and folds the stats of already-empty inputs in.
func NewMergedStringIterator(iterators []cursors.StringIterator) *MergedStringIterator {
	nonEmptyIterators := make([]cursors.StringIterator, 0, len(iterators))
	var stats cursors.CursorStats
	for _, iterator := range iterators {
		// All iterators must be Next()'d so that their Value() methods return a meaningful value, and sort properly.
		if iterator.Next() {
			nonEmptyIterators = append(nonEmptyIterators, iterator)
		} else {
			stats.Add(iterator.Stats())
		}
	}
	msi := &MergedStringIterator{
		heap:  stringIteratorHeap{iterators: nonEmptyIterators},
		stats: stats,
	}
	heap.Init(&msi.heap)
	return msi
}

// Next advances to the next distinct value, skipping values equal to the
// one previously returned. Drained inputs have their stats folded in and
// are removed from the heap.
func (msi *MergedStringIterator) Next() bool {
	for msi.heap.Len() > 0 {
		iterator := msi.heap.iterators[0]
		haveNext := false
		if proposedNextValue := iterator.Value(); proposedNextValue != msi.nextValue { // Skip dupes.
			msi.nextValue = proposedNextValue
			haveNext = true
		}
		if iterator.Next() {
			// iterator.Value() has changed, so re-order that iterator within the heap
			heap.Fix(&msi.heap, 0)
		} else {
			// iterator is drained, so count the stats and remove it from the heap
			msi.stats.Add(iterator.Stats())
			heap.Pop(&msi.heap)
		}
		if haveNext {
			return true
		}
	}
	return false
}

// Value returns the value found by the most recent successful Next call.
func (msi *MergedStringIterator) Value() string {
	return msi.nextValue
}

// Stats returns cumulative stats: inputs drained during iteration plus
// those already empty at construction time.
func (msi *MergedStringIterator) Stats() cursors.CursorStats {
	return msi.stats
}

// stringIteratorHeap is a min-heap of StringIterators ordered by each
// iterator's current Value.
type stringIteratorHeap struct {
	iterators []cursors.StringIterator
}

func (h stringIteratorHeap) Len() int {
	return len(h.iterators)
}

func (h stringIteratorHeap) Less(i, j int) bool {
	return h.iterators[i].Value() < h.iterators[j].Value()
}

func (h *stringIteratorHeap) Swap(i, j int) {
	h.iterators[i], h.iterators[j] = h.iterators[j], h.iterators[i]
}

func (h *stringIteratorHeap) Push(x interface{}) {
	h.iterators = append(h.iterators, x.(cursors.StringIterator))
}

// Pop removes and returns the last element, clearing the slot so the
// iterator can be collected.
func (h *stringIteratorHeap) Pop() interface{} {
	n := len(h.iterators)
	item := h.iterators[n-1]
	h.iterators[n-1] = nil
	h.iterators = h.iterators[:n-1]
	return item
}

View File

@ -1,269 +0,0 @@
package reads_test
import (
"reflect"
"strings"
"testing"
"github.com/google/go-cmp/cmp"
"github.com/influxdata/influxdb/storage/reads"
"github.com/influxdata/influxdb/storage/reads/datatypes"
"github.com/influxdata/influxdb/tsdb/cursors"
)
// newStreamSeries builds a sliceStreamReader containing a single response
// with one Float series frame per series key in v.
func newStreamSeries(v ...string) *sliceStreamReader {
	var frames []datatypes.ReadResponse_Frame
	for _, s := range v {
		frames = append(frames, seriesF(Float, s))
	}
	return newStreamReader(response(frames...))
}
func TestNewSequenceResultSet(t *testing.T) {
tests := []struct {
name string
streams []*sliceStreamReader
exp string
}{
{
name: "outer inner",
streams: []*sliceStreamReader{
newStreamSeries("m0,tag0=val01", "m0,tag0=val02"),
newStreamSeries("m0,tag0=val00", "m0,tag0=val03"),
},
exp: `series: _m=m0,tag0=val01
cursor:Float
series: _m=m0,tag0=val02
cursor:Float
series: _m=m0,tag0=val00
cursor:Float
series: _m=m0,tag0=val03
cursor:Float
`,
},
{
name: "sequential",
streams: []*sliceStreamReader{
newStreamSeries("m0,tag0=val00", "m0,tag0=val01"),
newStreamSeries("m0,tag0=val02", "m0,tag0=val03"),
},
exp: `series: _m=m0,tag0=val00
cursor:Float
series: _m=m0,tag0=val01
cursor:Float
series: _m=m0,tag0=val02
cursor:Float
series: _m=m0,tag0=val03
cursor:Float
`,
},
{
name: "single resultset",
streams: []*sliceStreamReader{
newStreamSeries("m0,tag0=val00", "m0,tag0=val01", "m0,tag0=val02", "m0,tag0=val03"),
},
exp: `series: _m=m0,tag0=val00
cursor:Float
series: _m=m0,tag0=val01
cursor:Float
series: _m=m0,tag0=val02
cursor:Float
series: _m=m0,tag0=val03
cursor:Float
`,
},
{
name: "single series ordered",
streams: []*sliceStreamReader{
newStreamSeries("m0,tag0=val00"),
newStreamSeries("m0,tag0=val01"),
newStreamSeries("m0,tag0=val02"),
newStreamSeries("m0,tag0=val03"),
},
exp: `series: _m=m0,tag0=val00
cursor:Float
series: _m=m0,tag0=val01
cursor:Float
series: _m=m0,tag0=val02
cursor:Float
series: _m=m0,tag0=val03
cursor:Float
`,
},
{
name: "single series random order",
streams: []*sliceStreamReader{
newStreamSeries("m0,tag0=val02"),
newStreamSeries("m0,tag0=val03"),
newStreamSeries("m0,tag0=val00"),
newStreamSeries("m0,tag0=val01"),
},
exp: `series: _m=m0,tag0=val02
cursor:Float
series: _m=m0,tag0=val03
cursor:Float
series: _m=m0,tag0=val00
cursor:Float
series: _m=m0,tag0=val01
cursor:Float
`,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
rss := make([]reads.ResultSet, len(tt.streams))
for i := range tt.streams {
rss[i] = reads.NewResultSetStreamReader(tt.streams[i])
}
rs := reads.NewSequenceResultSet(rss)
sb := new(strings.Builder)
ResultSetToString(sb, rs)
if got := sb.String(); !cmp.Equal(got, tt.exp) {
t.Errorf("unexpected value; -got/+exp\n%s", cmp.Diff(got, tt.exp))
}
})
}
}
func TestNewMergedResultSet(t *testing.T) {
exp := `series: _m=m0,tag0=val00
cursor:Float
series: _m=m0,tag0=val01
cursor:Float
series: _m=m0,tag0=val02
cursor:Float
series: _m=m0,tag0=val03
cursor:Float
`
tests := []struct {
name string
streams []*sliceStreamReader
exp string
}{
{
name: "outer inner",
streams: []*sliceStreamReader{
newStreamSeries("m0,tag0=val01", "m0,tag0=val02"),
newStreamSeries("m0,tag0=val00", "m0,tag0=val03"),
},
exp: exp,
},
{
name: "sequential",
streams: []*sliceStreamReader{
newStreamSeries("m0,tag0=val00", "m0,tag0=val01"),
newStreamSeries("m0,tag0=val02", "m0,tag0=val03"),
},
exp: exp,
},
{
name: "interleaved",
streams: []*sliceStreamReader{
newStreamSeries("m0,tag0=val01", "m0,tag0=val03"),
newStreamSeries("m0,tag0=val00", "m0,tag0=val02"),
},
exp: exp,
},
{
name: "single resultset",
streams: []*sliceStreamReader{
newStreamSeries("m0,tag0=val00", "m0,tag0=val01", "m0,tag0=val02", "m0,tag0=val03"),
},
exp: exp,
},
{
name: "single series ordered",
streams: []*sliceStreamReader{
newStreamSeries("m0,tag0=val00"),
newStreamSeries("m0,tag0=val01"),
newStreamSeries("m0,tag0=val02"),
newStreamSeries("m0,tag0=val03"),
},
exp: exp,
},
{
name: "single series random order",
streams: []*sliceStreamReader{
newStreamSeries("m0,tag0=val02"),
newStreamSeries("m0,tag0=val03"),
newStreamSeries("m0,tag0=val00"),
newStreamSeries("m0,tag0=val01"),
},
exp: exp,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
rss := make([]reads.ResultSet, len(tt.streams))
for i := range tt.streams {
rss[i] = reads.NewResultSetStreamReader(tt.streams[i])
}
rs := reads.NewMergedResultSet(rss)
sb := new(strings.Builder)
ResultSetToString(sb, rs)
if got := sb.String(); !cmp.Equal(got, tt.exp) {
t.Errorf("unexpected value; -got/+exp\n%s", cmp.Diff(got, tt.exp))
}
})
}
}
func TestNewMergedStringIterator(t *testing.T) {
tests := []struct {
name string
iterators []cursors.StringIterator
expectedValues []string
}{
{
name: "simple",
iterators: []cursors.StringIterator{
newMockStringIterator(1, 2, "bar", "foo"),
},
expectedValues: []string{"bar", "foo"},
},
{
name: "duplicates",
iterators: []cursors.StringIterator{
newMockStringIterator(1, 10, "c"),
newMockStringIterator(10, 100, "b", "b"), // This kind of duplication is not explicitly documented, but works.
newMockStringIterator(1, 10, "a", "c"),
newMockStringIterator(1, 10, "b", "d"),
newMockStringIterator(1, 10, "0", "a", "b", "e"),
},
expectedValues: []string{"0", "a", "b", "c", "d", "e"},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
m := reads.NewMergedStringIterator(tt.iterators)
// Expect no stats before any iteration
var expectStats cursors.CursorStats
if !reflect.DeepEqual(expectStats, m.Stats()) {
t.Errorf("expected %+v, got %+v", expectStats, m.Stats())
}
var gotValues []string
for m.Next() {
gotValues = append(gotValues, m.Value())
}
if !reflect.DeepEqual(tt.expectedValues, gotValues) {
t.Errorf("expected %v, got %v", tt.expectedValues, gotValues)
}
for _, iterator := range tt.iterators {
expectStats.Add(iterator.Stats())
}
if !reflect.DeepEqual(expectStats, m.Stats()) {
t.Errorf("expected %+v, got %+v", expectStats, m.Stats())
}
})
}
}

View File

@ -1,259 +0,0 @@
package reads
import (
"bytes"
"sort"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/pkg/slices"
)
// groupNoneMergedGroupResultSet produces a single GroupCursor, merging all
// GroupResultSet#Keys
type groupNoneMergedGroupResultSet struct {
	g    []GroupResultSet
	gc   groupNoneMergedGroupCursor
	done bool // set once the single cursor has been handed out (or on error)
}

// Returns a GroupResultSet that merges results using the datatypes.GroupNone
// strategy. Each source GroupResultSet in g must be configured using the
// GroupNone strategy or the results are undefined.
//
// The GroupNone strategy must merge the partition key and tag keys
// from each source GroupResultSet when producing its single GroupCursor.
// Returns nil for an empty input and the input itself when there is
// exactly one.
func NewGroupNoneMergedGroupResultSet(g []GroupResultSet) GroupResultSet {
	if len(g) == 0 {
		return nil
	} else if len(g) == 1 {
		return g[0]
	}

	grs := &groupNoneMergedGroupResultSet{
		g: g,
		gc: groupNoneMergedGroupCursor{
			mergedResultSet: mergedResultSet{first: true},
		},
	}

	// Pull the group cursor from every source, merging their tag keys.
	// Any source error aborts construction; it is surfaced via Err.
	var km keyMerger
	results := make([]ResultSet, 0, len(g))
	for _, rs := range g {
		if gc := rs.Next(); gc != nil {
			results = append(results, gc)
			km.mergeKeys(gc.Keys())
		} else if rs.Err() != nil {
			grs.done = true
			grs.gc.err = rs.Err()
			results = nil
			break
		}
	}

	if len(results) > 0 {
		grs.gc.keys = km.get()
		grs.gc.heap.init(results)
	}

	return grs
}

// Next returns the merged cursor exactly once; nil thereafter.
func (r *groupNoneMergedGroupResultSet) Next() GroupCursor {
	if !r.done {
		r.done = true
		return &r.gc
	}
	return nil
}

// Err returns any error captured during construction or iteration.
func (r *groupNoneMergedGroupResultSet) Err() error { return r.gc.err }

// Close closes the merged cursor and every source.
func (r *groupNoneMergedGroupResultSet) Close() {
	r.gc.Close()
	for _, grs := range r.g {
		grs.Close()
	}
	r.g = nil
}

// groupNoneMergedGroupCursor merges series via the embedded mergedResultSet
// and reports the merged tag keys of all sources.
type groupNoneMergedGroupCursor struct {
	mergedResultSet
	keys [][]byte // merged tag keys across all sources
}

func (r *groupNoneMergedGroupCursor) Keys() [][]byte {
	return r.keys
}

// PartitionKeyVals is always nil under the GroupNone strategy.
func (r *groupNoneMergedGroupCursor) PartitionKeyVals() [][]byte {
	return nil
}
// groupByMergedGroupResultSet implements the GroupBy strategy.
type groupByMergedGroupResultSet struct {
	items        []*groupCursorItem // sources paired with their current (peeked) cursor
	alt          []*groupCursorItem // scratch slice, swapped with items each round
	groupCursors []GroupCursor      // cursors sharing the current partition key
	resultSets   []ResultSet        // the same cursors, typed for the merge heap
	nilVal       []byte             // sort placeholder for empty partition key values
	err          error
	km           models.TagKeysSet // union of tag keys for the current group
	gc           groupByMergedGroupCursor
}

// Returns a GroupResultSet that merges results using the datatypes.GroupBy
// strategy. Each source GroupResultSet in g must be configured using the
// GroupBy strategy with the same GroupKeys or the results are undefined.
// Returns nil for an empty input and the input itself when there is
// exactly one.
func NewGroupByMergedGroupResultSet(g []GroupResultSet) GroupResultSet {
	if len(g) == 0 {
		return nil
	} else if len(g) == 1 {
		return g[0]
	}

	grs := &groupByMergedGroupResultSet{}
	grs.nilVal = nilSortHi // empty partition key values sort last
	grs.groupCursors = make([]GroupCursor, 0, len(g))
	grs.resultSets = make([]ResultSet, 0, len(g))
	grs.items = make([]*groupCursorItem, 0, len(g))
	grs.alt = make([]*groupCursorItem, 0, len(g))

	for _, rs := range g {
		grs.items = append(grs.items, &groupCursorItem{grs: rs})
	}

	return grs
}
// next determines the cursors for the next partition key.
// It refills each consumed item's cursor from its source, drops drained
// sources, sorts the remainder by partition key, and collects every cursor
// whose partition key equals the smallest into groupCursors/resultSets.
func (r *groupByMergedGroupResultSet) next() {
	r.alt = r.alt[:0]
	for i, item := range r.items {
		if item.gc == nil {
			item.gc = item.grs.Next()
			if item.gc != nil {
				r.alt = append(r.alt, item)
			} else {
				// source is drained; capture any error and close it
				r.err = item.grs.Err()
				item.grs.Close()
			}
		} else {
			// append remaining non-nil cursors
			r.alt = append(r.alt, r.items[i:]...)
			break
		}
	}
	// Double-buffer: alt becomes the live list, items becomes scratch.
	r.items, r.alt = r.alt, r.items

	if len(r.items) == 0 {
		r.groupCursors = r.groupCursors[:0]
		r.resultSets = r.resultSets[:0]
		return
	}

	if r.err != nil {
		r.Close()
		return
	}

	sort.Slice(r.items, func(i, j int) bool {
		return comparePartitionKey(r.items[i].gc.PartitionKeyVals(), r.items[j].gc.PartitionKeyVals(), r.nilVal) == -1
	})

	// The first item defines the current partition key; gather every other
	// cursor with an equal key. Clearing item.gc marks it for refill on the
	// following round.
	r.groupCursors = r.groupCursors[:1]
	r.resultSets = r.resultSets[:1]
	first := r.items[0].gc
	r.groupCursors[0] = first
	r.resultSets[0] = first
	r.items[0].gc = nil

	for i := 1; i < len(r.items); i++ {
		if slices.CompareSlice(first.PartitionKeyVals(), r.items[i].gc.PartitionKeyVals()) == 0 {
			r.groupCursors = append(r.groupCursors, r.items[i].gc)
			r.resultSets = append(r.resultSets, r.items[i].gc)
			r.items[i].gc = nil
		}
	}
}
// Next assembles the next group: it gathers the cursors for the smallest
// partition key (via next), primes the merge heap, unions their tag keys,
// and returns a cursor over the merged series. Returns nil once all sources
// are drained.
func (r *groupByMergedGroupResultSet) Next() GroupCursor {
	r.next()
	if len(r.groupCursors) == 0 {
		return nil
	}

	r.gc.first = true
	r.gc.heap.init(r.resultSets)

	r.km.Clear()
	for i := range r.groupCursors {
		r.km.UnionBytes(r.groupCursors[i].Keys())
	}

	// Reuse gc.keys storage; copy so later km mutations don't alias it.
	r.gc.keys = append(r.gc.keys[:0], r.km.KeysBytes()...)
	r.gc.vals = r.groupCursors[0].PartitionKeyVals()

	return &r.gc
}

// Err returns the first error reported by a source, if any.
func (r *groupByMergedGroupResultSet) Err() error { return r.err }

// Close closes the merged cursor, any outstanding source cursors, and the
// sources themselves.
func (r *groupByMergedGroupResultSet) Close() {
	r.gc.Close()
	for _, grs := range r.items {
		if grs.gc != nil {
			grs.gc.Close()
		}
		grs.grs.Close()
	}
	r.items = nil
	r.alt = nil
}

// groupByMergedGroupCursor merges the series of one partition-key group and
// reports that group's tag keys and partition key values.
type groupByMergedGroupCursor struct {
	mergedResultSet
	keys [][]byte // union of tag keys for this group
	vals [][]byte // partition key values shared by the group's cursors
}

func (r *groupByMergedGroupCursor) Keys() [][]byte {
	return r.keys
}

func (r *groupByMergedGroupCursor) PartitionKeyVals() [][]byte {
	return r.vals
}

// groupCursorItem pairs a source GroupResultSet with its current, not yet
// consumed GroupCursor (nil when a new cursor must be fetched).
type groupCursorItem struct {
	grs GroupResultSet
	gc  GroupCursor
}
// comparePartitionKey lexicographically compares the partition keys a and b,
// substituting nilVal for any empty element so the caller controls whether a
// missing value sorts low or high. It returns -1, 0 or 1 in the manner of
// bytes.Compare; when one key is a strict prefix of the other, the shorter
// key sorts first.
func comparePartitionKey(a, b [][]byte, nilVal []byte) int {
	shared := len(a)
	if len(b) < shared {
		shared = len(b)
	}

	for idx := 0; idx < shared; idx++ {
		left, right := a[idx], b[idx]
		if len(left) == 0 {
			left = nilVal
		}
		if len(right) == 0 {
			right = nilVal
		}
		if c := bytes.Compare(left, right); c != 0 {
			return c
		}
	}

	// All shared elements compared equal; length breaks the tie.
	switch {
	case len(a) < len(b):
		return -1
	case len(a) > len(b):
		return 1
	default:
		return 0
	}
}

View File

@ -1,266 +0,0 @@
package reads_test
import (
"errors"
"strings"
"testing"
"github.com/google/go-cmp/cmp"
"github.com/influxdata/influxdb/storage/reads"
"github.com/influxdata/influxdb/storage/reads/datatypes"
)
// newGroupNoneStreamSeries builds a sliceStreamReader whose single response
// begins with a group frame (tag keys, empty partition key) followed by one
// Float series frame per series key in v.
func newGroupNoneStreamSeries(tagKeys string, v ...string) *sliceStreamReader {
	var frames []datatypes.ReadResponse_Frame
	frames = append(frames, groupF(tagKeys, ""))
	for _, s := range v {
		frames = append(frames, seriesF(Float, s))
	}
	return newStreamReader(response(frames...))
}
func TestNewGroupNoneMergedGroupResultSet(t *testing.T) {
exp := `group:
tag key : m0,tag0,tag1,tag2
partition key:
series: _m=m0,tag0=val00
cursor:Float
series: _m=m0,tag0=val01
cursor:Float
series: _m=m0,tag1=val10
cursor:Float
series: _m=m0,tag2=val20
cursor:Float
`
tests := []struct {
name string
streams []*sliceStreamReader
exp string
}{
{
name: "merge tagKey schemas series total order",
streams: []*sliceStreamReader{
newGroupNoneStreamSeries("m0,tag0", "m0,tag0=val00", "m0,tag0=val01"),
newGroupNoneStreamSeries("m0,tag1,tag2", "m0,tag1=val10", "m0,tag2=val20"),
},
exp: exp,
},
{
name: "merge tagKey schemas series mixed",
streams: []*sliceStreamReader{
newGroupNoneStreamSeries("m0,tag0,tag2", "m0,tag0=val01", "m0,tag2=val20"),
newGroupNoneStreamSeries("m0,tag0,tag1", "m0,tag0=val00", "m0,tag1=val10"),
},
exp: exp,
},
{
name: "merge single group schemas ordered",
streams: []*sliceStreamReader{
newGroupNoneStreamSeries("m0,tag0", "m0,tag0=val00"),
newGroupNoneStreamSeries("m0,tag0", "m0,tag0=val01"),
newGroupNoneStreamSeries("m0,tag1", "m0,tag1=val10"),
newGroupNoneStreamSeries("m0,tag2", "m0,tag2=val20"),
},
exp: exp,
},
{
name: "merge single group schemas unordered",
streams: []*sliceStreamReader{
newGroupNoneStreamSeries("m0,tag2", "m0,tag2=val20"),
newGroupNoneStreamSeries("m0,tag0", "m0,tag0=val00"),
newGroupNoneStreamSeries("m0,tag1", "m0,tag1=val10"),
newGroupNoneStreamSeries("m0,tag0", "m0,tag0=val01"),
},
exp: exp,
},
{
name: "merge single group",
streams: []*sliceStreamReader{
newGroupNoneStreamSeries("m0,tag0,tag1,tag2", "m0,tag0=val00", "m0,tag0=val01", "m0,tag1=val10", "m0,tag2=val20"),
},
exp: exp,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
grss := make([]reads.GroupResultSet, len(tt.streams))
for i := range tt.streams {
grss[i] = reads.NewGroupResultSetStreamReader(tt.streams[i])
}
grs := reads.NewGroupNoneMergedGroupResultSet(grss)
sb := new(strings.Builder)
GroupResultSetToString(sb, grs)
if got := sb.String(); !cmp.Equal(got, tt.exp) {
t.Errorf("unexpected value; -got/+exp\n%s", cmp.Diff(strings.Split(got, "\n"), strings.Split(tt.exp, "\n")))
}
grs.Close()
})
}
}
func TestGroupNoneMergedGroupResultSet_ErrNoData(t *testing.T) {
exp := "no data"
streams := []reads.StreamReader{
newGroupNoneStreamSeries("m0,tag2", "m0,tag2=val20"),
errStreamReader(exp),
}
grss := make([]reads.GroupResultSet, len(streams))
for i := range streams {
grss[i] = reads.NewGroupResultSetStreamReader(streams[i])
}
grs := reads.NewGroupNoneMergedGroupResultSet(grss)
if got := grs.Next(); got != nil {
t.Errorf("expected nil")
}
if got, expErr := grs.Err(), errors.New(exp); !cmp.Equal(got, expErr, cmp.Comparer(errCmp)) {
t.Errorf("unexpected error; -got/+exp\n%s", cmp.Diff(got, expErr, cmp.Transformer("err", errTr)))
}
}
func TestGroupNoneMergedGroupResultSet_ErrStreamNoData(t *testing.T) {
streams := []reads.StreamReader{
newGroupNoneStreamSeries("m0,tag2", "m0,tag2=val20"),
&emptyStreamReader{},
}
grss := make([]reads.GroupResultSet, len(streams))
for i := range streams {
grss[i] = reads.NewGroupResultSetStreamReader(streams[i])
}
grs := reads.NewGroupNoneMergedGroupResultSet(grss)
if got := grs.Next(); got != nil {
t.Errorf("expected nil")
}
if got, expErr := grs.Err(), reads.ErrStreamNoData; !cmp.Equal(got, expErr, cmp.Comparer(errCmp)) {
t.Errorf("unexpected error; -got/+exp\n%s", cmp.Diff(got, expErr, cmp.Transformer("err", errTr)))
}
}
// groupByF builds a ReadResponse beginning with a group frame carrying the
// given tag keys and partition keys, followed by one Float series frame per
// series key in v.
func groupByF(tagKeys, parKeys string, v ...string) datatypes.ReadResponse {
	var frames []datatypes.ReadResponse_Frame
	frames = append(frames, groupF(tagKeys, parKeys))
	for _, s := range v {
		frames = append(frames, seriesF(Float, s))
	}
	return response(frames...)
}
func TestNewGroupByMergedGroupResultSet(t *testing.T) {
exp := `group:
tag key : _m,tag0,tag1
partition key: val00,<nil>
series: _m=aaa,tag0=val00
cursor:Float
series: _m=cpu,tag0=val00,tag1=val10
cursor:Float
series: _m=cpu,tag0=val00,tag1=val11
cursor:Float
series: _m=cpu,tag0=val00,tag1=val12
cursor:Float
group:
tag key : _m,tag0
partition key: val01,<nil>
series: _m=aaa,tag0=val01
cursor:Float
group:
tag key : _m,tag1,tag2
partition key: <nil>,val20
series: _m=mem,tag1=val10,tag2=val20
cursor:Float
series: _m=mem,tag1=val11,tag2=val20
cursor:Float
group:
tag key : _m,tag1,tag2
partition key: <nil>,val21
series: _m=mem,tag1=val11,tag2=val21
cursor:Float
`
tests := []struct {
name string
streams []*sliceStreamReader
exp string
}{
{
streams: []*sliceStreamReader{
newStreamReader(
groupByF("_m,tag0,tag1", "val00,<nil>", "aaa,tag0=val00", "cpu,tag0=val00,tag1=val11"),
groupByF("_m,tag1,tag2", "<nil>,val20", "mem,tag1=val10,tag2=val20"),
groupByF("_m,tag1,tag2", "<nil>,val21", "mem,tag1=val11,tag2=val21"),
),
newStreamReader(
groupByF("_m,tag0,tag1", "val00,<nil>", "cpu,tag0=val00,tag1=val10", "cpu,tag0=val00,tag1=val12"),
groupByF("_m,tag0", "val01,<nil>", "aaa,tag0=val01"),
),
newStreamReader(
groupByF("_m,tag1,tag2", "<nil>,val20", "mem,tag1=val11,tag2=val20"),
),
},
exp: exp,
},
{
streams: []*sliceStreamReader{
newStreamReader(
groupByF("_m,tag1,tag2", "<nil>,val20", "mem,tag1=val10,tag2=val20"),
groupByF("_m,tag1,tag2", "<nil>,val21", "mem,tag1=val11,tag2=val21"),
),
newStreamReader(
groupByF("_m,tag1,tag2", "<nil>,val20", "mem,tag1=val11,tag2=val20"),
),
newStreamReader(
groupByF("_m,tag0,tag1", "val00,<nil>", "cpu,tag0=val00,tag1=val10", "cpu,tag0=val00,tag1=val12"),
groupByF("_m,tag0", "val01,<nil>", "aaa,tag0=val01"),
),
newStreamReader(
groupByF("_m,tag0,tag1", "val00,<nil>", "aaa,tag0=val00", "cpu,tag0=val00,tag1=val11"),
),
},
exp: exp,
},
{
name: "does merge keys",
streams: []*sliceStreamReader{
newStreamReader(
groupByF("_m,tag1", "val00,<nil>", "aaa,tag0=val00", "cpu,tag0=val00,tag1=val11"),
groupByF("_m,tag2", "<nil>,val20", "mem,tag1=val10,tag2=val20"),
groupByF("_m,tag1,tag2", "<nil>,val21", "mem,tag1=val11,tag2=val21"),
),
newStreamReader(
groupByF("_m,tag0,tag1", "val00,<nil>", "cpu,tag0=val00,tag1=val10", "cpu,tag0=val00,tag1=val12"),
groupByF("_m,tag0", "val01,<nil>", "aaa,tag0=val01"),
),
newStreamReader(
groupByF("_m,tag1", "<nil>,val20", "mem,tag1=val11,tag2=val20"),
),
},
exp: exp,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
grss := make([]reads.GroupResultSet, len(tt.streams))
for i := range tt.streams {
grss[i] = reads.NewGroupResultSetStreamReader(tt.streams[i])
}
grs := reads.NewGroupByMergedGroupResultSet(grss)
sb := new(strings.Builder)
GroupResultSetToString(sb, grs, SkipNilCursor())
if got := sb.String(); !cmp.Equal(got, tt.exp) {
t.Errorf("unexpected value; -got/+exp\n%s", cmp.Diff(strings.Split(got, "\n"), strings.Split(tt.exp, "\n")))
}
grs.Close()
})
}
}

View File

@ -1,16 +0,0 @@
// Code generated by "stringer -type=readState -trimprefix=state"; DO NOT EDIT.
package reads
import "strconv"
// _readState_name concatenates the names of all readState values;
// _readState_index holds the start offset of each name within it.
const _readState_name = "ReadGroupReadSeriesReadPointsReadFloatPointsReadIntegerPointsReadUnsignedPointsReadBooleanPointsReadStringPointsReadErrDone"

var _readState_index = [...]uint8{0, 9, 19, 29, 44, 61, 79, 96, 112, 119, 123}

// String returns the name of the readState value, or a numeric fallback of
// the form "readState(N)" for out-of-range values.
func (i readState) String() string {
	if i >= readState(len(_readState_index)-1) {
		return "readState(" + strconv.FormatInt(int64(i), 10) + ")"
	}
	return _readState_name[_readState_index[i]:_readState_index[i+1]]
}

View File

@ -11,7 +11,7 @@ import (
"github.com/influxdata/influxdb/tsdb/cursors"
)
func CursorToString(wr io.Writer, cur cursors.Cursor, opts ...optionFn) {
func cursorToString(wr io.Writer, cur cursors.Cursor) {
switch ccur := cur.(type) {
case cursors.IntegerArrayCursor:
fmt.Fprintln(wr, "Integer")
@ -87,10 +87,6 @@ func CursorToString(wr io.Writer, cur cursors.Cursor, opts ...optionFn) {
const nilVal = "<nil>"
var (
nilValBytes = []byte(nilVal)
)
func joinString(b [][]byte) string {
s := make([]string, len(b))
for i := range b {
@ -104,14 +100,14 @@ func joinString(b [][]byte) string {
return strings.Join(s, ",")
}
func TagsToString(wr io.Writer, tags models.Tags, opts ...optionFn) {
func tagsToString(wr io.Writer, tags models.Tags, opts ...optionFn) {
if k := tags.HashKey(); len(k) > 0 {
fmt.Fprintf(wr, "%s", string(k[1:]))
}
fmt.Fprintln(wr)
}
func ResultSetToString(wr io.Writer, rs reads.ResultSet, opts ...optionFn) {
func resultSetToString(wr io.Writer, rs reads.ResultSet, opts ...optionFn) {
var po PrintOptions
for _, o := range opts {
o(&po)
@ -122,7 +118,7 @@ func ResultSetToString(wr io.Writer, rs reads.ResultSet, opts ...optionFn) {
for rs.Next() {
fmt.Fprint(wr, "series: ")
TagsToString(wr, rs.Tags())
tagsToString(wr, rs.Tags())
cur := rs.Cursor()
if po.SkipNilCursor && cur == nil {
@ -137,7 +133,7 @@ func ResultSetToString(wr io.Writer, rs reads.ResultSet, opts ...optionFn) {
goto LOOP
}
CursorToString(wr, cur)
cursorToString(wr, cur)
LOOP:
iw.Indent(-2)
}
@ -154,7 +150,7 @@ func GroupResultSetToString(wr io.Writer, rs reads.GroupResultSet, opts ...optio
fmt.Fprintf(wr, "tag key : %s\n", joinString(gc.Keys()))
fmt.Fprintf(wr, "partition key: %s\n", joinString(gc.PartitionKeyVals()))
iw.Indent(2)
ResultSetToString(wr, gc, opts...)
resultSetToString(wr, gc, opts...)
iw.Indent(-4)
gc = rs.Next()
}

View File

@ -1,249 +0,0 @@
// Generated by tmpl
// https://github.com/benbjohnson/tmpl
//
// DO NOT EDIT!
// Source: stream_reader.gen.go.tmpl
package reads
import (
"fmt"
"github.com/influxdata/influxdb/storage/reads/datatypes"
"github.com/influxdata/influxdb/tsdb/cursors"
)
// floatCursorStreamReader implements a streaming float cursor over
// ReadResponse frames. Generated from stream_reader.gen.go.tmpl; the
// integer/unsigned/string/boolean readers below are the same pattern.
type floatCursorStreamReader struct {
	fr *frameReader
	a  cursors.FloatArray
}

// Close drains any remaining float-point frames so the shared frame
// reader is left positioned at the next series or group frame.
func (c *floatCursorStreamReader) Close() {
	for c.fr.state == stateReadFloatPoints {
		c.readFrame()
	}
}

func (c *floatCursorStreamReader) Err() error { return c.fr.err }

// Next returns the next batch of points; the array is empty once the
// stream has moved past this cursor's frames.
func (c *floatCursorStreamReader) Next() *cursors.FloatArray {
	if c.fr.state == stateReadFloatPoints {
		c.readFrame()
	}
	return &c.a
}

// readFrame consumes one frame: a points frame fills the array, while a
// series or group frame ends the cursor by advancing the reader state.
func (c *floatCursorStreamReader) readFrame() {
	c.a.Timestamps = nil
	c.a.Values = nil
	if f := c.fr.peekFrame(); f != nil {
		switch ff := f.Data.(type) {
		case *datatypes.ReadResponse_Frame_FloatPoints:
			c.a.Timestamps = ff.FloatPoints.Timestamps
			c.a.Values = ff.FloatPoints.Values
			c.fr.nextFrame()
		case *datatypes.ReadResponse_Frame_Series:
			c.fr.state = stateReadSeries
		case *datatypes.ReadResponse_Frame_Group:
			c.fr.state = stateReadGroup
		default:
			c.fr.setErr(fmt.Errorf("floatCursorStreamReader: unexpected frame type %T", f.Data))
		}
	}
}

func (c *floatCursorStreamReader) Stats() cursors.CursorStats {
	return c.fr.stats.Stats()
}

// integerCursorStreamReader: same generated pattern for integer points.
type integerCursorStreamReader struct {
	fr *frameReader
	a  cursors.IntegerArray
}

func (c *integerCursorStreamReader) Close() {
	for c.fr.state == stateReadIntegerPoints {
		c.readFrame()
	}
}

func (c *integerCursorStreamReader) Err() error { return c.fr.err }

func (c *integerCursorStreamReader) Next() *cursors.IntegerArray {
	if c.fr.state == stateReadIntegerPoints {
		c.readFrame()
	}
	return &c.a
}

func (c *integerCursorStreamReader) readFrame() {
	c.a.Timestamps = nil
	c.a.Values = nil
	if f := c.fr.peekFrame(); f != nil {
		switch ff := f.Data.(type) {
		case *datatypes.ReadResponse_Frame_IntegerPoints:
			c.a.Timestamps = ff.IntegerPoints.Timestamps
			c.a.Values = ff.IntegerPoints.Values
			c.fr.nextFrame()
		case *datatypes.ReadResponse_Frame_Series:
			c.fr.state = stateReadSeries
		case *datatypes.ReadResponse_Frame_Group:
			c.fr.state = stateReadGroup
		default:
			c.fr.setErr(fmt.Errorf("integerCursorStreamReader: unexpected frame type %T", f.Data))
		}
	}
}

func (c *integerCursorStreamReader) Stats() cursors.CursorStats {
	return c.fr.stats.Stats()
}

// unsignedCursorStreamReader: same generated pattern for unsigned points.
type unsignedCursorStreamReader struct {
	fr *frameReader
	a  cursors.UnsignedArray
}

func (c *unsignedCursorStreamReader) Close() {
	for c.fr.state == stateReadUnsignedPoints {
		c.readFrame()
	}
}

func (c *unsignedCursorStreamReader) Err() error { return c.fr.err }

func (c *unsignedCursorStreamReader) Next() *cursors.UnsignedArray {
	if c.fr.state == stateReadUnsignedPoints {
		c.readFrame()
	}
	return &c.a
}

func (c *unsignedCursorStreamReader) readFrame() {
	c.a.Timestamps = nil
	c.a.Values = nil
	if f := c.fr.peekFrame(); f != nil {
		switch ff := f.Data.(type) {
		case *datatypes.ReadResponse_Frame_UnsignedPoints:
			c.a.Timestamps = ff.UnsignedPoints.Timestamps
			c.a.Values = ff.UnsignedPoints.Values
			c.fr.nextFrame()
		case *datatypes.ReadResponse_Frame_Series:
			c.fr.state = stateReadSeries
		case *datatypes.ReadResponse_Frame_Group:
			c.fr.state = stateReadGroup
		default:
			c.fr.setErr(fmt.Errorf("unsignedCursorStreamReader: unexpected frame type %T", f.Data))
		}
	}
}

func (c *unsignedCursorStreamReader) Stats() cursors.CursorStats {
	return c.fr.stats.Stats()
}

// stringCursorStreamReader: same generated pattern for string points.
type stringCursorStreamReader struct {
	fr *frameReader
	a  cursors.StringArray
}

func (c *stringCursorStreamReader) Close() {
	for c.fr.state == stateReadStringPoints {
		c.readFrame()
	}
}

func (c *stringCursorStreamReader) Err() error { return c.fr.err }

func (c *stringCursorStreamReader) Next() *cursors.StringArray {
	if c.fr.state == stateReadStringPoints {
		c.readFrame()
	}
	return &c.a
}

func (c *stringCursorStreamReader) readFrame() {
	c.a.Timestamps = nil
	c.a.Values = nil
	if f := c.fr.peekFrame(); f != nil {
		switch ff := f.Data.(type) {
		case *datatypes.ReadResponse_Frame_StringPoints:
			c.a.Timestamps = ff.StringPoints.Timestamps
			c.a.Values = ff.StringPoints.Values
			c.fr.nextFrame()
		case *datatypes.ReadResponse_Frame_Series:
			c.fr.state = stateReadSeries
		case *datatypes.ReadResponse_Frame_Group:
			c.fr.state = stateReadGroup
		default:
			c.fr.setErr(fmt.Errorf("stringCursorStreamReader: unexpected frame type %T", f.Data))
		}
	}
}

func (c *stringCursorStreamReader) Stats() cursors.CursorStats {
	return c.fr.stats.Stats()
}

// booleanCursorStreamReader: same generated pattern for boolean points.
type booleanCursorStreamReader struct {
	fr *frameReader
	a  cursors.BooleanArray
}

func (c *booleanCursorStreamReader) Close() {
	for c.fr.state == stateReadBooleanPoints {
		c.readFrame()
	}
}

func (c *booleanCursorStreamReader) Err() error { return c.fr.err }

func (c *booleanCursorStreamReader) Next() *cursors.BooleanArray {
	if c.fr.state == stateReadBooleanPoints {
		c.readFrame()
	}
	return &c.a
}

func (c *booleanCursorStreamReader) readFrame() {
	c.a.Timestamps = nil
	c.a.Values = nil
	if f := c.fr.peekFrame(); f != nil {
		switch ff := f.Data.(type) {
		case *datatypes.ReadResponse_Frame_BooleanPoints:
			c.a.Timestamps = ff.BooleanPoints.Timestamps
			c.a.Values = ff.BooleanPoints.Values
			c.fr.nextFrame()
		case *datatypes.ReadResponse_Frame_Series:
			c.fr.state = stateReadSeries
		case *datatypes.ReadResponse_Frame_Group:
			c.fr.state = stateReadGroup
		default:
			c.fr.setErr(fmt.Errorf("booleanCursorStreamReader: unexpected frame type %T", f.Data))
		}
	}
}

func (c *booleanCursorStreamReader) Stats() cursors.CursorStats {
	return c.fr.stats.Stats()
}

View File

@ -1,58 +0,0 @@
package reads
import (
"fmt"
"github.com/influxdata/influxdb/storage/reads/datatypes"
"github.com/influxdata/influxdb/tsdb/cursors"
)
{{range .}}
// {{.name}}CursorStreamReader implements a streaming
// cursors.{{.Name}}Array cursor over ReadResponse frames.
type {{.name}}CursorStreamReader struct {
	fr *frameReader
	a  cursors.{{.Name}}Array
}

// Close drains remaining point frames so the frame reader is left
// positioned at the next series or group frame.
func (c *{{.name}}CursorStreamReader) Close() {
	for c.fr.state == stateRead{{.Name}}Points {
		c.readFrame()
	}
}

func (c *{{.name}}CursorStreamReader) Err() error { return c.fr.err }

func (c *{{.name}}CursorStreamReader) Next() *cursors.{{.Name}}Array {
	if c.fr.state == stateRead{{.Name}}Points {
		c.readFrame()
	}
	return &c.a
}

// readFrame consumes one frame: points fill the array; a series or
// group frame ends this cursor by advancing the reader state.
func (c *{{.name}}CursorStreamReader) readFrame() {
	c.a.Timestamps = nil
	c.a.Values = nil
	if f := c.fr.peekFrame(); f != nil {
		switch ff := f.Data.(type) {
		case *datatypes.ReadResponse_Frame_{{.Name}}Points:
			c.a.Timestamps = ff.{{.Name}}Points.Timestamps
			c.a.Values = ff.{{.Name}}Points.Values
			c.fr.nextFrame()
		case *datatypes.ReadResponse_Frame_Series:
			c.fr.state = stateReadSeries
		case *datatypes.ReadResponse_Frame_Group:
			c.fr.state = stateReadGroup
		default:
			c.fr.setErr(fmt.Errorf("{{.name}}CursorStreamReader: unexpected frame type %T", f.Data))
		}
	}
}

func (c *{{.name}}CursorStreamReader) Stats() cursors.CursorStats {
	return c.fr.stats.Stats()
}
{{end}}

View File

@ -1,455 +0,0 @@
package reads
import (
"errors"
"fmt"
"io"
"strconv"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/storage/reads/datatypes"
"github.com/influxdata/influxdb/tsdb/cursors"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)
var (
	// ErrPartitionKeyOrder means the partition keys for a
	// GroupResultSetStreamReader were incorrectly ordered.
	ErrPartitionKeyOrder = errors.New("invalid partition key order")

	// ErrStreamNoData means the StreamReader repeatedly returned no data
	// when calling Recv
	ErrStreamNoData = errors.New("peekFrame: no data")
)

// peekFrameRetries specifies the number of times peekFrame will
// retry before returning ErrStreamNoData when StreamReader.Recv
// returns an empty result.
const peekFrameRetries = 2

// StreamReader is the transport-agnostic source of read responses
// consumed by the stream-reader result sets in this package.
type StreamReader interface {
	Recv() (*datatypes.ReadResponse, error)
}

// statistics is the interface which wraps the Stats method.
type statistics interface {
	Stats() cursors.CursorStats
}

// zeroStatistics is substituted when the underlying StreamReader does
// not implement statistics; it always reports empty stats.
var zeroStatistics statistics = &emptyStatistics{}

type emptyStatistics struct{}

func (*emptyStatistics) Stats() cursors.CursorStats {
	return cursors.CursorStats{}
}

// StreamClient is a StreamReader that is also a gRPC client stream,
// which additionally exposes trailing metadata via Trailer.
type StreamClient interface {
	StreamReader
	grpc.ClientStream
}
// StorageReadClient adapts a grpc client to implement the cursors.Statistics
// interface and read the statistics from the gRPC trailer.
type StorageReadClient struct {
	client  StreamClient
	trailer metadata.MD
}

// NewStorageReadClient returns a new StorageReadClient which implements
// StreamReader and reads the gRPC trailer to return CursorStats.
func NewStorageReadClient(client StreamClient) *StorageReadClient {
	return &StorageReadClient{client: client}
}

// Recv reads the next response from the underlying stream. When the
// stream reports an error (including io.EOF), the trailer is captured
// so that a subsequent Stats call can report the server-side totals.
func (rc *StorageReadClient) Recv() (res *datatypes.ReadResponse, err error) {
	res, err = rc.client.Recv()
	if err != nil {
		rc.trailer = rc.client.Trailer()
	}
	return res, err
}

// sumTrailer totals the integer values stored under key in the captured
// trailer. Unparseable values are skipped (best effort), matching the
// previous inline behavior.
func (rc *StorageReadClient) sumTrailer(key string) (sum int) {
	for _, s := range rc.trailer.Get(key) {
		v, err := strconv.Atoi(s)
		if err != nil {
			continue
		}
		sum += v
	}
	return sum
}

// Stats returns the cursor statistics accumulated in the gRPC trailer.
// It is only meaningful after Recv has returned an error (e.g. io.EOF),
// since the trailer is not available before the stream ends.
func (rc *StorageReadClient) Stats() (stats cursors.CursorStats) {
	stats.ScannedBytes = rc.sumTrailer("scanned-bytes")
	stats.ScannedValues = rc.sumTrailer("scanned-values")
	return stats
}
// ResultSetStreamReader implements a result set over a stream of
// ReadResponse frames, alternating between series frames and the point
// frames that follow each series.
type ResultSetStreamReader struct {
	fr  frameReader
	cur cursorReaders

	tags models.Tags
	prev models.Tags // retained buffer from the previous series, reused to avoid allocation
}

func NewResultSetStreamReader(stream StreamReader) *ResultSetStreamReader {
	r := &ResultSetStreamReader{fr: frameReader{s: stream, state: stateReadSeries}}
	r.fr.init()
	r.cur.setFrameReader(&r.fr)
	return r
}

func (r *ResultSetStreamReader) Err() error { return r.fr.err }

func (r *ResultSetStreamReader) Close() { r.fr.state = stateDone }

func (r *ResultSetStreamReader) Cursor() cursors.Cursor { return r.cur.cursor() }

func (r *ResultSetStreamReader) Stats() cursors.CursorStats {
	return r.fr.stats.Stats()
}

// Peek reads the next frame on the underlying stream-reader
// if there is one
func (r *ResultSetStreamReader) Peek() {
	r.fr.peekFrame()
}

// Next advances to the next series, returning false at end of stream or
// after an error. Calling it in any other state is itself an error.
func (r *ResultSetStreamReader) Next() bool {
	if r.fr.state == stateReadSeries {
		return r.readSeriesFrame()
	}

	if r.fr.state == stateDone || r.fr.state == stateReadErr {
		return false
	}

	r.fr.setErr(fmt.Errorf("expected reader in state %v, was in state %v", stateReadSeries, r.fr.state))
	return false
}

// readSeriesFrame consumes one series frame, swapping the prev/tags
// buffers to reuse their backing storage, and records the data type of
// the point frames that follow.
func (r *ResultSetStreamReader) readSeriesFrame() bool {
	f := r.fr.peekFrame()
	if f == nil {
		return false
	}
	r.fr.nextFrame()

	if sf, ok := f.Data.(*datatypes.ReadResponse_Frame_Series); ok {
		r.fr.state = stateReadPoints

		r.prev, r.tags = r.tags, r.prev

		if cap(r.tags) < len(sf.Series.Tags) {
			r.tags = make(models.Tags, len(sf.Series.Tags))
		} else {
			r.tags = r.tags[:len(sf.Series.Tags)]
		}

		for i := range sf.Series.Tags {
			r.tags[i].Key = sf.Series.Tags[i].Key
			r.tags[i].Value = sf.Series.Tags[i].Value
		}

		r.cur.nextType = sf.Series.DataType

		return true
	} else {
		r.fr.setErr(fmt.Errorf("expected series frame, got %T", f.Data))
	}

	return false
}

func (r *ResultSetStreamReader) Tags() models.Tags {
	return r.tags
}
// GroupResultSetStreamReader implements a group result set over a
// stream of ReadResponse frames, validating that group partition keys
// arrive in ascending order.
type GroupResultSetStreamReader struct {
	fr frameReader
	gc groupCursorStreamReader
}

func NewGroupResultSetStreamReader(stream StreamReader) *GroupResultSetStreamReader {
	r := &GroupResultSetStreamReader{fr: frameReader{s: stream, state: stateReadGroup}}
	r.fr.init()
	r.gc.fr = &r.fr
	r.gc.cur.setFrameReader(&r.fr)
	return r
}

func (r *GroupResultSetStreamReader) Err() error { return r.fr.err }

// Peek reads the next frame on the underlying stream-reader
// if there is one
func (r *GroupResultSetStreamReader) Peek() {
	r.fr.peekFrame()
}

// Next returns the cursor for the next group, or nil at end of stream,
// after an error, or when called in an unexpected state.
func (r *GroupResultSetStreamReader) Next() GroupCursor {
	if r.fr.state == stateReadGroup {
		return r.readGroupFrame()
	}

	if r.fr.state == stateDone || r.fr.state == stateReadErr {
		return nil
	}

	r.fr.setErr(fmt.Errorf("expected reader in state %v, was in state %v", stateReadGroup, r.fr.state))
	return nil
}

// readGroupFrame consumes one group frame, reusing the key buffers from
// the previous group, and enforces ascending partition-key order (nil
// sorting high); out-of-order keys set ErrPartitionKeyOrder.
func (r *GroupResultSetStreamReader) readGroupFrame() GroupCursor {
	f := r.fr.peekFrame()
	if f == nil {
		return nil
	}
	r.fr.nextFrame()

	if sf, ok := f.Data.(*datatypes.ReadResponse_Frame_Group); ok {
		r.fr.state = stateReadSeries

		if cap(r.gc.tagKeys) < len(sf.Group.TagKeys) {
			r.gc.tagKeys = make([][]byte, len(sf.Group.TagKeys))
		} else {
			r.gc.tagKeys = r.gc.tagKeys[:len(sf.Group.TagKeys)]
		}
		copy(r.gc.tagKeys, sf.Group.TagKeys)

		// Swap so prevKey holds the previous group's key for the order check.
		r.gc.partitionKeyVals, r.gc.prevKey = r.gc.prevKey, r.gc.partitionKeyVals

		if cap(r.gc.partitionKeyVals) < len(sf.Group.PartitionKeyVals) {
			r.gc.partitionKeyVals = make([][]byte, len(sf.Group.PartitionKeyVals))
		} else {
			r.gc.partitionKeyVals = r.gc.partitionKeyVals[:len(sf.Group.PartitionKeyVals)]
		}

		copy(r.gc.partitionKeyVals, sf.Group.PartitionKeyVals)

		if comparePartitionKey(r.gc.partitionKeyVals, r.gc.prevKey, nilSortHi) == 1 || r.gc.prevKey == nil {
			return &r.gc
		}

		r.fr.setErr(ErrPartitionKeyOrder)
	} else {
		r.fr.setErr(fmt.Errorf("expected group frame, got %T", f.Data))
	}

	return nil
}

func (r *GroupResultSetStreamReader) Close() {
	r.fr.state = stateDone
}
// groupCursorStreamReader is the group cursor handed out by
// GroupResultSetStreamReader; it iterates the series within one group.
type groupCursorStreamReader struct {
	fr  *frameReader
	cur cursorReaders

	tagKeys          [][]byte
	partitionKeyVals [][]byte
	prevKey          [][]byte // previous group's partition key, used for ordering checks
	tags             models.Tags
}

func (gc *groupCursorStreamReader) Err() error                 { return gc.fr.err }
func (gc *groupCursorStreamReader) Tags() models.Tags          { return gc.tags }
func (gc *groupCursorStreamReader) Keys() [][]byte             { return gc.tagKeys }
func (gc *groupCursorStreamReader) PartitionKeyVals() [][]byte { return gc.partitionKeyVals }
func (gc *groupCursorStreamReader) Cursor() cursors.Cursor     { return gc.cur.cursor() }

func (gc *groupCursorStreamReader) Stats() cursors.CursorStats {
	return gc.fr.stats.Stats()
}

// Next advances to the next series in this group; it returns false at
// the next group boundary, at end of stream, or after an error.
func (gc *groupCursorStreamReader) Next() bool {
	if gc.fr.state == stateReadSeries {
		return gc.readSeriesFrame()
	}

	if gc.fr.state == stateDone || gc.fr.state == stateReadErr || gc.fr.state == stateReadGroup {
		return false
	}

	gc.fr.setErr(fmt.Errorf("expected reader in state %v, was in state %v", stateReadSeries, gc.fr.state))
	return false
}

// readSeriesFrame consumes a series frame, reusing the tags buffer. A
// group frame instead marks the end of this group and is deliberately
// NOT consumed, so the outer reader can pick it up.
func (gc *groupCursorStreamReader) readSeriesFrame() bool {
	f := gc.fr.peekFrame()
	if f == nil {
		return false
	}

	if sf, ok := f.Data.(*datatypes.ReadResponse_Frame_Series); ok {
		gc.fr.nextFrame()
		gc.fr.state = stateReadPoints

		if cap(gc.tags) < len(sf.Series.Tags) {
			gc.tags = make(models.Tags, len(sf.Series.Tags))
		} else {
			gc.tags = gc.tags[:len(sf.Series.Tags)]
		}

		for i := range sf.Series.Tags {
			gc.tags[i].Key = sf.Series.Tags[i].Key
			gc.tags[i].Value = sf.Series.Tags[i].Value
		}

		gc.cur.nextType = sf.Series.DataType

		return true
	} else if _, ok := f.Data.(*datatypes.ReadResponse_Frame_Group); ok {
		gc.fr.state = stateReadGroup
		return false
	}

	gc.fr.setErr(fmt.Errorf("expected series frame, got %T", f.Data))
	return false
}

// Close drains any remaining series and their points so the frame
// reader ends up positioned at the next group (or end of stream).
func (gc *groupCursorStreamReader) Close() {
RETRY:
	if gc.fr.state == stateReadPoints {
		cur := gc.Cursor()
		if cur != nil {
			cur.Close()
		}
	}

	if gc.fr.state == stateReadSeries {
		gc.readSeriesFrame()
		goto RETRY
	}
}
// readState tracks the position of a frameReader within the grammar of
// the response stream (group frames, then series frames, then points).
type readState byte

const (
	stateReadGroup readState = iota
	stateReadSeries
	stateReadPoints
	stateReadFloatPoints
	stateReadIntegerPoints
	stateReadUnsignedPoints
	stateReadBooleanPoints
	stateReadStringPoints
	stateReadErr
	stateDone
)

// frameReader buffers frames received from a StreamReader and exposes
// them one at a time via peekFrame/nextFrame.
type frameReader struct {
	s     StreamReader
	stats statistics
	state readState
	buf   []datatypes.ReadResponse_Frame
	p     int // index of the current frame within buf
	err   error
}

// init wires up statistics reporting: if the stream itself implements
// statistics it is used directly, otherwise an always-empty stand-in.
func (r *frameReader) init() {
	if stats, ok := r.s.(statistics); ok {
		r.stats = stats
	} else {
		r.stats = zeroStatistics
	}
}
// peekFrame returns the current frame without consuming it, fetching
// more responses from the stream as needed. Empty responses are retried
// up to peekFrameRetries times before ErrStreamNoData is set; io.EOF
// cleanly moves the reader to stateDone. Returns nil when no frame is
// available.
func (r *frameReader) peekFrame() *datatypes.ReadResponse_Frame {
	retries := peekFrameRetries

RETRY:
	if r.p < len(r.buf) {
		f := &r.buf[r.p]
		return f
	}

	r.p = 0
	r.buf = nil
	res, err := r.s.Recv()
	if err == nil {
		if res != nil {
			r.buf = res.Frames
		}
		if retries > 0 {
			retries--
			goto RETRY
		}

		r.setErr(ErrStreamNoData)
	} else if err == io.EOF {
		r.state = stateDone
	} else {
		r.setErr(err)
	}
	return nil
}

// nextFrame consumes the frame returned by the preceding peekFrame.
func (r *frameReader) nextFrame() { r.p++ }

// setErr records err and moves the reader into its terminal error state.
func (r *frameReader) setErr(err error) {
	r.err = err
	r.state = stateReadErr
}
// cursorReaders multiplexes the five typed cursor stream readers over a
// single frameReader; cursor() hands out the reader matching the data
// type announced by the preceding series frame.
type cursorReaders struct {
	fr       *frameReader
	nextType datatypes.ReadResponse_DataType

	cc cursors.Cursor // cursor most recently handed out

	f floatCursorStreamReader
	i integerCursorStreamReader
	u unsignedCursorStreamReader
	b booleanCursorStreamReader
	s stringCursorStreamReader
}

// setFrameReader points this multiplexer and every typed reader at fr.
func (cur *cursorReaders) setFrameReader(fr *frameReader) {
	cur.fr = fr
	cur.f.fr = fr
	cur.i.fr = fr
	cur.u.fr = fr
	cur.b.fr = fr
	cur.s.fr = fr
}

// cursor returns the typed cursor for the current series, transitioning
// the frame reader into the matching points-reading state. Calling it
// when the reader is not positioned at points is an error.
func (cur *cursorReaders) cursor() cursors.Cursor {
	cur.cc = nil
	if cur.fr.state != stateReadPoints {
		cur.fr.setErr(fmt.Errorf("expected reader in state %v, was in state %v", stateReadPoints, cur.fr.state))
		return cur.cc
	}

	switch cur.nextType {
	case datatypes.DataTypeFloat:
		cur.fr.state = stateReadFloatPoints
		cur.cc = &cur.f

	case datatypes.DataTypeInteger:
		cur.fr.state = stateReadIntegerPoints
		cur.cc = &cur.i

	case datatypes.DataTypeUnsigned:
		cur.fr.state = stateReadUnsignedPoints
		cur.cc = &cur.u

	case datatypes.DataTypeBoolean:
		cur.fr.state = stateReadBooleanPoints
		cur.cc = &cur.b

	case datatypes.DataTypeString:
		cur.fr.state = stateReadStringPoints
		cur.cc = &cur.s

	default:
		cur.fr.setErr(fmt.Errorf("unexpected data type, %d", cur.nextType))
	}

	return cur.cc
}

View File

@ -1,163 +0,0 @@
// Generated by tmpl
// https://github.com/benbjohnson/tmpl
//
// DO NOT EDIT!
// Source: stream_reader_gen_test.go.tmpl
package reads_test
import (
"sort"
"github.com/influxdata/influxdb/storage/reads/datatypes"
)
// FloatPoints adapts a float points frame to sort.Interface, ordering
// points by timestamp. The Integer/Unsigned/String/Boolean variants
// below are the same generated pattern for the other data types, and
// each xxxF helper builds a sorted points frame from a map of
// timestamp -> value.
type FloatPoints datatypes.ReadResponse_FloatPointsFrame

func (a FloatPoints) Len() int { return len(a.Timestamps) }

func (a FloatPoints) Less(i, j int) bool { return a.Timestamps[i] < a.Timestamps[j] }

func (a FloatPoints) Swap(i, j int) {
	a.Timestamps[i], a.Timestamps[j] = a.Timestamps[j], a.Timestamps[i]
	a.Values[i], a.Values[j] = a.Values[j], a.Values[i]
}

type floatS map[int64]float64

func floatF(points floatS) datatypes.ReadResponse_Frame {
	var block FloatPoints
	for t, v := range points {
		block.Timestamps = append(block.Timestamps, t)
		block.Values = append(block.Values, v)
	}
	sort.Sort(block)

	pointsFrame := datatypes.ReadResponse_FloatPointsFrame(block)

	return datatypes.ReadResponse_Frame{
		Data: &datatypes.ReadResponse_Frame_FloatPoints{
			FloatPoints: &pointsFrame,
		},
	}
}

// IntegerPoints: same pattern for integer values.
type IntegerPoints datatypes.ReadResponse_IntegerPointsFrame

func (a IntegerPoints) Len() int { return len(a.Timestamps) }

func (a IntegerPoints) Less(i, j int) bool { return a.Timestamps[i] < a.Timestamps[j] }

func (a IntegerPoints) Swap(i, j int) {
	a.Timestamps[i], a.Timestamps[j] = a.Timestamps[j], a.Timestamps[i]
	a.Values[i], a.Values[j] = a.Values[j], a.Values[i]
}

type integerS map[int64]int64

func integerF(points integerS) datatypes.ReadResponse_Frame {
	var block IntegerPoints
	for t, v := range points {
		block.Timestamps = append(block.Timestamps, t)
		block.Values = append(block.Values, v)
	}
	sort.Sort(block)

	pointsFrame := datatypes.ReadResponse_IntegerPointsFrame(block)

	return datatypes.ReadResponse_Frame{
		Data: &datatypes.ReadResponse_Frame_IntegerPoints{
			IntegerPoints: &pointsFrame,
		},
	}
}

// UnsignedPoints: same pattern for unsigned values.
type UnsignedPoints datatypes.ReadResponse_UnsignedPointsFrame

func (a UnsignedPoints) Len() int { return len(a.Timestamps) }

func (a UnsignedPoints) Less(i, j int) bool { return a.Timestamps[i] < a.Timestamps[j] }

func (a UnsignedPoints) Swap(i, j int) {
	a.Timestamps[i], a.Timestamps[j] = a.Timestamps[j], a.Timestamps[i]
	a.Values[i], a.Values[j] = a.Values[j], a.Values[i]
}

type unsignedS map[int64]uint64

func unsignedF(points unsignedS) datatypes.ReadResponse_Frame {
	var block UnsignedPoints
	for t, v := range points {
		block.Timestamps = append(block.Timestamps, t)
		block.Values = append(block.Values, v)
	}
	sort.Sort(block)

	pointsFrame := datatypes.ReadResponse_UnsignedPointsFrame(block)

	return datatypes.ReadResponse_Frame{
		Data: &datatypes.ReadResponse_Frame_UnsignedPoints{
			UnsignedPoints: &pointsFrame,
		},
	}
}

// StringPoints: same pattern for string values.
type StringPoints datatypes.ReadResponse_StringPointsFrame

func (a StringPoints) Len() int { return len(a.Timestamps) }

func (a StringPoints) Less(i, j int) bool { return a.Timestamps[i] < a.Timestamps[j] }

func (a StringPoints) Swap(i, j int) {
	a.Timestamps[i], a.Timestamps[j] = a.Timestamps[j], a.Timestamps[i]
	a.Values[i], a.Values[j] = a.Values[j], a.Values[i]
}

type stringS map[int64]string

func stringF(points stringS) datatypes.ReadResponse_Frame {
	var block StringPoints
	for t, v := range points {
		block.Timestamps = append(block.Timestamps, t)
		block.Values = append(block.Values, v)
	}
	sort.Sort(block)

	pointsFrame := datatypes.ReadResponse_StringPointsFrame(block)

	return datatypes.ReadResponse_Frame{
		Data: &datatypes.ReadResponse_Frame_StringPoints{
			StringPoints: &pointsFrame,
		},
	}
}

// BooleanPoints: same pattern for boolean values.
type BooleanPoints datatypes.ReadResponse_BooleanPointsFrame

func (a BooleanPoints) Len() int { return len(a.Timestamps) }

func (a BooleanPoints) Less(i, j int) bool { return a.Timestamps[i] < a.Timestamps[j] }

func (a BooleanPoints) Swap(i, j int) {
	a.Timestamps[i], a.Timestamps[j] = a.Timestamps[j], a.Timestamps[i]
	a.Values[i], a.Values[j] = a.Values[j], a.Values[i]
}

type booleanS map[int64]bool

func booleanF(points booleanS) datatypes.ReadResponse_Frame {
	var block BooleanPoints
	for t, v := range points {
		block.Timestamps = append(block.Timestamps, t)
		block.Values = append(block.Values, v)
	}
	sort.Sort(block)

	pointsFrame := datatypes.ReadResponse_BooleanPointsFrame(block)

	return datatypes.ReadResponse_Frame{
		Data: &datatypes.ReadResponse_Frame_BooleanPoints{
			BooleanPoints: &pointsFrame,
		},
	}
}

View File

@ -1,40 +0,0 @@
package reads_test
import (
"sort"
"github.com/influxdata/influxdb/storage/reads/datatypes"
)
{{range .}}
// {{.Name}}Points adapts a {{.name}} points frame to sort.Interface,
// ordering points by timestamp.
type {{.Name}}Points datatypes.ReadResponse_{{.Name}}PointsFrame

func (a {{.Name}}Points) Len() int { return len(a.Timestamps) }

func (a {{.Name}}Points) Less(i, j int) bool { return a.Timestamps[i] < a.Timestamps[j] }

func (a {{.Name}}Points) Swap(i, j int) {
	a.Timestamps[i], a.Timestamps[j] = a.Timestamps[j], a.Timestamps[i]
	a.Values[i], a.Values[j] = a.Values[j], a.Values[i]
}

{{$type := print .name "S"}}
type {{$type}} map[int64]{{.Type}}

// {{.name}}F builds a sorted {{.name}} points frame from a map of
// timestamp -> value.
func {{.name}}F(points {{$type}}) datatypes.ReadResponse_Frame {
	var block {{.Name}}Points
	for t, v := range points {
		block.Timestamps = append(block.Timestamps, t)
		block.Values = append(block.Values, v)
	}
	sort.Sort(block)

	pointsFrame := datatypes.ReadResponse_{{.Name}}PointsFrame(block)

	return datatypes.ReadResponse_Frame{
		Data: &datatypes.ReadResponse_Frame_{{.Name}}Points{
			{{.Name}}Points: &pointsFrame,
		},
	}
}
{{end}}

View File

@ -1,819 +0,0 @@
package reads_test
import (
"bytes"
"errors"
"io"
"strings"
"testing"
"github.com/google/go-cmp/cmp"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/storage/reads"
"github.com/influxdata/influxdb/storage/reads/datatypes"
)
func errCmp(x, y error) bool {
if x == nil {
return y == nil
}
if y == nil {
return false
}
return x.Error() == y.Error()
}
func errTr(x error) string {
if x == nil {
return ""
}
return x.Error()
}
func TestNewResultSetStreamReader(t *testing.T) {
tests := []struct {
name string
stream *sliceStreamReader
exp string
expErr error
}{
{
name: "float series",
stream: newStreamReader(
response(
seriesF(Float, "cpu,tag0=val0"),
floatF(floatS{
0: 1.0,
1: 2.0,
2: 3.0,
}),
seriesF(Float, "cpu,tag0=val1"),
floatF(floatS{
10: 11.0,
11: 12.0,
12: 13.0,
}),
),
),
exp: `series: _m=cpu,tag0=val0
cursor:Float
0 | 1.00
1 | 2.00
2 | 3.00
series: _m=cpu,tag0=val1
cursor:Float
10 | 11.00
11 | 12.00
12 | 13.00
`,
},
{
name: "some empty series",
stream: newStreamReader(
response(
seriesF(Float, "cpu,tag0=val0"),
seriesF(Float, "cpu,tag0=val1"),
floatF(floatS{
10: 11.0,
11: 12.0,
12: 13.0,
}),
),
),
exp: `series: _m=cpu,tag0=val0
cursor:Float
series: _m=cpu,tag0=val1
cursor:Float
10 | 11.00
11 | 12.00
12 | 13.00
`,
},
{
name: "all data types",
stream: newStreamReader(
response(
seriesF(Boolean, "cpu,tag0=booleans"),
booleanF(booleanS{
3: false,
4: true,
5: true,
}),
seriesF(Float, "cpu,tag0=floats"),
floatF(floatS{
0: 1.0,
1: 2.0,
2: 3.0,
}),
seriesF(Integer, "cpu,tag0=integers"),
integerF(integerS{
1: 1,
2: 2,
3: 3,
}),
seriesF(String, "cpu,tag0=strings"),
stringF(stringS{
33: "thing 1",
34: "thing 2",
35: "things",
}),
seriesF(Unsigned, "cpu,tag0=unsigned"),
unsignedF(unsignedS{
2: 55,
3: 56,
4: 57,
}),
),
),
exp: `series: _m=cpu,tag0=booleans
cursor:Boolean
3 | false
4 | true
5 | true
series: _m=cpu,tag0=floats
cursor:Float
0 | 1.00
1 | 2.00
2 | 3.00
series: _m=cpu,tag0=integers
cursor:Integer
1 | 1
2 | 2
3 | 3
series: _m=cpu,tag0=strings
cursor:String
33 | thing 1
34 | thing 2
35 | things
series: _m=cpu,tag0=unsigned
cursor:Unsigned
2 | 55
3 | 56
4 | 57
`,
},
{
name: "invalid_points_no_series",
stream: newStreamReader(
response(
floatF(floatS{0: 1.0}),
),
),
expErr: errors.New("expected series frame, got *datatypes.ReadResponse_Frame_FloatPoints"),
},
{
name: "no points frames",
stream: newStreamReader(
response(
seriesF(Boolean, "cpu,tag0=booleans"),
seriesF(Float, "cpu,tag0=floats"),
seriesF(Integer, "cpu,tag0=integers"),
seriesF(String, "cpu,tag0=strings"),
seriesF(Unsigned, "cpu,tag0=unsigned"),
),
),
exp: `series: _m=cpu,tag0=booleans
cursor:Boolean
series: _m=cpu,tag0=floats
cursor:Float
series: _m=cpu,tag0=integers
cursor:Integer
series: _m=cpu,tag0=strings
cursor:String
series: _m=cpu,tag0=unsigned
cursor:Unsigned
`,
},
{
name: "invalid_group_frame",
stream: newStreamReader(
response(
groupF("tag0", "val0"),
floatF(floatS{0: 1.0}),
),
),
expErr: errors.New("expected series frame, got *datatypes.ReadResponse_Frame_Group"),
},
{
name: "invalid_multiple_data_types",
stream: newStreamReader(
response(
seriesF(Float, "cpu,tag0=val0"),
floatF(floatS{0: 1.0}),
integerF(integerS{0: 1}),
),
),
exp: `series: _m=cpu,tag0=val0
cursor:Float
0 | 1.00
cursor err: floatCursorStreamReader: unexpected frame type *datatypes.ReadResponse_Frame_IntegerPoints
`,
expErr: errors.New("floatCursorStreamReader: unexpected frame type *datatypes.ReadResponse_Frame_IntegerPoints"),
},
{
name: "some empty frames",
stream: newStreamReader(
response(
seriesF(Float, "cpu,tag0=val0"),
),
response(
floatF(floatS{
0: 1.0,
1: 2.0,
2: 3.0,
}),
),
response(),
response(
seriesF(Float, "cpu,tag0=val1"),
),
response(),
response(
floatF(floatS{
10: 11.0,
11: 12.0,
12: 13.0,
}),
),
response(),
),
exp: `series: _m=cpu,tag0=val0
cursor:Float
0 | 1.00
1 | 2.00
2 | 3.00
series: _m=cpu,tag0=val1
cursor:Float
10 | 11.00
11 | 12.00
12 | 13.00
`,
},
{
name: "last frame empty",
stream: newStreamReader(
response(
seriesF(Float, "cpu,tag0=val0"),
floatF(floatS{
0: 1.0,
1: 2.0,
2: 3.0,
}),
),
response(),
),
exp: `series: _m=cpu,tag0=val0
cursor:Float
0 | 1.00
1 | 2.00
2 | 3.00
`,
},
{
name: "ErrUnexpectedEOF",
stream: newStreamReader(
response(
seriesF(Float, "cpu,tag0=val0"),
),
response(
floatF(floatS{
0: 1.0,
1: 2.0,
2: 3.0,
}),
),
response(),
response(),
response(),
),
exp: `series: _m=cpu,tag0=val0
cursor:Float
0 | 1.00
1 | 2.00
2 | 3.00
cursor err: peekFrame: no data
`,
expErr: reads.ErrStreamNoData,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
rs := reads.NewResultSetStreamReader(tt.stream)
// ensure a peek doesn't effect the end result
rs.Peek()
sb := new(strings.Builder)
ResultSetToString(sb, rs)
if got := sb.String(); !cmp.Equal(got, tt.exp) {
t.Errorf("unexpected value; -got/+exp\n%s", cmp.Diff(got, tt.exp))
}
if got := rs.Err(); !cmp.Equal(got, tt.expErr, cmp.Comparer(errCmp)) {
t.Errorf("unexpected error; -got/+exp\n%s", cmp.Diff(got, tt.expErr, cmp.Transformer("err", errTr)))
}
})
}
}
// TestNewResultSetStreamReader_SkipSeriesCursors verifies that a caller
// may close each series cursor without draining its points and still
// advance cleanly through every series in the stream.
func TestNewResultSetStreamReader_SkipSeriesCursors(t *testing.T) {
	stream := newStreamReader(
		response(
			seriesF(Float, "cpu,tag0=floats"),
			floatF(floatS{0: 1.0}),
			seriesF(Integer, "cpu,tag0=integers"),
			integerF(integerS{1: 1}),
			seriesF(Unsigned, "cpu,tag0=unsigned"),
			unsignedF(unsignedS{2: 55}),
		),
	)

	expSeries := []string{"_m=cpu,tag0=floats", "_m=cpu,tag0=integers", "_m=cpu,tag0=unsigned"}

	rs := reads.NewResultSetStreamReader(stream)
	for i := 0; i < 3; i++ {
		if got := rs.Next(); !cmp.Equal(got, true) {
			t.Errorf("expected true")
		}

		sb := new(strings.Builder)
		TagsToString(sb, rs.Tags())
		if got := strings.TrimSpace(sb.String()); !cmp.Equal(got, expSeries[i]) {
			t.Errorf("unexpected tags; -got/+exp\n%s", cmp.Diff(got, expSeries[i]))
		}

		cur := rs.Cursor()
		if cur == nil {
			t.Errorf("expected cursor")
		}

		// Close without reading any points: the remaining point frames
		// must be drained so the next series can be read.
		cur.Close()
	}

	if got := rs.Next(); !cmp.Equal(got, false) {
		t.Errorf("expected false")
	}
	rs.Close()
}
func TestNewGroupResultSetStreamReader(t *testing.T) {
tests := []struct {
name string
stream *sliceStreamReader
exp string
expErr error
}{
{
name: "groups none no series no points",
stream: newStreamReader(
response(
groupF("tag0,tag1", ""),
),
),
exp: `group:
tag key : tag0,tag1
partition key:
`,
},
{
name: "groups none series no points",
stream: newStreamReader(
response(
groupF("_m,tag0", ""),
seriesF(Float, "cpu,tag0=floats"),
seriesF(Integer, "cpu,tag0=integers"),
seriesF(Unsigned, "cpu,tag0=unsigned"),
),
),
exp: `group:
tag key : _m,tag0
partition key:
series: _m=cpu,tag0=floats
cursor:Float
series: _m=cpu,tag0=integers
cursor:Integer
series: _m=cpu,tag0=unsigned
cursor:Unsigned
`,
},
{
name: "groups none series points",
stream: newStreamReader(
response(
groupF("_m,tag0", ""),
seriesF(Float, "cpu,tag0=floats"),
floatF(floatS{0: 0.0, 1: 1.0, 2: 2.0}),
seriesF(Integer, "cpu,tag0=integers"),
integerF(integerS{10: 10, 20: 20, 30: 30}),
seriesF(Unsigned, "cpu,tag0=unsigned"),
unsignedF(unsignedS{100: 100, 200: 200, 300: 300}),
),
),
exp: `group:
tag key : _m,tag0
partition key:
series: _m=cpu,tag0=floats
cursor:Float
0 | 0.00
1 | 1.00
2 | 2.00
series: _m=cpu,tag0=integers
cursor:Integer
10 | 10
20 | 20
30 | 30
series: _m=cpu,tag0=unsigned
cursor:Unsigned
100 | 100
200 | 200
300 | 300
`,
},
{
name: "groups by no series no points",
stream: newStreamReader(
response(
groupF("tag00,tag10", "val00,val10"),
groupF("tag00,tag10", "val00,val11"),
groupF("tag00,tag10", "val01,val10"),
groupF("tag00,tag10", "val01,val11"),
),
),
exp: `group:
tag key : tag00,tag10
partition key: val00,val10
group:
tag key : tag00,tag10
partition key: val00,val11
group:
tag key : tag00,tag10
partition key: val01,val10
group:
tag key : tag00,tag10
partition key: val01,val11
`,
},
{
name: "groups by series no points",
stream: newStreamReader(
response(
groupF("_m,tag0", "cpu,val0"),
seriesF(Float, "cpu,tag0=val0"),
seriesF(Float, "cpu,tag0=val0,tag1=val0"),
groupF("_m,tag0", "cpu,val1"),
seriesF(Float, "cpu,tag0=val1"),
seriesF(Float, "cpu,tag0=val1,tag1=val0"),
),
),
exp: `group:
tag key : _m,tag0
partition key: cpu,val0
series: _m=cpu,tag0=val0
cursor:Float
series: _m=cpu,tag0=val0,tag1=val0
cursor:Float
group:
tag key : _m,tag0
partition key: cpu,val1
series: _m=cpu,tag0=val1
cursor:Float
series: _m=cpu,tag0=val1,tag1=val0
cursor:Float
`,
},
{
name: "missing group frame",
stream: newStreamReader(
response(
seriesF(Float, "cpu,tag0=val0"),
),
),
expErr: errors.New("expected group frame, got *datatypes.ReadResponse_Frame_Series"),
},
{
name: "incorrect points frame data type",
stream: newStreamReader(
response(
groupF("_m,tag0", "cpu,val0"),
seriesF(Float, "cpu,tag0=val0"),
integerF(integerS{0: 1}),
),
),
exp: `group:
tag key : _m,tag0
partition key: cpu,val0
series: _m=cpu,tag0=val0
cursor:Float
cursor err: floatCursorStreamReader: unexpected frame type *datatypes.ReadResponse_Frame_IntegerPoints
`,
expErr: errors.New("floatCursorStreamReader: unexpected frame type *datatypes.ReadResponse_Frame_IntegerPoints"),
},
{
name: "partition key order",
stream: newStreamReader(
response(
groupF("_m,tag0", "cpu,val1"),
groupF("_m,tag0", "cpu,val0"),
),
),
exp: `group:
tag key : _m,tag0
partition key: cpu,val1
`,
expErr: reads.ErrPartitionKeyOrder,
},
{
name: "partition key order",
stream: newStreamReader(
response(
groupF("_m", "cpu,"),
groupF("_m,tag0", "cpu,val0"),
),
),
exp: `group:
tag key : _m
partition key: cpu,<nil>
`,
expErr: reads.ErrPartitionKeyOrder,
},
{
name: "partition key order",
stream: newStreamReader(
response(
groupF("_m,tag0", ",val0"),
groupF("_m,tag0", "cpu,val0"),
),
),
exp: `group:
tag key : _m,tag0
partition key: <nil>,val0
`,
expErr: reads.ErrPartitionKeyOrder,
},
{
name: "partition key order",
stream: newStreamReader(
response(
groupF("_m,tag0", "cpu,val0"),
groupF("_m,tag0", "cpu,val1"),
groupF("_m,tag0", ","),
groupF("_m,tag0", ",val0"),
),
),
exp: `group:
tag key : _m,tag0
partition key: cpu,val0
group:
tag key : _m,tag0
partition key: cpu,val1
group:
tag key : _m,tag0
partition key: <nil>,<nil>
`,
expErr: reads.ErrPartitionKeyOrder,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
rs := reads.NewGroupResultSetStreamReader(tt.stream)
// ensure a peek doesn't effect the end result
rs.Peek()
sb := new(strings.Builder)
GroupResultSetToString(sb, rs)
if got := sb.String(); !cmp.Equal(got, tt.exp) {
t.Errorf("unexpected value; -got/+exp\n%s", cmp.Diff(got, tt.exp))
}
if got := rs.Err(); !cmp.Equal(got, tt.expErr, cmp.Comparer(errCmp)) {
t.Errorf("unexpected error; -got/+exp\n%s", cmp.Diff(got, tt.expErr, cmp.Transformer("err", errTr)))
}
})
}
}
func joinB(b [][]byte) string {
return string(bytes.Join(b, []byte(",")))
}
// TestNewGroupResultSetStreamReader_SkipGroupCursors verifies that the
// group result set stream reader stays correctly positioned when the
// caller skips unread series cursors, and when it skips whole series.
//
// Fix: the nil-guard checks previously used t.Errorf and then fell
// through to dereference the nil value (rs.Keys(), cur.Close()), which
// would panic the test instead of failing it; they now use t.Fatalf.
func TestNewGroupResultSetStreamReader_SkipGroupCursors(t *testing.T) {
	stream := newStreamReader(
		response(
			groupF("_m,tag0", "cpu,val0"),
			seriesF(Float, "cpu,tag0=val0"),
			floatF(floatS{0: 1.0}),
			groupF("_m,tag0", "cpu,val1"),
			seriesF(Integer, "cpu,tag0=val1,tag1=val0"),
			integerF(integerS{1: 1}),
			seriesF(Integer, "cpu,tag0=val1,tag1=val1"),
			unsignedF(unsignedS{2: 55}),
		),
	)

	type expGroup struct {
		tagKeys string
		parKeys string
		series  []string
	}

	t.Run("skip series cursors", func(t *testing.T) {
		exp := []expGroup{
			{
				tagKeys: "_m,tag0",
				parKeys: "cpu,val0",
				series:  []string{"_m=cpu,tag0=val0"},
			},
			{
				tagKeys: "_m,tag0",
				parKeys: "cpu,val1",
				series:  []string{"_m=cpu,tag0=val1,tag1=val0", "_m=cpu,tag0=val1,tag1=val1"},
			},
		}

		stream.reset()
		grs := reads.NewGroupResultSetStreamReader(stream)
		for i := range exp {
			rs := grs.Next()
			if rs == nil {
				// Fatal: the checks below dereference rs.
				t.Fatalf("expected group cursor")
			}
			if got := joinB(rs.Keys()); !cmp.Equal(got, exp[i].tagKeys) {
				t.Errorf("unexpected group keys; -got/+exp\n%s", cmp.Diff(got, exp[i].tagKeys))
			}
			if got := joinB(rs.PartitionKeyVals()); !cmp.Equal(got, exp[i].parKeys) {
				t.Errorf("unexpected group keys; -got/+exp\n%s", cmp.Diff(got, exp[i].parKeys))
			}

			for j := range exp[i].series {
				if got := rs.Next(); !cmp.Equal(got, true) {
					t.Errorf("expected true")
				}
				sb := new(strings.Builder)
				TagsToString(sb, rs.Tags())
				if got := strings.TrimSpace(sb.String()); !cmp.Equal(got, exp[i].series[j]) {
					t.Errorf("unexpected tags; -got/+exp\n%s", cmp.Diff(got, exp[i].series[j]))
				}

				cur := rs.Cursor()
				if cur == nil {
					// Fatal: cur.Close below would panic on nil.
					t.Fatalf("expected cursor")
				}
				cur.Close()
			}

			if got := rs.Next(); !cmp.Equal(got, false) {
				t.Errorf("expected false")
			}
			rs.Close()
		}

		if rs := grs.Next(); rs != nil {
			t.Errorf("unexpected group cursor")
		}
		grs.Close()
	})

	t.Run("skip series", func(t *testing.T) {
		exp := []expGroup{
			{
				tagKeys: "_m,tag0",
				parKeys: "cpu,val0",
			},
			{
				tagKeys: "_m,tag0",
				parKeys: "cpu,val1",
			},
		}

		stream.reset()
		grs := reads.NewGroupResultSetStreamReader(stream)
		for i := range exp {
			rs := grs.Next()
			if rs == nil {
				// Fatal: the checks below dereference rs.
				t.Fatalf("expected group cursor")
			}
			if got := joinB(rs.Keys()); !cmp.Equal(got, exp[i].tagKeys) {
				t.Errorf("unexpected group keys; -got/+exp\n%s", cmp.Diff(got, exp[i].tagKeys))
			}
			if got := joinB(rs.PartitionKeyVals()); !cmp.Equal(got, exp[i].parKeys) {
				t.Errorf("unexpected group keys; -got/+exp\n%s", cmp.Diff(got, exp[i].parKeys))
			}
			rs.Close()
		}

		if rs := grs.Next(); rs != nil {
			t.Errorf("unexpected group cursor")
		}
		grs.Close()
	})
}
// response wraps the given frames in a single ReadResponse.
func response(f ...datatypes.ReadResponse_Frame) datatypes.ReadResponse {
	var res datatypes.ReadResponse
	res.Frames = f
	return res
}
// sliceStreamReader serves a fixed, in-memory sequence of responses,
// returning io.EOF once the sequence is exhausted.
type sliceStreamReader struct {
	res []datatypes.ReadResponse
	p   int // index of the next response to return
}

func newStreamReader(res ...datatypes.ReadResponse) *sliceStreamReader {
	return &sliceStreamReader{res: res}
}

// reset rewinds the reader to the first response.
func (r *sliceStreamReader) reset() { r.p = 0 }

func (r *sliceStreamReader) Recv() (*datatypes.ReadResponse, error) {
	if r.p >= len(r.res) {
		return nil, io.EOF
	}
	next := &r.res[r.p]
	r.p++
	return next, nil
}

func (r *sliceStreamReader) String() string { return "" }
// errStreamReader is a reads.StreamReader that always returns an error.
// The string value of the type itself is used as the error message.
type errStreamReader string

// Recv always fails with an error whose message is the reader's value.
func (e errStreamReader) Recv() (*datatypes.ReadResponse, error) {
	return nil, errors.New(string(e))
}
// emptyStreamReader is a reads.StreamReader that returns no data.
type emptyStreamReader struct{}

// Recv returns (nil, nil): no response and no error. NOTE(review): a
// nil response without io.EOF is unusual — this exists to exercise how
// readers handle that case.
func (s *emptyStreamReader) Recv() (*datatypes.ReadResponse, error) {
	return nil, nil
}
// groupF builds a group frame from comma-separated tag keys and
// partition key values. A partition value equal to nilValBytes is
// translated to a nil entry; an empty string yields no partition key.
func groupF(tagKeys string, partitionKeyVals string) datatypes.ReadResponse_Frame {
	var pk [][]byte
	if partitionKeyVals != "" {
		pk = bytes.Split([]byte(partitionKeyVals), []byte(","))
		for i, v := range pk {
			if bytes.Equal(v, nilValBytes) {
				pk[i] = nil
			}
		}
	}

	frame := &datatypes.ReadResponse_GroupFrame{
		TagKeys:          bytes.Split([]byte(tagKeys), []byte(",")),
		PartitionKeyVals: pk,
	}
	return datatypes.ReadResponse_Frame{
		Data: &datatypes.ReadResponse_Frame_Group{Group: frame},
	}
}
// Shorthand aliases for the read-response data types used by the frame
// constructor helpers in this file.
const (
	Float    = datatypes.DataTypeFloat
	Integer  = datatypes.DataTypeInteger
	Unsigned = datatypes.DataTypeUnsigned
	Boolean  = datatypes.DataTypeBoolean
	String   = datatypes.DataTypeString
)
// seriesF builds a series frame of the given data type from a
// measurement key (e.g. "cpu,tag0=val0"). The measurement name is
// folded into the tag set under the "_m" key.
func seriesF(dt datatypes.ReadResponse_DataType, measurement string) datatypes.ReadResponse_Frame {
	name, tags := models.ParseKeyBytes([]byte(measurement))
	tags.Set([]byte("_m"), name)

	t := make([]datatypes.Tag, 0, len(tags))
	for _, tag := range tags {
		t = append(t, datatypes.Tag{Key: tag.Key, Value: tag.Value})
	}

	return datatypes.ReadResponse_Frame{
		Data: &datatypes.ReadResponse_Frame_Series{
			Series: &datatypes.ReadResponse_SeriesFrame{
				DataType: dt,
				Tags:     t,
			},
		},
	}
}

View File

@ -1,63 +0,0 @@
package reads
import (
"github.com/influxdata/influxdb/storage/reads/datatypes"
"github.com/influxdata/influxdb/tsdb/cursors"
)
// StringValuesStreamReader is the receive side of a stream of
// StringValuesResponse messages.
type StringValuesStreamReader interface {
	Recv() (*datatypes.StringValuesResponse, error)
}
// StringIteratorStreamReader adapts a stream of StringValuesResponse
// messages to the cursors.StringIterator interface.
type StringIteratorStreamReader struct {
	stream   StringValuesStreamReader        // source of value batches
	response *datatypes.StringValuesResponse // current batch; nil until the first Recv
	i        int                             // index into response.Values of the current value
	err      error                           // first error returned by stream.Recv; sticky
}

// API compatibility
var _ cursors.StringIterator = (*StringIteratorStreamReader)(nil)
// NewStringIteratorStreamReader returns a StringIteratorStreamReader
// that reads its values from stream.
func NewStringIteratorStreamReader(stream StringValuesStreamReader) *StringIteratorStreamReader {
	r := new(StringIteratorStreamReader)
	r.stream = stream
	return r
}
// Err returns the first error encountered while reading from the
// stream, if any. io.EOF is also retained here once the stream ends.
func (r *StringIteratorStreamReader) Err() error {
	return r.err
}
// Next advances the iterator to the next value, fetching the next
// response from the stream when the current batch is exhausted. It
// returns false once a receive error (including io.EOF) occurs.
func (r *StringIteratorStreamReader) Next() bool {
	if r.err != nil {
		return false
	}

	// Fetch a new batch when there is no current one, or the last value
	// of the current batch has been consumed; otherwise step within it.
	if r.response == nil || len(r.response.Values)-1 <= r.i {
		r.response, r.err = r.stream.Recv()
		if r.err != nil {
			return false
		}
		r.i = 0

	} else {
		r.i++
	}

	// NOTE(review): if a response arrives with an empty Values slice,
	// this returns false even though the stream may hold more data —
	// confirm senders never emit empty batches.
	return len(r.response.Values) > r.i
}
// Value returns the value the iterator is currently positioned on, or
// the empty string when the position is out of range.
func (r *StringIteratorStreamReader) Value() string {
	if r.i >= len(r.response.Values) {
		// Out of range; returning "" beats panicking.
		return ""
	}
	return string(r.response.Values[r.i])
}
// Stats returns zero-valued statistics: cursor stats are not carried
// over the string-values stream.
func (r *StringIteratorStreamReader) Stats() cursors.CursorStats {
	return cursors.CursorStats{}
}

View File

@ -1,120 +0,0 @@
package reads_test
import (
"io"
"reflect"
"testing"
"github.com/influxdata/influxdb/storage/reads"
"github.com/influxdata/influxdb/storage/reads/datatypes"
"github.com/influxdata/influxdb/tsdb/cursors"
)
// mockStringIterator is an in-memory cursors.StringIterator for tests.
type mockStringIterator struct {
	values    []string // remaining values, consumed from the front
	nextValue *string  // value the iterator is currently positioned on; nil when not positioned
	stats     cursors.CursorStats // reported by Stats only once iteration is exhausted
}
// newMockStringIterator returns an iterator over values whose final
// stats report the given scanned-value and scanned-byte counts.
func newMockStringIterator(scannedValues, scannedBytes int, values ...string) *mockStringIterator {
	si := &mockStringIterator{values: values}
	si.stats.ScannedValues = scannedValues
	si.stats.ScannedBytes = scannedBytes
	return si
}
// Next advances to the next value, reporting whether one was available.
func (si *mockStringIterator) Next() bool {
	if len(si.values) == 0 {
		si.nextValue = nil
		return false
	}
	si.nextValue = &si.values[0]
	si.values = si.values[1:]
	return true
}
// Value returns the current value, or "" when the iterator is not
// positioned on one.
func (si *mockStringIterator) Value() string {
	if si.nextValue == nil {
		// Not positioned; empty string beats a nil dereference.
		return ""
	}
	return *si.nextValue
}
// Stats returns the configured statistics once iteration is exhausted,
// and zero stats while values remain.
func (si *mockStringIterator) Stats() cursors.CursorStats {
	if len(si.values) == 0 {
		return si.stats
	}
	return cursors.CursorStats{}
}
// mockStringValuesStreamReader serves a fixed sequence of
// StringValuesResponse messages, then io.EOF.
type mockStringValuesStreamReader struct {
	responses []*datatypes.StringValuesResponse // remaining responses, consumed from the front
}
// newMockStringValuesStreamReader builds a reader where each []string
// argument becomes the Values of one response, in order.
func newMockStringValuesStreamReader(responseValuess ...[]string) *mockStringValuesStreamReader {
	responses := make([]*datatypes.StringValuesResponse, 0, len(responseValuess))
	for _, vals := range responseValuess {
		values := make([][]byte, 0, len(vals))
		for _, v := range vals {
			values = append(values, []byte(v))
		}
		responses = append(responses, &datatypes.StringValuesResponse{Values: values})
	}
	return &mockStringValuesStreamReader{responses: responses}
}
// Recv returns the next queued response, or io.EOF once all have been
// consumed.
func (r *mockStringValuesStreamReader) Recv() (*datatypes.StringValuesResponse, error) {
	if len(r.responses) == 0 {
		return nil, io.EOF
	}
	next := r.responses[0]
	r.responses = r.responses[1:]
	return next, nil
}
// TestStringIteratorStreamReader verifies that values split across
// multiple stream responses are read back in order, without
// deduplication.
//
// Fix: each table case now runs as a named subtest via t.Run — the
// name field was previously declared but never used, so failures did
// not identify the offending case.
func TestStringIteratorStreamReader(t *testing.T) {
	tests := []struct {
		name             string
		responseValuess  [][]string // []string is the values from one response
		expectReadValues []string
	}{
		{
			name:             "simple",
			responseValuess:  [][]string{{"foo", "bar"}},
			expectReadValues: []string{"foo", "bar"},
		},
		{
			name:             "no deduplication expected",
			responseValuess:  [][]string{{"foo", "bar", "bar"}, {"foo"}},
			expectReadValues: []string{"foo", "bar", "bar", "foo"},
		},
		{
			name:             "not as simple",
			responseValuess:  [][]string{{"foo", "bar", "baz"}, {"qux"}, {"more"}},
			expectReadValues: []string{"foo", "bar", "baz", "qux", "more"},
		},
	}

	for _, tt := range tests {
		tt := tt // capture range variable for the closure
		t.Run(tt.name, func(t *testing.T) {
			stream := newMockStringValuesStreamReader(tt.responseValuess...)
			r := reads.NewStringIteratorStreamReader(stream)

			var got []string
			for r.Next() {
				got = append(got, r.Value())
			}
			if !reflect.DeepEqual(tt.expectReadValues, got) {
				t.Errorf("expected %v got %v", tt.expectReadValues, got)
			}
		})
	}
}

View File

@ -1,75 +0,0 @@
package reads
import (
"github.com/influxdata/influxdb/storage/reads/datatypes"
"github.com/influxdata/influxdb/tsdb/cursors"
)
// StringIteratorStream is the send side of a stream of
// StringValuesResponse messages.
type StringIteratorStream interface {
	Send(*datatypes.StringValuesResponse) error
}
// StringIteratorWriter accumulates values from a cursors.StringIterator
// and sends them to a StringIteratorStream when Flush is called.
type StringIteratorWriter struct {
	stream StringIteratorStream
	res    *datatypes.StringValuesResponse // pending batch; backing slice is reused across flushes
	err    error                           // first Send error; sticky

	sz int // estimated size in bytes for pending write
	vc int // value count accumulated since the last Flush (Flush resets it)
}
// NewStringIteratorWriter returns a StringIteratorWriter that sends its
// batches to stream.
func NewStringIteratorWriter(stream StringIteratorStream) *StringIteratorWriter {
	return &StringIteratorWriter{
		stream: stream,
		res:    new(datatypes.StringValuesResponse),
	}
}
// Err returns the first error encountered while sending, if any.
func (w *StringIteratorWriter) Err() error {
	return w.err
}

// WrittenN reports the number of values accumulated since the last
// Flush; Flush resets the counter to zero.
func (w *StringIteratorWriter) WrittenN() int {
	return w.vc
}
// WriteStringIterator drains si into the pending batch. Empty-string
// values are skipped; a nil iterator is a no-op. It always returns nil.
func (w *StringIteratorWriter) WriteStringIterator(si cursors.StringIterator) error {
	if si == nil {
		return nil
	}

	for si.Next() {
		v := si.Value()
		if len(v) == 0 {
			// Empty values are never transmitted.
			continue
		}

		w.res.Values = append(w.res.Values, []byte(v))
		w.sz += len(v)
		w.vc++
	}

	return nil
}
// Flush sends the pending batch to the stream. It is a no-op if a
// previous send already failed or nothing is pending. After a
// successful send, each element of the Values slice is cleared so the
// sent byte slices can be garbage collected, while the backing slice
// itself is retained for reuse.
func (w *StringIteratorWriter) Flush() {
	if w.err != nil || w.sz == 0 {
		return
	}

	// Counters are reset before Send; on failure nothing remains pending.
	w.sz, w.vc = 0, 0
	if w.err = w.stream.Send(w.res); w.err != nil {
		return
	}

	for i := range w.res.Values {
		w.res.Values[i] = nil
	}
	w.res.Values = w.res.Values[:0]
}

View File

@ -1,56 +0,0 @@
package reads_test
import (
"reflect"
"testing"
"github.com/influxdata/influxdb/storage/reads"
"github.com/influxdata/influxdb/storage/reads/datatypes"
)
// mockStringValuesStream records every response sent to it so tests can
// inspect what the writer produced.
type mockStringValuesStream struct {
	responsesSent []*datatypes.StringValuesResponse
}

// Send records a copy of response's Values slice so later mutation by
// the caller does not affect what the test observed.
func (s *mockStringValuesStream) Send(response *datatypes.StringValuesResponse) error {
	cp := &datatypes.StringValuesResponse{
		Values: make([][]byte, len(response.Values)),
	}
	copy(cp.Values, response.Values)
	s.responsesSent = append(s.responsesSent, cp)
	return nil
}
// TestStringIteratorWriter verifies that values written from an
// iterator are flushed to the stream in order.
func TestStringIteratorWriter(t *testing.T) {
	mockStream := &mockStringValuesStream{}
	w := reads.NewStringIteratorWriter(mockStream)
	si := newMockStringIterator(1, 2, "foo", "bar")
	if err := w.WriteStringIterator(si); err != nil {
		t.Fatal(err)
	}
	w.Flush()

	expect := []string{"foo", "bar"}
	var got []string
	for _, response := range mockStream.responsesSent {
		for _, v := range response.Values {
			got = append(got, string(v))
		}
	}
	if !reflect.DeepEqual(expect, got) {
		t.Errorf("expected %v got %v", expect, got)
	}
}
// TestStringIteratorWriter_Nil verifies that writing a nil iterator is
// a harmless no-op.
func TestStringIteratorWriter_Nil(t *testing.T) {
	w := reads.NewStringIteratorWriter(&mockStringValuesStream{})
	if err := w.WriteStringIterator(nil); err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	w.Flush()
}