influxdb/storage/reads/stream_reader.gen.go.tmpl

65 lines
1.4 KiB
Cheetah

package reads
import (
"fmt"
"github.com/influxdata/influxdb/storage/reads/datatypes"
"github.com/influxdata/influxdb/tsdb/cursors"
)
type streamCursor interface {
streamCursor()
}
{{range .}}
type {{.name}}CursorStreamReader struct {
fr *frameReader
a cursors.{{.Name}}Array
}
func (c *{{.name}}CursorStreamReader) streamCursor() {}
func (c *{{.name}}CursorStreamReader) Close() {
for c.fr.state == stateRead{{.Name}}Points {
c.readFrame()
}
}
func (c *{{.name}}CursorStreamReader) Err() error { return c.fr.err }
func (c *{{.name}}CursorStreamReader) Next() *cursors.{{.Name}}Array {
if c.fr.state == stateRead{{.Name}}Points {
c.readFrame()
}
return &c.a
}
func (c *{{.name}}CursorStreamReader) readFrame() {
c.a.Timestamps = nil
c.a.Values = nil
if f := c.fr.peekFrame(); f != nil {
switch ff := f.Data.(type) {
case *datatypes.ReadResponse_Frame_{{.Name}}Points:
c.a.Timestamps = ff.{{.Name}}Points.Timestamps
c.a.Values = ff.{{.Name}}Points.Values
c.fr.nextFrame()
case *datatypes.ReadResponse_Frame_Series:
c.fr.state = stateReadSeries
case *datatypes.ReadResponse_Frame_Group:
c.fr.state = stateReadGroup
default:
c.fr.setErr(fmt.Errorf("{{.name}}CursorStreamReader: unexpected frame type %T", f.Data))
}
}
}
func (c *{{.name}}CursorStreamReader) Stats() cursors.CursorStats {
return c.fr.stats.Stats()
}
{{end}}