// influxdb/storage/reads/stream_reader.go (444 lines, 10 KiB, Go)

package reads
import (
"errors"
"fmt"
"io"
"strconv"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/storage/reads/datatypes"
"github.com/influxdata/influxdb/tsdb/cursors"
"google.golang.org/grpc/metadata"
)
// Sentinel errors reported by the stream readers in this file.
var (
	// ErrSeriesKeyOrder is recorded by ResultSetStreamReader when a series
	// frame's tags do not compare greater than those of the previous series.
	ErrSeriesKeyOrder = errors.New("invalid series key order")

	// ErrPartitionKeyOrder is recorded by GroupResultSetStreamReader when a
	// group frame's partition key does not compare greater than the previous one.
	ErrPartitionKeyOrder = errors.New("invalid partition key order")

	// ErrStreamNoData is recorded when StreamReader.Recv keeps returning
	// responses that carry no frames.
	ErrStreamNoData = errors.New("peekFrame: no data")
)

// peekFrameRetries is the number of additional Recv calls peekFrame makes,
// after an initial response with no frames, before it gives up with
// ErrStreamNoData.
const peekFrameRetries = 2
// StreamReader is the source of ReadResponse messages consumed by the stream
// readers in this package. *StorageReadClient satisfies it.
type StreamReader interface {
	Recv() (*datatypes.ReadResponse, error)
}

// statistics is implemented by anything that can report cursor statistics.
type statistics interface {
	Stats() cursors.CursorStats
}

// emptyStatistics is a statistics implementation that always reports zeros.
type emptyStatistics struct{}

// Stats returns zero-valued statistics.
func (*emptyStatistics) Stats() (stats cursors.CursorStats) {
	return stats
}

// zeroStatistics is the fallback frameReader.init uses when the stream does
// not implement statistics itself.
var zeroStatistics statistics = new(emptyStatistics)
// StorageReadClient wraps a datatypes.Storage_ReadClient so it can be used as
// a StreamReader. It also reports cursor statistics that the server sends in
// the gRPC trailer.
type StorageReadClient struct {
	c       datatypes.Storage_ReadClient
	trailer metadata.MD // captured by Recv once the stream returns an error (io.EOF included)
}

// NewStorageReadClient wraps c. The result satisfies StreamReader, and its
// Stats method reports values parsed from the stream's trailer.
func NewStorageReadClient(c datatypes.Storage_ReadClient) *StorageReadClient {
	rc := new(StorageReadClient)
	rc.c = c
	return rc
}

// Recv returns the next response from the wrapped stream. When the stream
// reports any error, io.EOF included, Recv captures the trailer so Stats can
// read it. gRPC only exposes trailers once the stream has finished.
func (rc *StorageReadClient) Recv() (res *datatypes.ReadResponse, err error) {
	if res, err = rc.c.Recv(); err != nil {
		rc.trailer = rc.c.Trailer()
	}
	return res, err
}
// Stats sums the "scanned-bytes" and "scanned-values" entries of the trailer
// that Recv captured. Entries that are not valid integers are skipped. The
// trailer is nil until Recv has seen an error, so until then both totals are zero.
func (rc *StorageReadClient) Stats() (stats cursors.CursorStats) {
	total := func(key string) (sum int) {
		for _, raw := range rc.trailer.Get(key) {
			if v, err := strconv.Atoi(raw); err == nil {
				sum += v
			}
		}
		return sum
	}

	stats.ScannedBytes = total("scanned-bytes")
	stats.ScannedValues = total("scanned-values")
	return stats
}
// ResultSetStreamReader iterates the series carried by a StreamReader. Each
// series frame is followed by that series' point frames. Each series' tags
// must compare greater (models.CompareTags) than the previous series' tags;
// otherwise Err reports ErrSeriesKeyOrder.
type ResultSetStreamReader struct {
	fr   frameReader
	cur  cursorReaders
	tags models.Tags // tags of the current series
	prev models.Tags // tags of the previous series; its storage is recycled
}

// NewResultSetStreamReader returns a reader positioned before the first
// series of stream.
func NewResultSetStreamReader(stream StreamReader) *ResultSetStreamReader {
	r := new(ResultSetStreamReader)
	r.fr.s = stream
	r.fr.state = stateReadSeries
	r.fr.init()
	r.cur.setFrameReader(&r.fr)
	return r
}

// Err returns the error recorded by the underlying frame reader, if any.
func (r *ResultSetStreamReader) Err() error {
	return r.fr.err
}

// Close marks the reader as done; later calls to Next return false.
func (r *ResultSetStreamReader) Close() {
	r.fr.state = stateDone
}

// Cursor returns a typed cursor over the points of the current series.
func (r *ResultSetStreamReader) Cursor() cursors.Cursor {
	return r.cur.cursor()
}

// Stats returns the statistics reported by the underlying stream, or zeros
// if the stream does not provide any.
func (r *ResultSetStreamReader) Stats() cursors.CursorStats {
	return r.fr.stats.Stats()
}

// Next advances to the next series. It returns false at the end of the
// stream, after an error, or when called while the previous series' points
// have not been consumed. The last case is recorded as an error.
func (r *ResultSetStreamReader) Next() bool {
	switch r.fr.state {
	case stateReadSeries:
		return r.readSeriesFrame()
	case stateDone, stateReadErr:
		return false
	default:
		r.fr.setErr(fmt.Errorf("expected reader in state %v, was in state %v", stateReadSeries, r.fr.state))
		return false
	}
}
// readSeriesFrame consumes the next frame, which must be a series frame, and
// makes it the current series. It returns false if the stream has ended or if
// an error was recorded: a non-series frame or out-of-order tags.
func (r *ResultSetStreamReader) readSeriesFrame() bool {
	f := r.fr.peekFrame()
	if f == nil {
		return false
	}
	r.fr.nextFrame()

	sf, ok := f.Data.(*datatypes.ReadResponse_Frame_Series)
	if !ok {
		r.fr.setErr(fmt.Errorf("expected series frame, got %T", f.Data))
		return false
	}
	r.fr.state = stateReadPoints

	// The current tags become prev. Prev's backing array is reused for the
	// incoming series, so only two tag slices are ever allocated.
	r.prev, r.tags = r.tags, r.prev
	src := sf.Series.Tags
	if n := len(src); cap(r.tags) >= n {
		r.tags = r.tags[:n]
	} else {
		r.tags = make(models.Tags, n)
	}
	// Keys and values alias the frame's byte slices; they are not copied.
	for i := range src {
		r.tags[i].Key, r.tags[i].Value = src[i].Key, src[i].Value
	}

	inOrder := models.CompareTags(r.tags, r.prev) == 1 || r.prev == nil
	if !inOrder {
		r.fr.setErr(ErrSeriesKeyOrder)
		return false
	}
	r.cur.nextType = sf.Series.DataType
	return true
}

// Tags returns the tags of the current series. The slice's storage is
// recycled by later calls to Next, so copy it if it must be retained.
func (r *ResultSetStreamReader) Tags() models.Tags {
	return r.tags
}
// GroupResultSetStreamReader iterates the groups carried by a StreamReader.
// Each group frame is followed by the series frames and points that belong to
// that group.
type GroupResultSetStreamReader struct {
	fr frameReader
	gc groupCursorStreamReader // handed out by every call to Next; shares fr
}

// NewGroupResultSetStreamReader returns a reader positioned before the first
// group of stream.
func NewGroupResultSetStreamReader(stream StreamReader) *GroupResultSetStreamReader {
	r := new(GroupResultSetStreamReader)
	r.fr.s = stream
	r.fr.state = stateReadGroup
	r.fr.init()
	r.gc.fr = &r.fr
	r.gc.cur.setFrameReader(&r.fr)
	return r
}

// Err returns the error recorded by the underlying frame reader, if any.
func (r *GroupResultSetStreamReader) Err() error {
	return r.fr.err
}

// Next returns a cursor over the next group, or nil at the end of the
// stream or after an error. Calling Next while the previous group's series
// are still unread is recorded as an error.
func (r *GroupResultSetStreamReader) Next() GroupCursor {
	switch r.fr.state {
	case stateReadGroup:
		return r.readGroupFrame()
	case stateDone, stateReadErr:
		return nil
	default:
		r.fr.setErr(fmt.Errorf("expected reader in state %v, was in state %v", stateReadGroup, r.fr.state))
		return nil
	}
}
// readGroupFrame consumes the next frame, which must be a group frame, and
// loads its tag keys and partition key into the shared group cursor. Each
// partition key must compare greater than the previous one
// (comparePartitionKey with nilSortHi); otherwise ErrPartitionKeyOrder is
// recorded and nil is returned.
func (r *GroupResultSetStreamReader) readGroupFrame() GroupCursor {
	f := r.fr.peekFrame()
	if f == nil {
		return nil
	}
	r.fr.nextFrame()

	gf, ok := f.Data.(*datatypes.ReadResponse_Frame_Group)
	if !ok {
		r.fr.setErr(fmt.Errorf("expected group frame, got %T", f.Data))
		return nil
	}
	r.fr.state = stateReadSeries
	gc := &r.gc

	// Shallow-copy the tag keys into reused storage.
	if n := len(gf.Group.TagKeys); cap(gc.tagKeys) >= n {
		gc.tagKeys = gc.tagKeys[:n]
	} else {
		gc.tagKeys = make([][]byte, n)
	}
	copy(gc.tagKeys, gf.Group.TagKeys)

	// The current partition key becomes prevKey, and prevKey's storage is
	// reused for the incoming key.
	gc.prevKey, gc.partitionKeyVals = gc.partitionKeyVals, gc.prevKey
	if n := len(gf.Group.PartitionKeyVals); cap(gc.partitionKeyVals) >= n {
		gc.partitionKeyVals = gc.partitionKeyVals[:n]
	} else {
		gc.partitionKeyVals = make([][]byte, n)
	}
	copy(gc.partitionKeyVals, gf.Group.PartitionKeyVals)

	ordered := comparePartitionKey(gc.partitionKeyVals, gc.prevKey, nilSortHi) == 1 || gc.prevKey == nil
	if !ordered {
		r.fr.setErr(ErrPartitionKeyOrder)
		return nil
	}
	return gc
}

// Close marks the reader as done; later calls to Next return nil.
func (r *GroupResultSetStreamReader) Close() {
	r.fr.state = stateDone
}
// groupCursorStreamReader iterates the series of a single group. It reads
// from the frameReader owned by its GroupResultSetStreamReader.
type groupCursorStreamReader struct {
	fr               *frameReader
	cur              cursorReaders
	tagKeys          [][]byte    // tag keys announced by the group frame
	partitionKeyVals [][]byte    // partition key of the current group
	prevKey          [][]byte    // partition key of the previous group
	tags             models.Tags // tags of the current series
}

// Err returns the error recorded by the shared frame reader, if any.
func (gc *groupCursorStreamReader) Err() error {
	return gc.fr.err
}

// Tags returns the tags of the current series.
func (gc *groupCursorStreamReader) Tags() models.Tags {
	return gc.tags
}

// Keys returns the tag keys of the current group.
func (gc *groupCursorStreamReader) Keys() [][]byte {
	return gc.tagKeys
}

// PartitionKeyVals returns the partition key values of the current group.
func (gc *groupCursorStreamReader) PartitionKeyVals() [][]byte {
	return gc.partitionKeyVals
}

// Cursor returns a typed cursor over the points of the current series.
func (gc *groupCursorStreamReader) Cursor() cursors.Cursor {
	return gc.cur.cursor()
}

// Stats returns the statistics reported by the underlying stream.
func (gc *groupCursorStreamReader) Stats() cursors.CursorStats {
	return gc.fr.stats.Stats()
}

// Next advances to the next series of the group. It returns false when the
// next group begins, the stream ends, or an error has been recorded.
func (gc *groupCursorStreamReader) Next() bool {
	switch gc.fr.state {
	case stateReadSeries:
		return gc.readSeriesFrame()
	case stateDone, stateReadErr, stateReadGroup:
		return false
	}
	gc.fr.setErr(fmt.Errorf("expected reader in state %v, was in state %v", stateReadSeries, gc.fr.state))
	return false
}
// readSeriesFrame inspects the next frame.
//   - A series frame is consumed and becomes the current series (returns true).
//   - A group frame is left unconsumed for the parent reader, and the state
//     moves to stateReadGroup (returns false).
//   - Any other frame records an error (returns false).
func (gc *groupCursorStreamReader) readSeriesFrame() bool {
	f := gc.fr.peekFrame()
	if f == nil {
		return false
	}

	switch data := f.Data.(type) {
	case *datatypes.ReadResponse_Frame_Series:
		gc.fr.nextFrame()
		gc.fr.state = stateReadPoints

		src := data.Series.Tags
		if n := len(src); cap(gc.tags) >= n {
			gc.tags = gc.tags[:n]
		} else {
			gc.tags = make(models.Tags, n)
		}
		for i := range src {
			gc.tags[i].Key, gc.tags[i].Value = src[i].Key, src[i].Value
		}
		gc.cur.nextType = data.Series.DataType
		return true

	case *datatypes.ReadResponse_Frame_Group:
		gc.fr.state = stateReadGroup
		return false

	default:
		gc.fr.setErr(fmt.Errorf("expected series frame, got %T", f.Data))
		return false
	}
}

// Close skips whatever remains of the current group. It closes any pending
// point cursor and reads series frames until the reader leaves
// stateReadSeries: the next group frame, the end of the stream, or an error.
// NOTE(review): this relies on the typed cursors' Close (defined elsewhere)
// moving the state out of the points states; confirm.
func (gc *groupCursorStreamReader) Close() {
	for {
		if gc.fr.state == stateReadPoints {
			if cur := gc.Cursor(); cur != nil {
				cur.Close()
			}
		}
		if gc.fr.state != stateReadSeries {
			return
		}
		gc.readSeriesFrame()
	}
}
// readState records where a frameReader is in the response stream, i.e. which
// kind of frame (or action) the readers expect next. The numeric values come
// from iota, so the declaration order below is significant.
type readState byte
const (
// stateReadGroup: a group frame is expected next (GroupResultSetStreamReader.Next).
stateReadGroup readState = iota
// stateReadSeries: a series frame is expected next.
stateReadSeries
// stateReadPoints: a series frame was consumed; cursorReaders.cursor may be called.
stateReadPoints
// The typed states below are set by cursorReaders.cursor when it hands out a
// cursor of that data type. Presumably the typed cursor readers (defined
// elsewhere) consume point frames while in them; confirm.
stateReadFloatPoints
stateReadIntegerPoints
stateReadUnsignedPoints
stateReadBooleanPoints
stateReadStringPoints
// stateReadErr: an error was recorded via frameReader.setErr.
stateReadErr
// stateDone: Recv returned io.EOF, or the reader was closed.
stateDone
)
// frameReader buffers the frames of the most recent ReadResponse and tracks
// the read state shared by the readers built on top of it.
type frameReader struct {
	s     StreamReader
	stats statistics
	state readState
	buf   []datatypes.ReadResponse_Frame // frames of the most recent response
	p     int                            // index of the next unconsumed frame in buf
	err   error                          // error recorded by setErr
}

// init selects the statistics source. It uses the stream itself when the
// stream implements statistics, and zeroStatistics otherwise.
func (r *frameReader) init() {
	r.stats = zeroStatistics
	if s, ok := r.s.(statistics); ok {
		r.stats = s
	}
}
// peekFrame returns the next unconsumed frame without consuming it; call
// nextFrame to consume it.
//
// When the buffered frames are exhausted, peekFrame calls Recv for a new
// response. A response with no frames is retried up to peekFrameRetries more
// times. The frames of the first response that contains any are used, even
// when that response arrives on the final retry.
//
// It returns nil when:
//   - Recv returns io.EOF (the state becomes stateDone);
//   - Recv returns any other error (recorded via setErr); or
//   - every attempt yields an empty response (ErrStreamNoData via setErr).
func (r *frameReader) peekFrame() *datatypes.ReadResponse_Frame {
	if r.p < len(r.buf) {
		return &r.buf[r.p]
	}

	for attempt := 0; attempt <= peekFrameRetries; attempt++ {
		// Release the exhausted buffer before receiving the next response.
		r.p = 0
		r.buf = nil

		res, err := r.s.Recv()
		switch {
		case errors.Is(err, io.EOF):
			r.state = stateDone
			return nil
		case err != nil:
			r.setErr(err)
			return nil
		}

		if res != nil {
			r.buf = res.Frames
		}
		// Check for data before deciding whether to retry. The previous
		// version decremented the retry budget first, so frames that arrived
		// on the last attempt were discarded as ErrStreamNoData.
		if len(r.buf) > 0 {
			return &r.buf[0]
		}
	}

	r.setErr(ErrStreamNoData)
	return nil
}
// nextFrame consumes the frame most recently returned by peekFrame.
func (r *frameReader) nextFrame() {
	r.p++
}

// setErr records err and moves the reader into stateReadErr.
func (r *frameReader) setErr(err error) {
	r.err, r.state = err, stateReadErr
}
// cursorReaders holds one reusable point cursor per data type. The cursor
// for the current series is selected by nextType.
type cursorReaders struct {
	fr       *frameReader
	nextType datatypes.ReadResponse_DataType // data type announced by the current series frame
	cc       cursors.Cursor                  // cursor most recently returned by cursor()

	f floatCursorStreamReader
	i integerCursorStreamReader
	u unsignedCursorStreamReader
	b booleanCursorStreamReader
	s stringCursorStreamReader
}

// setFrameReader points this set, and every typed cursor in it, at fr.
func (cur *cursorReaders) setFrameReader(fr *frameReader) {
	cur.fr = fr
	cur.f.fr, cur.i.fr, cur.u.fr, cur.b.fr, cur.s.fr = fr, fr, fr, fr, fr
}
// cursor returns the typed cursor for the current series' data type and moves
// the reader into the matching points state. It returns nil, and records an
// error, if the reader is not in stateReadPoints or the data type is unknown.
func (cur *cursorReaders) cursor() cursors.Cursor {
	cur.cc = nil
	if cur.fr.state != stateReadPoints {
		cur.fr.setErr(fmt.Errorf("expected reader in state %v, was in state %v", stateReadPoints, cur.fr.state))
		return nil
	}

	var (
		next readState
		c    cursors.Cursor
	)
	switch cur.nextType {
	case datatypes.DataTypeFloat:
		next, c = stateReadFloatPoints, &cur.f
	case datatypes.DataTypeInteger:
		next, c = stateReadIntegerPoints, &cur.i
	case datatypes.DataTypeUnsigned:
		next, c = stateReadUnsignedPoints, &cur.u
	case datatypes.DataTypeBoolean:
		next, c = stateReadBooleanPoints, &cur.b
	case datatypes.DataTypeString:
		next, c = stateReadStringPoints, &cur.s
	default:
		cur.fr.setErr(fmt.Errorf("unexpected data type, %d", cur.nextType))
		return nil
	}

	cur.fr.state = next
	cur.cc = c
	return c
}