252 lines
5.3 KiB
Go
252 lines
5.3 KiB
Go
package reads
|
|
|
|
import (
|
|
"bytes"
|
|
"sort"
|
|
|
|
"github.com/influxdata/influxdb/pkg/slices"
|
|
)
|
|
|
|
// groupNoneMergedGroupResultSet produces a single GroupCursor, merging all
|
|
// GroupResultSet#Keys
|
|
type groupNoneMergedGroupResultSet struct {
|
|
g []GroupResultSet
|
|
gc groupNoneMergedGroupCursor
|
|
done bool
|
|
}
|
|
|
|
// Returns a GroupResultSet that merges results using the datatypes.GroupNone
|
|
// strategy. Each source GroupResultSet in g must be configured using the
|
|
// GroupNone strategy or the results are undefined.
|
|
//
|
|
// The GroupNone strategy must merge the partition key and tag keys
|
|
// from each source GroupResultSet when producing its
|
|
func NewGroupNoneMergedGroupResultSet(g []GroupResultSet) GroupResultSet {
|
|
if len(g) == 0 {
|
|
return nil
|
|
} else if len(g) == 1 {
|
|
return g[0]
|
|
}
|
|
|
|
grs := &groupNoneMergedGroupResultSet{
|
|
g: g,
|
|
gc: groupNoneMergedGroupCursor{
|
|
mergedResultSet: mergedResultSet{first: true},
|
|
},
|
|
}
|
|
|
|
var km keyMerger
|
|
results := make([]ResultSet, 0, len(g))
|
|
for _, rs := range g {
|
|
if gc := rs.Next(); gc != nil {
|
|
results = append(results, gc)
|
|
km.mergeKeys(gc.Keys())
|
|
} else if rs.Err() != nil {
|
|
grs.done = true
|
|
grs.gc.err = rs.Err()
|
|
results = nil
|
|
break
|
|
}
|
|
}
|
|
|
|
if len(results) > 0 {
|
|
grs.gc.keys = km.get()
|
|
grs.gc.heap.init(results)
|
|
}
|
|
|
|
return grs
|
|
}
|
|
|
|
func (r *groupNoneMergedGroupResultSet) Next() GroupCursor {
|
|
if !r.done {
|
|
r.done = true
|
|
return &r.gc
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (r *groupNoneMergedGroupResultSet) Err() error { return r.gc.err }
|
|
|
|
func (r *groupNoneMergedGroupResultSet) Close() {
|
|
r.gc.Close()
|
|
for _, grs := range r.g {
|
|
grs.Close()
|
|
}
|
|
r.g = nil
|
|
}
|
|
|
|
type groupNoneMergedGroupCursor struct {
|
|
mergedResultSet
|
|
keys [][]byte
|
|
}
|
|
|
|
func (r *groupNoneMergedGroupCursor) Keys() [][]byte {
|
|
return r.keys
|
|
}
|
|
|
|
func (r *groupNoneMergedGroupCursor) PartitionKeyVals() [][]byte {
|
|
return nil
|
|
}
|
|
|
|
// groupByMergedGroupResultSet implements the GroupBy strategy.
|
|
type groupByMergedGroupResultSet struct {
|
|
items []*groupCursorItem
|
|
alt []*groupCursorItem
|
|
groupCursors []GroupCursor
|
|
resultSets []ResultSet
|
|
nilVal []byte
|
|
err error
|
|
|
|
gc groupByMergedGroupCursor
|
|
}
|
|
|
|
// Returns a GroupResultSet that merges results using the datatypes.GroupBy
|
|
// strategy. Each source GroupResultSet in g must be configured using the
|
|
// GroupBy strategy with the same GroupKeys or the results are undefined.
|
|
func NewGroupByMergedGroupResultSet(g []GroupResultSet) GroupResultSet {
|
|
if len(g) == 0 {
|
|
return nil
|
|
} else if len(g) == 1 {
|
|
return g[0]
|
|
}
|
|
|
|
grs := &groupByMergedGroupResultSet{}
|
|
grs.nilVal = nilSortHi
|
|
grs.groupCursors = make([]GroupCursor, 0, len(g))
|
|
grs.resultSets = make([]ResultSet, 0, len(g))
|
|
grs.items = make([]*groupCursorItem, 0, len(g))
|
|
grs.alt = make([]*groupCursorItem, 0, len(g))
|
|
for _, rs := range g {
|
|
grs.items = append(grs.items, &groupCursorItem{grs: rs})
|
|
}
|
|
|
|
return grs
|
|
}
|
|
|
|
// next determines the cursors for the next partition key.
|
|
func (r *groupByMergedGroupResultSet) next() {
|
|
r.alt = r.alt[:0]
|
|
for i, item := range r.items {
|
|
if item.gc == nil {
|
|
item.gc = item.grs.Next()
|
|
if item.gc != nil {
|
|
r.alt = append(r.alt, item)
|
|
} else {
|
|
r.err = item.grs.Err()
|
|
item.grs.Close()
|
|
}
|
|
} else {
|
|
// append remaining non-nil cursors
|
|
r.alt = append(r.alt, r.items[i:]...)
|
|
break
|
|
}
|
|
}
|
|
|
|
r.items, r.alt = r.alt, r.items
|
|
if len(r.items) == 0 {
|
|
r.groupCursors = r.groupCursors[:0]
|
|
r.resultSets = r.resultSets[:0]
|
|
return
|
|
}
|
|
|
|
if r.err != nil {
|
|
r.Close()
|
|
return
|
|
}
|
|
|
|
sort.Slice(r.items, func(i, j int) bool {
|
|
return comparePartitionKey(r.items[i].gc.PartitionKeyVals(), r.items[j].gc.PartitionKeyVals(), r.nilVal) == -1
|
|
})
|
|
|
|
r.groupCursors = r.groupCursors[:1]
|
|
r.resultSets = r.resultSets[:1]
|
|
|
|
first := r.items[0].gc
|
|
r.groupCursors[0] = first
|
|
r.resultSets[0] = first
|
|
r.items[0].gc = nil
|
|
|
|
for i := 1; i < len(r.items); i++ {
|
|
if slices.CompareSlice(first.PartitionKeyVals(), r.items[i].gc.PartitionKeyVals()) == 0 {
|
|
r.groupCursors = append(r.groupCursors, r.items[i].gc)
|
|
r.resultSets = append(r.resultSets, r.items[i].gc)
|
|
r.items[i].gc = nil
|
|
}
|
|
}
|
|
}
|
|
|
|
func (r *groupByMergedGroupResultSet) Next() GroupCursor {
|
|
r.next()
|
|
if len(r.groupCursors) == 0 {
|
|
return nil
|
|
}
|
|
|
|
r.gc.first = true
|
|
r.gc.heap.init(r.resultSets)
|
|
r.gc.keys = r.groupCursors[0].Keys()
|
|
r.gc.vals = r.groupCursors[0].PartitionKeyVals()
|
|
return &r.gc
|
|
}
|
|
|
|
func (r *groupByMergedGroupResultSet) Err() error { return r.err }
|
|
|
|
func (r *groupByMergedGroupResultSet) Close() {
|
|
r.gc.Close()
|
|
for _, grs := range r.items {
|
|
if grs.gc != nil {
|
|
grs.gc.Close()
|
|
}
|
|
grs.grs.Close()
|
|
}
|
|
r.items = nil
|
|
r.alt = nil
|
|
}
|
|
|
|
type groupByMergedGroupCursor struct {
|
|
mergedResultSet
|
|
keys [][]byte
|
|
vals [][]byte
|
|
}
|
|
|
|
func (r *groupByMergedGroupCursor) Keys() [][]byte {
|
|
return r.keys
|
|
}
|
|
|
|
func (r *groupByMergedGroupCursor) PartitionKeyVals() [][]byte {
|
|
return r.vals
|
|
}
|
|
|
|
type groupCursorItem struct {
|
|
grs GroupResultSet
|
|
gc GroupCursor
|
|
}
|
|
|
|
func comparePartitionKey(a, b [][]byte, nilVal []byte) int {
|
|
i := 0
|
|
for i < len(a) && i < len(b) {
|
|
av, bv := a[i], b[i]
|
|
if len(av) == 0 {
|
|
av = nilVal
|
|
}
|
|
if len(bv) == 0 {
|
|
bv = nilVal
|
|
}
|
|
if v := bytes.Compare(av, bv); v == 0 {
|
|
i++
|
|
continue
|
|
} else {
|
|
return v
|
|
}
|
|
}
|
|
|
|
if i < len(b) {
|
|
// b is longer, so assume a is less
|
|
return -1
|
|
} else if i < len(a) {
|
|
// a is longer, so assume b is less
|
|
return 1
|
|
} else {
|
|
return 0
|
|
}
|
|
}
|