refactor(storage): Export subset of APIs to facilitate reuse

* the protocol service definition, ReadRequest and ReadResponse is
  reused across projects, rather than requiring redefinition.

* the ReadRequest protocol buffer definition removes the concept of a
  database and retention policy, replacing it with a field named
  ReadSource of type google.protobuf.Any. OSS requests will use the
  ReadSource message structure defined in local to this package, which
  defines fields to represent a Database and RetentionPolicy. Other
  implementations can provide their own data structure allowing the
  remainder of the ReadRequest to be reused.

* The RPC service and Store are expected to be redefined to handle their
  specific requirements for resolving a ReadSource

* ResultSet and GroupResultSet are interfaces representing non-grouping
  and grouping read behavior respectively. Calling NewResultSet or
  NewGroupResultSet will construct instances of these types

* The ResponseWriter type is exported to deal with serialization of
  the ResultSet and GroupResultSet types
pull/10231/head
Stuart Carnie 2018-08-24 10:30:12 -07:00
parent 2f4fcd8255
commit 59bd822af9
23 changed files with 5133 additions and 4701 deletions

View File

@ -286,12 +286,7 @@ func (s *Server) appendHTTPDService(c httpd.Config) {
srv.Handler.PointsWriter = s.PointsWriter
srv.Handler.Version = s.buildInfo.Version
srv.Handler.BuildType = "OSS"
// Wire up storage service for Prometheus endpoints.
storageStore := storage.NewStore()
storageStore.MetaClient = s.MetaClient
storageStore.TSDBStore = s.TSDBStore
srv.Handler.Store = storageStore
srv.Handler.Store = storage.NewStore(s.TSDBStore, s.MetaClient)
s.Services = append(s.Services, srv)
}

View File

@ -14,6 +14,7 @@ import (
"time"
"github.com/gogo/protobuf/proto"
"github.com/gogo/protobuf/types"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/services/storage"
"github.com/influxdata/influxql"
@ -184,12 +185,17 @@ func (cmd *Command) validate() error {
}
func (cmd *Command) query(c storage.StorageClient) error {
var req storage.ReadRequest
req.Database = cmd.database
if cmd.retentionPolicy != "" {
req.Database += "/" + cmd.retentionPolicy
src := storage.ReadSource{
Database: cmd.database,
RetentionPolicy: cmd.retentionPolicy,
}
var req storage.ReadRequest
if any, err := types.MarshalAny(&src); err != nil {
return err
} else {
req.ReadSource = any
}
req.TimestampRange.Start = cmd.startTime
req.TimestampRange.End = cmd.endTime
req.SeriesLimit = cmd.slimit

View File

@ -14,7 +14,7 @@ import (
// It's currently a partial implementation as one of a store's exported methods
// returns an unexported type.
type StorageStoreMock struct {
ReadFn func(ctx context.Context, req *storage.ReadRequest) (storage.Results, error)
ReadFn func(ctx context.Context, req *storage.ReadRequest) (storage.ResultSet, error)
WithLoggerFn func(log *zap.Logger)
ResultSet *StorageResultsMock
@ -29,7 +29,7 @@ func NewStorageStoreMock() *StorageStoreMock {
WithLoggerFn: func(*zap.Logger) {},
ResultSet: NewStorageResultsMock(),
}
store.ReadFn = func(context.Context, *storage.ReadRequest) (storage.Results, error) {
store.ReadFn = func(context.Context, *storage.ReadRequest) (storage.ResultSet, error) {
return store.ResultSet, nil
}
return store
@ -41,7 +41,7 @@ func (s *StorageStoreMock) WithLogger(log *zap.Logger) {
}
// Read reads the storage request and returns a cursor to access results.
func (s *StorageStoreMock) Read(ctx context.Context, req *storage.ReadRequest) (storage.Results, error) {
func (s *StorageStoreMock) Read(ctx context.Context, req *storage.ReadRequest) (storage.ResultSet, error) {
return s.ReadFn(ctx, req)
}

View File

@ -6,6 +6,7 @@ import (
"math"
"time"
"github.com/gogo/protobuf/types"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/prometheus/remote"
"github.com/influxdata/influxdb/services/storage"
@ -81,12 +82,13 @@ func ReadRequestToInfluxStorageRequest(req *remote.ReadRequest, db, rp string) (
}
q := req.Queries[0]
if rp != "" {
db = db + "/" + rp
src, err := types.MarshalAny(&storage.ReadSource{Database: db, RetentionPolicy: rp})
if err != nil {
return nil, err
}
sreq := &storage.ReadRequest{
Database: db,
ReadSource: src,
TimestampRange: storage.TimestampRange{
Start: time.Unix(0, q.StartTimestampMs*int64(time.Millisecond)).UnixNano(),
End: time.Unix(0, q.EndTimestampMs*int64(time.Millisecond)).UnixNano(),

View File

@ -129,7 +129,6 @@ func NewHandler(c Config) *Handler {
Config: &c,
Logger: zap.NewNop(),
CLFLogger: log.New(os.Stderr, "[httpd] ", 0),
Store: storage.NewStore(),
stats: &Statistics{},
requestTracker: NewRequestTracker(),
}
@ -1595,7 +1594,7 @@ func (h *Handler) recovery(inner http.Handler, name string) http.Handler {
// Store describes the behaviour of the storage packages Store type.
type Store interface {
Read(ctx context.Context, req *storage.ReadRequest) (storage.Results, error)
Read(ctx context.Context, req *storage.ReadRequest) (storage.ResultSet, error)
WithLogger(log *zap.Logger)
}

View File

@ -85,25 +85,24 @@ type multiShardArrayCursors struct {
}
}
func newMultiShardArrayCursors(ctx context.Context, rr *readRequest) *multiShardArrayCursors {
lim := rr.limit
if lim < 0 {
lim = 1
func newMultiShardArrayCursors(ctx context.Context, start, end int64, asc bool, limit int64) *multiShardArrayCursors {
if limit < 0 {
limit = 1
}
m := &multiShardArrayCursors{
ctx: ctx,
limit: lim,
limit: limit,
req: tsdb.CursorRequest{
Ascending: rr.asc,
StartTime: rr.start,
EndTime: rr.end,
Ascending: asc,
StartTime: start,
EndTime: end,
},
}
cc := cursorContext{
ctx: ctx,
limit: lim,
limit: limit,
req: &m.req,
}
@ -116,20 +115,20 @@ func newMultiShardArrayCursors(ctx context.Context, rr *readRequest) *multiShard
return m
}
func (m *multiShardArrayCursors) createCursor(row seriesRow) tsdb.Cursor {
m.req.Name = row.name
m.req.Tags = row.stags
m.req.Field = row.field.n
func (m *multiShardArrayCursors) createCursor(row SeriesRow) tsdb.Cursor {
m.req.Name = row.Name
m.req.Tags = row.SeriesTags
m.req.Field = row.Field
var cond expression
if row.valueCond != nil {
cond = &astExpr{row.valueCond}
if row.ValueCond != nil {
cond = &astExpr{row.ValueCond}
}
var shard tsdb.CursorIterator
var cur tsdb.Cursor
for cur == nil && len(row.query) > 0 {
shard, row.query = row.query[0], row.query[1:]
for cur == nil && len(row.Query) > 0 {
shard, row.Query = row.Query[0], row.Query[1:]
cur, _ = shard.Next(m.ctx, &m.req)
}
@ -139,19 +138,19 @@ func (m *multiShardArrayCursors) createCursor(row seriesRow) tsdb.Cursor {
switch c := cur.(type) {
case tsdb.IntegerArrayCursor:
m.cursors.i.reset(c, row.query, cond)
m.cursors.i.reset(c, row.Query, cond)
return &m.cursors.i
case tsdb.FloatArrayCursor:
m.cursors.f.reset(c, row.query, cond)
m.cursors.f.reset(c, row.Query, cond)
return &m.cursors.f
case tsdb.UnsignedArrayCursor:
m.cursors.u.reset(c, row.query, cond)
m.cursors.u.reset(c, row.Query, cond)
return &m.cursors.u
case tsdb.StringArrayCursor:
m.cursors.s.reset(c, row.query, cond)
m.cursors.s.reset(c, row.Query, cond)
return &m.cursors.s
case tsdb.BooleanArrayCursor:
m.cursors.b.reset(c, row.query, cond)
m.cursors.b.reset(c, row.Query, cond)
return &m.cursors.b
default:
panic(fmt.Sprintf("unreachable: %T", cur))

View File

@ -14,51 +14,35 @@ import (
"go.uber.org/zap"
)
type GroupCursor interface {
Tags() models.Tags
Keys() [][]byte
PartitionKeyVals() [][]byte
Next() bool
Cursor() tsdb.Cursor
Close()
}
type groupResultSet struct {
ctx context.Context
req *ReadRequest
rr readRequest
agg *Aggregate
mb multiShardCursors
i int
rows []*seriesRow
rows []*SeriesRow
keys [][]byte
rgc groupByCursor
km keyMerger
newCursorFn func() (seriesCursor, error)
newCursorFn func() (SeriesCursor, error)
nextGroupFn func(c *groupResultSet) GroupCursor
sortFn func(c *groupResultSet) (int, error)
eof bool
}
func newGroupResultSet(ctx context.Context, req *ReadRequest, newCursorFn func() (seriesCursor, error)) *groupResultSet {
func NewGroupResultSet(ctx context.Context, req *ReadRequest, newCursorFn func() (SeriesCursor, error)) *groupResultSet {
g := &groupResultSet{
ctx: ctx,
req: req,
rr: readRequest{
ctx: ctx,
start: req.TimestampRange.Start,
end: req.TimestampRange.End,
asc: !req.Descending,
limit: req.PointsLimit,
aggregate: req.Aggregate,
},
ctx: ctx,
req: req,
agg: req.Aggregate,
keys: make([][]byte, len(req.GroupKeys)),
newCursorFn: newCursorFn,
}
g.mb = newMultiShardArrayCursors(ctx, &g.rr)
g.mb = newMultiShardArrayCursors(ctx, req.TimestampRange.Start, req.TimestampRange.End, !req.Descending, req.PointsLimit)
for i, k := range req.GroupKeys {
g.keys[i] = []byte(k)
@ -69,8 +53,9 @@ func newGroupResultSet(ctx context.Context, req *ReadRequest, newCursorFn func()
g.sortFn = groupBySort
g.nextGroupFn = groupByNextGroup
g.rgc = groupByCursor{
ctx: ctx,
mb: g.mb,
req: &g.rr,
agg: req.Aggregate,
vals: make([][]byte, len(req.GroupKeys)),
}
@ -134,7 +119,7 @@ func (g *groupResultSet) sort() (int, error) {
// seriesHasPoints reads the first block of TSM data to verify the series has points for
// the time range of the query.
func (g *groupResultSet) seriesHasPoints(row *seriesRow) bool {
func (g *groupResultSet) seriesHasPoints(row *SeriesRow) bool {
// TODO(sgc): this is expensive. Storage engine must provide efficient time range queries of series keys.
cur := g.mb.createCursor(*row)
var ts []int64
@ -174,8 +159,9 @@ func groupNoneNextGroup(g *groupResultSet) GroupCursor {
g.eof = true
return &groupNoneCursor{
ctx: g.ctx,
mb: g.mb,
req: &g.rr,
agg: g.agg,
cur: cur,
keys: g.km.get(),
}
@ -196,7 +182,7 @@ func groupNoneSort(g *groupResultSet) (int, error) {
for row != nil {
n++
if allTime || g.seriesHasPoints(row) {
g.km.mergeTagKeys(row.tags)
g.km.mergeTagKeys(row.Tags)
}
row = cur.Next()
}
@ -209,17 +195,17 @@ func groupByNextGroup(g *groupResultSet) GroupCursor {
next:
row := g.rows[g.i]
for i := range g.keys {
g.rgc.vals[i] = row.tags.Get(g.keys[i])
g.rgc.vals[i] = row.Tags.Get(g.keys[i])
}
g.km.clear()
allTime := g.req.Hints.HintSchemaAllTime()
c := 0
rowKey := row.sortKey
rowKey := row.SortKey
j := g.i
for j < len(g.rows) && bytes.Equal(rowKey, g.rows[j].sortKey) {
for j < len(g.rows) && bytes.Equal(rowKey, g.rows[j].SortKey) {
if allTime || g.seriesHasPoints(g.rows[j]) {
g.km.mergeTagKeys(g.rows[j].tags)
g.km.mergeTagKeys(g.rows[j].Tags)
c++
}
j++
@ -247,28 +233,28 @@ func groupBySort(g *groupResultSet) (int, error) {
return 0, nil
}
var rows []*seriesRow
var rows []*SeriesRow
vals := make([][]byte, len(g.keys))
tagsBuf := &tagsBuffer{sz: 4096}
row := cur.Next()
for row != nil {
nr := *row
nr.stags = tagsBuf.copyTags(nr.stags)
nr.tags = tagsBuf.copyTags(nr.tags)
nr.SeriesTags = tagsBuf.copyTags(nr.SeriesTags)
nr.Tags = tagsBuf.copyTags(nr.Tags)
l := 0
for i, k := range g.keys {
vals[i] = nr.tags.Get(k)
vals[i] = nr.Tags.Get(k)
if len(vals[i]) == 0 {
vals[i] = nilKey[:] // if there was no value, ensure it sorts last
}
l += len(vals[i])
}
nr.sortKey = make([]byte, 0, l)
nr.SortKey = make([]byte, 0, l)
for _, v := range vals {
nr.sortKey = append(nr.sortKey, v...)
nr.SortKey = append(nr.SortKey, v...)
}
rows = append(rows, &nr)
@ -276,7 +262,7 @@ func groupBySort(g *groupResultSet) (int, error) {
}
sort.Slice(rows, func(i, j int) bool {
return bytes.Compare(rows[i].sortKey, rows[j].sortKey) == -1
return bytes.Compare(rows[i].SortKey, rows[j].SortKey) == -1
})
g.rows = rows
@ -286,14 +272,15 @@ func groupBySort(g *groupResultSet) (int, error) {
}
type groupNoneCursor struct {
ctx context.Context
mb multiShardCursors
req *readRequest
cur seriesCursor
row seriesRow
agg *Aggregate
cur SeriesCursor
row SeriesRow
keys [][]byte
}
func (c *groupNoneCursor) Tags() models.Tags { return c.row.tags }
func (c *groupNoneCursor) Tags() models.Tags { return c.row.Tags }
func (c *groupNoneCursor) Keys() [][]byte { return c.keys }
func (c *groupNoneCursor) PartitionKeyVals() [][]byte { return nil }
func (c *groupNoneCursor) Close() { c.cur.Close() }
@ -311,29 +298,30 @@ func (c *groupNoneCursor) Next() bool {
func (c *groupNoneCursor) Cursor() tsdb.Cursor {
cur := c.mb.createCursor(c.row)
if c.req.aggregate != nil {
cur = c.mb.newAggregateCursor(c.req.ctx, c.req.aggregate, cur)
if c.agg != nil {
cur = c.mb.newAggregateCursor(c.ctx, c.agg, cur)
}
return cur
}
type groupByCursor struct {
ctx context.Context
mb multiShardCursors
req *readRequest
agg *Aggregate
i int
rows []*seriesRow
rows []*SeriesRow
keys [][]byte
vals [][]byte
}
func (c *groupByCursor) reset(rows []*seriesRow) {
func (c *groupByCursor) reset(rows []*SeriesRow) {
c.i = 0
c.rows = rows
}
func (c *groupByCursor) Keys() [][]byte { return c.keys }
func (c *groupByCursor) PartitionKeyVals() [][]byte { return c.vals }
func (c *groupByCursor) Tags() models.Tags { return c.rows[c.i-1].tags }
func (c *groupByCursor) Tags() models.Tags { return c.rows[c.i-1].Tags }
func (c *groupByCursor) Close() {}
func (c *groupByCursor) Next() bool {
@ -346,8 +334,8 @@ func (c *groupByCursor) Next() bool {
func (c *groupByCursor) Cursor() tsdb.Cursor {
cur := c.mb.createCursor(*c.rows[c.i-1])
if c.req.aggregate != nil {
cur = c.mb.newAggregateCursor(c.req.ctx, c.req.aggregate, cur)
if c.agg != nil {
cur = c.mb.newAggregateCursor(c.ctx, c.agg, cur)
}
return cur
}

View File

@ -12,10 +12,10 @@ import (
func TestGroupGroupResultSetSorting(t *testing.T) {
tests := []struct {
name string
cur seriesCursor
cur SeriesCursor
group ReadRequest_Group
keys []string
exp []seriesRow
exp []SeriesRow
}{
{
name: "group by tag1 in all series",
@ -96,12 +96,12 @@ func TestGroupGroupResultSetSorting(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
newCursor := func() (seriesCursor, error) {
newCursor := func() (SeriesCursor, error) {
return tt.cur, nil
}
rs := newGroupResultSet(context.Background(), &ReadRequest{Group: tt.group, GroupKeys: tt.keys}, newCursor)
rs := NewGroupResultSet(context.Background(), &ReadRequest{Group: tt.group, GroupKeys: tt.keys}, newCursor)
var rows []seriesRow
var rows []SeriesRow
for i := range rs.rows {
rows = append(rows, *rs.rows[i])
@ -178,12 +178,12 @@ func TestKeyMerger(t *testing.T) {
}
}
func selectTags(rows []seriesRow, keys []string) string {
func selectTags(rows []SeriesRow, keys []string) string {
var srows []string
for _, row := range rows {
var ss []string
for _, key := range keys {
for _, tag := range row.tags {
for _, tag := range row.Tags {
if key == string(tag.Key) {
ss = append(ss, string(tag.Key)+"="+string(tag.Value))
}
@ -195,16 +195,16 @@ func selectTags(rows []seriesRow, keys []string) string {
}
type sliceSeriesCursor struct {
rows []seriesRow
rows []SeriesRow
i int
}
func newSeriesRows(keys ...string) []seriesRow {
rows := make([]seriesRow, len(keys))
func newSeriesRows(keys ...string) []SeriesRow {
rows := make([]SeriesRow, len(keys))
for i := range keys {
rows[i].name, rows[i].stags = models.ParseKeyBytes([]byte(keys[i]))
rows[i].tags = rows[i].stags.Clone()
rows[i].tags.Set([]byte("_m"), rows[i].name)
rows[i].Name, rows[i].SeriesTags = models.ParseKeyBytes([]byte(keys[i]))
rows[i].Tags = rows[i].SeriesTags.Clone()
rows[i].Tags.Set([]byte("_m"), rows[i].Name)
}
return rows
}
@ -212,7 +212,7 @@ func newSeriesRows(keys ...string) []seriesRow {
func (s *sliceSeriesCursor) Close() {}
func (s *sliceSeriesCursor) Err() error { return nil }
func (s *sliceSeriesCursor) Next() *seriesRow {
func (s *sliceSeriesCursor) Next() *SeriesRow {
if s.i < len(s.rows) {
s.i++
return &s.rows[s.i-1]

View File

@ -11,7 +11,7 @@ type grpcServer struct {
addr string
loggingEnabled bool
rpc *grpc.Server
store *Store
store Store
logger *zap.Logger
}

View File

@ -293,7 +293,7 @@ func HasSingleMeasurementNoOR(expr influxql.Expr) (string, bool) {
}
func HasFieldKeyOrValue(expr influxql.Expr) (bool, bool) {
refs := hasRefs{refs: []string{"_field", "$"}, found: make([]bool, 2)}
refs := hasRefs{refs: []string{fieldKey, "$"}, found: make([]bool, 2)}
influxql.Walk(&refs, expr)
return refs.found[0], refs.found[1]
}

View File

@ -10,7 +10,7 @@ import (
"github.com/influxdata/influxdb/tsdb"
)
func (w *responseWriter) getFloatPointsFrame() *ReadResponse_Frame_FloatPoints {
func (w *ResponseWriter) getFloatPointsFrame() *ReadResponse_Frame_FloatPoints {
var res *ReadResponse_Frame_FloatPoints
if len(w.buffer.Float) > 0 {
i := len(w.buffer.Float) - 1
@ -24,13 +24,13 @@ func (w *responseWriter) getFloatPointsFrame() *ReadResponse_Frame_FloatPoints {
return res
}
func (w *responseWriter) putFloatPointsFrame(f *ReadResponse_Frame_FloatPoints) {
func (w *ResponseWriter) putFloatPointsFrame(f *ReadResponse_Frame_FloatPoints) {
f.FloatPoints.Timestamps = f.FloatPoints.Timestamps[:0]
f.FloatPoints.Values = f.FloatPoints.Values[:0]
w.buffer.Float = append(w.buffer.Float, f)
}
func (w *responseWriter) streamFloatArraySeries(cur tsdb.FloatArrayCursor) {
func (w *ResponseWriter) streamFloatArraySeries(cur tsdb.FloatArrayCursor) {
w.sf.DataType = DataTypeFloat
ss := len(w.res.Frames) - 1
a := cur.Next()
@ -39,11 +39,11 @@ func (w *responseWriter) streamFloatArraySeries(cur tsdb.FloatArrayCursor) {
w.putSeriesFrame(w.res.Frames[ss].Data.(*ReadResponse_Frame_Series))
w.res.Frames = w.res.Frames[:ss]
} else if w.sz > writeSize {
w.flushFrames()
w.Flush()
}
}
func (w *responseWriter) streamFloatArrayPoints(cur tsdb.FloatArrayCursor) {
func (w *ResponseWriter) streamFloatArrayPoints(cur tsdb.FloatArrayCursor) {
w.sf.DataType = DataTypeFloat
ss := len(w.res.Frames) - 1
@ -71,7 +71,7 @@ func (w *responseWriter) streamFloatArrayPoints(cur tsdb.FloatArrayCursor) {
b = 0
w.sz += frame.Size()
if w.sz >= writeSize {
w.flushFrames()
w.Flush()
if w.err != nil {
break
}
@ -90,11 +90,11 @@ func (w *responseWriter) streamFloatArrayPoints(cur tsdb.FloatArrayCursor) {
w.putSeriesFrame(w.res.Frames[ss].Data.(*ReadResponse_Frame_Series))
w.res.Frames = w.res.Frames[:ss]
} else if w.sz > writeSize {
w.flushFrames()
w.Flush()
}
}
func (w *responseWriter) getIntegerPointsFrame() *ReadResponse_Frame_IntegerPoints {
func (w *ResponseWriter) getIntegerPointsFrame() *ReadResponse_Frame_IntegerPoints {
var res *ReadResponse_Frame_IntegerPoints
if len(w.buffer.Integer) > 0 {
i := len(w.buffer.Integer) - 1
@ -108,13 +108,13 @@ func (w *responseWriter) getIntegerPointsFrame() *ReadResponse_Frame_IntegerPoin
return res
}
func (w *responseWriter) putIntegerPointsFrame(f *ReadResponse_Frame_IntegerPoints) {
func (w *ResponseWriter) putIntegerPointsFrame(f *ReadResponse_Frame_IntegerPoints) {
f.IntegerPoints.Timestamps = f.IntegerPoints.Timestamps[:0]
f.IntegerPoints.Values = f.IntegerPoints.Values[:0]
w.buffer.Integer = append(w.buffer.Integer, f)
}
func (w *responseWriter) streamIntegerArraySeries(cur tsdb.IntegerArrayCursor) {
func (w *ResponseWriter) streamIntegerArraySeries(cur tsdb.IntegerArrayCursor) {
w.sf.DataType = DataTypeInteger
ss := len(w.res.Frames) - 1
a := cur.Next()
@ -123,11 +123,11 @@ func (w *responseWriter) streamIntegerArraySeries(cur tsdb.IntegerArrayCursor) {
w.putSeriesFrame(w.res.Frames[ss].Data.(*ReadResponse_Frame_Series))
w.res.Frames = w.res.Frames[:ss]
} else if w.sz > writeSize {
w.flushFrames()
w.Flush()
}
}
func (w *responseWriter) streamIntegerArrayPoints(cur tsdb.IntegerArrayCursor) {
func (w *ResponseWriter) streamIntegerArrayPoints(cur tsdb.IntegerArrayCursor) {
w.sf.DataType = DataTypeInteger
ss := len(w.res.Frames) - 1
@ -155,7 +155,7 @@ func (w *responseWriter) streamIntegerArrayPoints(cur tsdb.IntegerArrayCursor) {
b = 0
w.sz += frame.Size()
if w.sz >= writeSize {
w.flushFrames()
w.Flush()
if w.err != nil {
break
}
@ -174,11 +174,11 @@ func (w *responseWriter) streamIntegerArrayPoints(cur tsdb.IntegerArrayCursor) {
w.putSeriesFrame(w.res.Frames[ss].Data.(*ReadResponse_Frame_Series))
w.res.Frames = w.res.Frames[:ss]
} else if w.sz > writeSize {
w.flushFrames()
w.Flush()
}
}
func (w *responseWriter) getUnsignedPointsFrame() *ReadResponse_Frame_UnsignedPoints {
func (w *ResponseWriter) getUnsignedPointsFrame() *ReadResponse_Frame_UnsignedPoints {
var res *ReadResponse_Frame_UnsignedPoints
if len(w.buffer.Unsigned) > 0 {
i := len(w.buffer.Unsigned) - 1
@ -192,13 +192,13 @@ func (w *responseWriter) getUnsignedPointsFrame() *ReadResponse_Frame_UnsignedPo
return res
}
func (w *responseWriter) putUnsignedPointsFrame(f *ReadResponse_Frame_UnsignedPoints) {
func (w *ResponseWriter) putUnsignedPointsFrame(f *ReadResponse_Frame_UnsignedPoints) {
f.UnsignedPoints.Timestamps = f.UnsignedPoints.Timestamps[:0]
f.UnsignedPoints.Values = f.UnsignedPoints.Values[:0]
w.buffer.Unsigned = append(w.buffer.Unsigned, f)
}
func (w *responseWriter) streamUnsignedArraySeries(cur tsdb.UnsignedArrayCursor) {
func (w *ResponseWriter) streamUnsignedArraySeries(cur tsdb.UnsignedArrayCursor) {
w.sf.DataType = DataTypeUnsigned
ss := len(w.res.Frames) - 1
a := cur.Next()
@ -207,11 +207,11 @@ func (w *responseWriter) streamUnsignedArraySeries(cur tsdb.UnsignedArrayCursor)
w.putSeriesFrame(w.res.Frames[ss].Data.(*ReadResponse_Frame_Series))
w.res.Frames = w.res.Frames[:ss]
} else if w.sz > writeSize {
w.flushFrames()
w.Flush()
}
}
func (w *responseWriter) streamUnsignedArrayPoints(cur tsdb.UnsignedArrayCursor) {
func (w *ResponseWriter) streamUnsignedArrayPoints(cur tsdb.UnsignedArrayCursor) {
w.sf.DataType = DataTypeUnsigned
ss := len(w.res.Frames) - 1
@ -239,7 +239,7 @@ func (w *responseWriter) streamUnsignedArrayPoints(cur tsdb.UnsignedArrayCursor)
b = 0
w.sz += frame.Size()
if w.sz >= writeSize {
w.flushFrames()
w.Flush()
if w.err != nil {
break
}
@ -258,11 +258,11 @@ func (w *responseWriter) streamUnsignedArrayPoints(cur tsdb.UnsignedArrayCursor)
w.putSeriesFrame(w.res.Frames[ss].Data.(*ReadResponse_Frame_Series))
w.res.Frames = w.res.Frames[:ss]
} else if w.sz > writeSize {
w.flushFrames()
w.Flush()
}
}
func (w *responseWriter) getStringPointsFrame() *ReadResponse_Frame_StringPoints {
func (w *ResponseWriter) getStringPointsFrame() *ReadResponse_Frame_StringPoints {
var res *ReadResponse_Frame_StringPoints
if len(w.buffer.String) > 0 {
i := len(w.buffer.String) - 1
@ -276,13 +276,13 @@ func (w *responseWriter) getStringPointsFrame() *ReadResponse_Frame_StringPoints
return res
}
func (w *responseWriter) putStringPointsFrame(f *ReadResponse_Frame_StringPoints) {
func (w *ResponseWriter) putStringPointsFrame(f *ReadResponse_Frame_StringPoints) {
f.StringPoints.Timestamps = f.StringPoints.Timestamps[:0]
f.StringPoints.Values = f.StringPoints.Values[:0]
w.buffer.String = append(w.buffer.String, f)
}
func (w *responseWriter) streamStringArraySeries(cur tsdb.StringArrayCursor) {
func (w *ResponseWriter) streamStringArraySeries(cur tsdb.StringArrayCursor) {
w.sf.DataType = DataTypeString
ss := len(w.res.Frames) - 1
a := cur.Next()
@ -291,11 +291,11 @@ func (w *responseWriter) streamStringArraySeries(cur tsdb.StringArrayCursor) {
w.putSeriesFrame(w.res.Frames[ss].Data.(*ReadResponse_Frame_Series))
w.res.Frames = w.res.Frames[:ss]
} else if w.sz > writeSize {
w.flushFrames()
w.Flush()
}
}
func (w *responseWriter) streamStringArrayPoints(cur tsdb.StringArrayCursor) {
func (w *ResponseWriter) streamStringArrayPoints(cur tsdb.StringArrayCursor) {
w.sf.DataType = DataTypeString
ss := len(w.res.Frames) - 1
@ -323,7 +323,7 @@ func (w *responseWriter) streamStringArrayPoints(cur tsdb.StringArrayCursor) {
b = 0
w.sz += frame.Size()
if w.sz >= writeSize {
w.flushFrames()
w.Flush()
if w.err != nil {
break
}
@ -342,11 +342,11 @@ func (w *responseWriter) streamStringArrayPoints(cur tsdb.StringArrayCursor) {
w.putSeriesFrame(w.res.Frames[ss].Data.(*ReadResponse_Frame_Series))
w.res.Frames = w.res.Frames[:ss]
} else if w.sz > writeSize {
w.flushFrames()
w.Flush()
}
}
func (w *responseWriter) getBooleanPointsFrame() *ReadResponse_Frame_BooleanPoints {
func (w *ResponseWriter) getBooleanPointsFrame() *ReadResponse_Frame_BooleanPoints {
var res *ReadResponse_Frame_BooleanPoints
if len(w.buffer.Boolean) > 0 {
i := len(w.buffer.Boolean) - 1
@ -360,13 +360,13 @@ func (w *responseWriter) getBooleanPointsFrame() *ReadResponse_Frame_BooleanPoin
return res
}
func (w *responseWriter) putBooleanPointsFrame(f *ReadResponse_Frame_BooleanPoints) {
func (w *ResponseWriter) putBooleanPointsFrame(f *ReadResponse_Frame_BooleanPoints) {
f.BooleanPoints.Timestamps = f.BooleanPoints.Timestamps[:0]
f.BooleanPoints.Values = f.BooleanPoints.Values[:0]
w.buffer.Boolean = append(w.buffer.Boolean, f)
}
func (w *responseWriter) streamBooleanArraySeries(cur tsdb.BooleanArrayCursor) {
func (w *ResponseWriter) streamBooleanArraySeries(cur tsdb.BooleanArrayCursor) {
w.sf.DataType = DataTypeBoolean
ss := len(w.res.Frames) - 1
a := cur.Next()
@ -375,11 +375,11 @@ func (w *responseWriter) streamBooleanArraySeries(cur tsdb.BooleanArrayCursor) {
w.putSeriesFrame(w.res.Frames[ss].Data.(*ReadResponse_Frame_Series))
w.res.Frames = w.res.Frames[:ss]
} else if w.sz > writeSize {
w.flushFrames()
w.Flush()
}
}
func (w *responseWriter) streamBooleanArrayPoints(cur tsdb.BooleanArrayCursor) {
func (w *ResponseWriter) streamBooleanArrayPoints(cur tsdb.BooleanArrayCursor) {
w.sf.DataType = DataTypeBoolean
ss := len(w.res.Frames) - 1
@ -407,7 +407,7 @@ func (w *responseWriter) streamBooleanArrayPoints(cur tsdb.BooleanArrayCursor) {
b = 0
w.sz += frame.Size()
if w.sz >= writeSize {
w.flushFrames()
w.Flush()
if w.err != nil {
break
}
@ -426,6 +426,6 @@ func (w *responseWriter) streamBooleanArrayPoints(cur tsdb.BooleanArrayCursor) {
w.putSeriesFrame(w.res.Frames[ss].Data.(*ReadResponse_Frame_Series))
w.res.Frames = w.res.Frames[:ss]
} else if w.sz > writeSize {
w.flushFrames()
w.Flush()
}
}

View File

@ -6,7 +6,7 @@ import (
{{range .}}
func (w *responseWriter) get{{.Name}}PointsFrame() *ReadResponse_Frame_{{.Name}}Points {
func (w *ResponseWriter) get{{.Name}}PointsFrame() *ReadResponse_Frame_{{.Name}}Points {
var res *ReadResponse_Frame_{{.Name}}Points
if len(w.buffer.{{.Name}}) > 0 {
i := len(w.buffer.{{.Name}}) - 1
@ -20,13 +20,13 @@ func (w *responseWriter) get{{.Name}}PointsFrame() *ReadResponse_Frame_{{.Name}}
return res
}
func (w *responseWriter) put{{.Name}}PointsFrame(f *ReadResponse_Frame_{{.Name}}Points) {
func (w *ResponseWriter) put{{.Name}}PointsFrame(f *ReadResponse_Frame_{{.Name}}Points) {
f.{{.Name}}Points.Timestamps = f.{{.Name}}Points.Timestamps[:0]
f.{{.Name}}Points.Values = f.{{.Name}}Points.Values[:0]
w.buffer.{{.Name}} = append(w.buffer.{{.Name}}, f)
}
func (w *responseWriter) stream{{.Name}}ArraySeries(cur tsdb.{{.Name}}ArrayCursor) {
func (w *ResponseWriter) stream{{.Name}}ArraySeries(cur tsdb.{{.Name}}ArrayCursor) {
w.sf.DataType = DataType{{.Name}}
ss := len(w.res.Frames) - 1
a := cur.Next()
@ -35,11 +35,11 @@ func (w *responseWriter) stream{{.Name}}ArraySeries(cur tsdb.{{.Name}}ArrayCurso
w.putSeriesFrame(w.res.Frames[ss].Data.(*ReadResponse_Frame_Series))
w.res.Frames = w.res.Frames[:ss]
} else if w.sz > writeSize {
w.flushFrames()
w.Flush()
}
}
func (w *responseWriter) stream{{.Name}}ArrayPoints(cur tsdb.{{.Name}}ArrayCursor) {
func (w *ResponseWriter) stream{{.Name}}ArrayPoints(cur tsdb.{{.Name}}ArrayCursor) {
w.sf.DataType = DataType{{.Name}}
ss := len(w.res.Frames) - 1
@ -67,7 +67,7 @@ func (w *responseWriter) stream{{.Name}}ArrayPoints(cur tsdb.{{.Name}}ArrayCurso
b = 0
w.sz += frame.Size()
if w.sz >= writeSize {
w.flushFrames()
w.Flush()
if w.err != nil {
break
}
@ -86,7 +86,7 @@ func (w *responseWriter) stream{{.Name}}ArrayPoints(cur tsdb.{{.Name}}ArrayCurso
w.putSeriesFrame(w.res.Frames[ss].Data.(*ReadResponse_Frame_Series))
w.res.Frames = w.res.Frames[:ss]
} else if w.sz > writeSize {
w.flushFrames()
w.Flush()
}
}
{{end}}

View File

@ -5,13 +5,21 @@ import (
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/tsdb"
"go.uber.org/zap"
)
type responseWriter struct {
stream Storage_ReadServer
type ResponseStream interface {
Send(*ReadResponse) error
}
const (
batchSize = 1000
frameCount = 50
writeSize = 64 << 10 // 64k
)
type ResponseWriter struct {
stream ResponseStream
res *ReadResponse
logger *zap.Logger
err error
// current series
@ -34,7 +42,62 @@ type responseWriter struct {
hints HintFlags
}
func (w *responseWriter) getGroupFrame(keys, partitionKey [][]byte) *ReadResponse_Frame_Group {
func NewResponseWriter(stream ResponseStream, hints HintFlags) *ResponseWriter {
rw := &ResponseWriter{stream: stream, res: &ReadResponse{Frames: make([]ReadResponse_Frame, 0, frameCount)}, hints: hints}
return rw
}
// WrittenN returns the number of values written to the response stream.
func (w *ResponseWriter) WrittenN() int { return w.vc }
func (w *ResponseWriter) WriteResultSet(rs ResultSet) error {
for rs.Next() {
cur := rs.Cursor()
if cur == nil {
// no data for series key + field combination
continue
}
w.startSeries(rs.Tags())
w.streamCursor(cur)
if w.err != nil {
cur.Close()
return w.err
}
}
return nil
}
func (w *ResponseWriter) WriteGroupResultSet(rs GroupResultSet) error {
gc := rs.Next()
for gc != nil {
w.startGroup(gc.Keys(), gc.PartitionKeyVals())
for gc.Next() {
cur := gc.Cursor()
if cur == nil {
// no data for series key + field combination
continue
}
w.startSeries(gc.Tags())
w.streamCursor(cur)
if w.err != nil {
gc.Close()
return w.err
}
}
gc.Close()
gc = rs.Next()
}
return nil
}
func (w *ResponseWriter) Err() error { return w.err }
func (w *ResponseWriter) getGroupFrame(keys, partitionKey [][]byte) *ReadResponse_Frame_Group {
var res *ReadResponse_Frame_Group
if len(w.buffer.Group) > 0 {
i := len(w.buffer.Group) - 1
@ -60,7 +123,7 @@ func (w *responseWriter) getGroupFrame(keys, partitionKey [][]byte) *ReadRespons
return res
}
func (w *responseWriter) putGroupFrame(f *ReadResponse_Frame_Group) {
func (w *ResponseWriter) putGroupFrame(f *ReadResponse_Frame_Group) {
for i := range f.Group.TagKeys {
f.Group.TagKeys[i] = nil
}
@ -70,7 +133,7 @@ func (w *responseWriter) putGroupFrame(f *ReadResponse_Frame_Group) {
w.buffer.Group = append(w.buffer.Group, f)
}
func (w *responseWriter) getSeriesFrame(next models.Tags) *ReadResponse_Frame_Series {
func (w *ResponseWriter) getSeriesFrame(next models.Tags) *ReadResponse_Frame_Series {
var res *ReadResponse_Frame_Series
if len(w.buffer.Series) > 0 {
i := len(w.buffer.Series) - 1
@ -90,7 +153,7 @@ func (w *responseWriter) getSeriesFrame(next models.Tags) *ReadResponse_Frame_Se
return res
}
func (w *responseWriter) putSeriesFrame(f *ReadResponse_Frame_Series) {
func (w *ResponseWriter) putSeriesFrame(f *ReadResponse_Frame_Series) {
tags := f.Series.Tags
for i := range tags {
tags[i].Key = nil
@ -99,7 +162,7 @@ func (w *responseWriter) putSeriesFrame(f *ReadResponse_Frame_Series) {
w.buffer.Series = append(w.buffer.Series, f)
}
func (w *responseWriter) startGroup(keys, partitionKey [][]byte) {
func (w *ResponseWriter) startGroup(keys, partitionKey [][]byte) {
f := w.getGroupFrame(keys, partitionKey)
copy(f.Group.TagKeys, keys)
copy(f.Group.PartitionKeyVals, partitionKey)
@ -107,7 +170,7 @@ func (w *responseWriter) startGroup(keys, partitionKey [][]byte) {
w.sz += f.Size()
}
func (w *responseWriter) startSeries(next models.Tags) {
func (w *ResponseWriter) startSeries(next models.Tags) {
if w.hints.NoSeries() {
return
}
@ -123,7 +186,7 @@ func (w *responseWriter) startSeries(next models.Tags) {
w.sz += w.sf.Size()
}
func (w *responseWriter) streamCursor(cur tsdb.Cursor) {
func (w *ResponseWriter) streamCursor(cur tsdb.Cursor) {
switch {
case w.hints.NoSeries():
// skip
@ -162,7 +225,7 @@ func (w *responseWriter) streamCursor(cur tsdb.Cursor) {
cur.Close()
}
func (w *responseWriter) flushFrames() {
func (w *ResponseWriter) Flush() {
if w.err != nil || w.sz == 0 {
return
}
@ -170,7 +233,6 @@ func (w *responseWriter) flushFrames() {
w.sz = 0
if w.err = w.stream.Send(w.res); w.err != nil {
w.logger.Error("stream.Send failed", zap.Error(w.err))
return
}

View File

@ -7,32 +7,34 @@ import (
"github.com/influxdata/influxdb/tsdb"
)
type readRequest struct {
ctx context.Context
start, end int64
asc bool
limit int64
aggregate *Aggregate
}
type multiShardCursors interface {
createCursor(row seriesRow) tsdb.Cursor
createCursor(row SeriesRow) tsdb.Cursor
newAggregateCursor(ctx context.Context, agg *Aggregate, cursor tsdb.Cursor) tsdb.Cursor
}
type resultSet struct {
req readRequest
cur seriesCursor
row seriesRow
ctx context.Context
agg *Aggregate
cur SeriesCursor
row SeriesRow
mb multiShardCursors
}
func NewResultSet(ctx context.Context, req *ReadRequest, cur SeriesCursor) ResultSet {
return &resultSet{
ctx: ctx,
agg: req.Aggregate,
cur: cur,
mb: newMultiShardArrayCursors(ctx, req.TimestampRange.Start, req.TimestampRange.End, !req.Descending, req.PointsLimit),
}
}
// Close closes the result set. Close is idempotent.
func (r *resultSet) Close() {
if r == nil {
return // Nothing to do.
}
r.row.query = nil
r.row.Query = nil
r.cur.Close()
}
@ -54,12 +56,12 @@ func (r *resultSet) Next() bool {
func (r *resultSet) Cursor() tsdb.Cursor {
cur := r.mb.createCursor(r.row)
if r.req.aggregate != nil {
cur = r.mb.newAggregateCursor(r.req.ctx, r.req.aggregate, cur)
if r.agg != nil {
cur = r.mb.newAggregateCursor(r.ctx, r.agg, cur)
}
return cur
}
func (r *resultSet) Tags() models.Tags {
return r.row.tags
return r.row.Tags
}

View File

@ -15,20 +15,18 @@ import (
"go.uber.org/zap"
)
//go:generate protoc -I$GOPATH/src/github.com/influxdata/influxdb/vendor -I. --gogofaster_out=Mgoogle/protobuf/empty.proto=github.com/gogo/protobuf/types,plugins=grpc:. storage.proto predicate.proto
//go:generate protoc -I$GOPATH/src/github.com/influxdata/influxdb/vendor -I. --gogofaster_out=Mgoogle/protobuf/empty.proto=github.com/gogo/protobuf/types,Mgoogle/protobuf/any.proto=github.com/gogo/protobuf/types,plugins=grpc:. storage_common.proto storage.proto predicate.proto
//go:generate tmpl -data=@array_cursor.gen.go.tmpldata array_cursor.gen.go.tmpl
//go:generate tmpl -data=@array_cursor.gen.go.tmpldata response_writer.gen.go.tmpl
const (
batchSize = 1000
frameCount = 50
writeSize = 64 << 10 // 64k
var (
ErrMissingReadSource = errors.New("missing ReadSource")
)
type rpcService struct {
loggingEnabled bool
Store *Store
Store Store
Logger *zap.Logger
}
@ -41,23 +39,13 @@ func (r *rpcService) Hints(context.Context, *types.Empty) (*HintsResponse, error
}
func (r *rpcService) Read(req *ReadRequest, stream Storage_ReadServer) error {
var err error
var wire opentracing.SpanContext
if len(req.Trace) > 0 {
wire, _ = opentracing.GlobalTracer().Extract(opentracing.TextMap, opentracing.TextMapCarrier(req.Trace))
// TODO(sgc): Log ignored error?
source, err := getReadSource(req)
if err != nil {
return err
}
span := opentracing.StartSpan("storage.read", ext.RPCServerOption(wire))
defer span.Finish()
ext.DBInstance.Set(span, req.Database)
ctx := stream.Context()
ctx = opentracing.ContextWithSpan(ctx, span)
// TODO(sgc): this should be available via a generic API, such as tsdb.Store
ctx = tsm1.NewContextWithMetricsGroup(ctx)
ctx := tsm1.NewContextWithMetricsGroup(stream.Context())
var agg Aggregate_AggregateType
if req.Aggregate != nil {
@ -65,17 +53,23 @@ func (r *rpcService) Read(req *ReadRequest, stream Storage_ReadServer) error {
}
pred := truncateString(PredicateToExprString(req.Predicate))
groupKeys := truncateString(strings.Join(req.GroupKeys, ","))
span.
SetTag("predicate", pred).
SetTag("series_limit", req.SeriesLimit).
SetTag("series_offset", req.SeriesOffset).
SetTag("points_limit", req.PointsLimit).
SetTag("start", req.TimestampRange.Start).
SetTag("end", req.TimestampRange.End).
SetTag("desc", req.Descending).
SetTag("group", req.Group.String()).
SetTag("group_keys", groupKeys).
SetTag("aggregate", agg.String())
span := opentracing.SpanFromContext(ctx)
if span != nil {
span.
SetTag(string(ext.DBInstance), source.Database).
SetTag("rp", source.RetentionPolicy).
SetTag("predicate", pred).
SetTag("series_limit", req.SeriesLimit).
SetTag("series_offset", req.SeriesOffset).
SetTag("points_limit", req.PointsLimit).
SetTag("start", req.TimestampRange.Start).
SetTag("end", req.TimestampRange.End).
SetTag("desc", req.Descending).
SetTag("group", req.Group.String()).
SetTag("group_keys", groupKeys).
SetTag("aggregate", agg.String())
}
log, logEnd := logger.NewOperation(r.Logger, "Read", "storage_read")
defer logEnd()
@ -83,7 +77,8 @@ func (r *rpcService) Read(req *ReadRequest, stream Storage_ReadServer) error {
if r.loggingEnabled {
log.Info("Read request info",
zap.String("database", req.Database),
zap.String("database", source.Database),
zap.String("rp", source.RetentionPolicy),
zap.String("predicate", pred),
zap.String("hints", req.Hints.String()),
zap.Int64("series_limit", req.SeriesLimit),
@ -106,12 +101,7 @@ func (r *rpcService) Read(req *ReadRequest, stream Storage_ReadServer) error {
req.PointsLimit = math.MaxInt64
}
w := &responseWriter{
stream: stream,
res: &ReadResponse{Frames: make([]ReadResponse_Frame, 0, frameCount)},
logger: log,
hints: req.Hints,
}
w := NewResponseWriter(stream, req.Hints)
switch req.Group {
case GroupBy, GroupExcept:
@ -127,95 +117,63 @@ func (r *rpcService) Read(req *ReadRequest, stream Storage_ReadServer) error {
}
if req.Group == GroupAll {
err = r.handleRead(ctx, req, w)
r.handleRead(ctx, req, w, log)
} else {
err = r.handleGroupRead(ctx, req, w)
r.handleGroupRead(ctx, req, w, log)
}
if err != nil {
log.Error("Read failed", zap.Error(err))
}
w.flushFrames()
if r.loggingEnabled {
log.Info("Read completed", zap.Int("num_values", w.vc))
log.Info("Read completed", zap.Int("num_values", w.WrittenN()))
}
span.SetTag("num_values", w.vc)
grp := tsm1.MetricsGroupFromContext(ctx)
grp.ForEach(func(v metrics.Metric) {
switch m := v.(type) {
case *metrics.Counter:
span.SetTag(m.Name(), m.Value())
}
})
if span != nil {
span.SetTag("num_values", w.WrittenN())
grp := tsm1.MetricsGroupFromContext(ctx)
grp.ForEach(func(v metrics.Metric) {
switch m := v.(type) {
case *metrics.Counter:
span.SetTag(m.Name(), m.Value())
}
})
}
return nil
}
func (r *rpcService) handleRead(ctx context.Context, req *ReadRequest, w *responseWriter) error {
func (r *rpcService) handleRead(ctx context.Context, req *ReadRequest, w *ResponseWriter, log *zap.Logger) {
rs, err := r.Store.Read(ctx, req)
if err != nil {
return err
log.Error("Read failed", zap.Error(w.Err()))
return
}
if rs == nil {
return nil
return
}
defer rs.Close()
w.WriteResultSet(rs)
w.Flush()
for rs.Next() {
cur := rs.Cursor()
if cur == nil {
// no data for series key + field combination
continue
}
w.startSeries(rs.Tags())
w.streamCursor(cur)
if w.err != nil {
cur.Close()
return w.err
}
if w.Err() != nil {
log.Error("Write failed", zap.Error(w.Err()))
}
return nil
}
func (r *rpcService) handleGroupRead(ctx context.Context, req *ReadRequest, w *responseWriter) error {
func (r *rpcService) handleGroupRead(ctx context.Context, req *ReadRequest, w *ResponseWriter, log *zap.Logger) {
rs, err := r.Store.GroupRead(ctx, req)
if err != nil {
return err
log.Error("GroupRead failed", zap.Error(w.Err()))
return
}
if rs == nil {
return nil
return
}
defer rs.Close()
w.WriteGroupResultSet(rs)
w.Flush()
gc := rs.Next()
for gc != nil {
w.startGroup(gc.Keys(), gc.PartitionKeyVals())
if !req.Hints.HintSchemaAllTime() {
for gc.Next() {
cur := gc.Cursor()
if cur == nil {
// no data for series key + field combination
continue
}
w.startSeries(gc.Tags())
w.streamCursor(cur)
if w.err != nil {
gc.Close()
return w.err
}
}
}
gc.Close()
gc = rs.Next()
if w.Err() != nil {
log.Error("Write failed", zap.Error(w.Err()))
}
return nil
}

View File

@ -12,47 +12,46 @@ import (
"github.com/opentracing/opentracing-go"
)
const measurementKey = "_measurement"
const (
measurementKey = "_measurement"
fieldKey = "_field"
)
var (
measurementKeyBytes = []byte(measurementKey)
fieldKeyBytes = []byte("_field")
fieldKeyBytes = []byte(fieldKey)
)
type seriesCursor interface {
type SeriesIndex interface {
CreateCursor(ctx context.Context, req *ReadRequest, shards []*tsdb.Shard)
}
type SeriesCursor interface {
Close()
Next() *seriesRow
Next() *SeriesRow
Err() error
}
type seriesRow struct {
sortKey []byte
name []byte // measurement name
stags models.Tags // unmodified series tags
field field
tags models.Tags
query tsdb.CursorIterators
valueCond influxql.Expr
}
type mapValuer map[string]string
var _ influxql.Valuer = mapValuer(nil)
func (vs mapValuer) Value(key string) (interface{}, bool) {
v, ok := vs[key]
return v, ok
type SeriesRow struct {
SortKey []byte
Name []byte // measurement name
SeriesTags models.Tags // unmodified series tags
Tags models.Tags
Field string
Query tsdb.CursorIterators
ValueCond influxql.Expr
}
type indexSeriesCursor struct {
sqry tsdb.SeriesCursor
fields measurementFields
nf []field
field field
err error
tags models.Tags
cond influxql.Expr
measurementCond influxql.Expr
row seriesRow
row SeriesRow
eof bool
hasFieldExpr bool
hasValueExpr bool
@ -80,7 +79,7 @@ func newIndexSeriesCursor(ctx context.Context, predicate *Predicate, shards []*t
Ascending: true,
Ordered: true,
}
p := &indexSeriesCursor{row: seriesRow{query: queries}}
p := &indexSeriesCursor{row: SeriesRow{Query: queries}}
if root := predicate.GetRoot(); root != nil {
if p.cond, err = NodeToExpr(root, measurementRemap); err != nil {
@ -93,12 +92,12 @@ func newIndexSeriesCursor(ctx context.Context, predicate *Predicate, shards []*t
opt.Condition = p.cond
} else {
p.measurementCond = influxql.Reduce(RewriteExprRemoveFieldValue(influxql.CloneExpr(p.cond)), nil)
if isBooleanLiteral(p.measurementCond) {
if isTrueBooleanLiteral(p.measurementCond) {
p.measurementCond = nil
}
opt.Condition = influxql.Reduce(RewriteExprRemoveFieldKeyAndValue(influxql.CloneExpr(p.cond)), nil)
if isBooleanLiteral(opt.Condition) {
if isTrueBooleanLiteral(opt.Condition) {
opt.Condition = nil
}
}
@ -174,7 +173,7 @@ func copyTags(dst, src models.Tags) models.Tags {
return dst
}
func (c *indexSeriesCursor) Next() *seriesRow {
func (c *indexSeriesCursor) Next() *SeriesRow {
if c.eof {
return nil
}
@ -192,15 +191,15 @@ func (c *indexSeriesCursor) Next() *seriesRow {
return nil
}
c.row.name = sr.Name
c.row.stags = sr.Tags
c.row.Name = sr.Name
c.row.SeriesTags = sr.Tags
c.tags = copyTags(c.tags, sr.Tags)
c.tags.Set(measurementKeyBytes, sr.Name)
c.nf = c.fields[string(sr.Name)]
// c.nf may be nil if there are no fields
} else {
c.row.field, c.nf = c.nf[0], c.nf[1:]
c.field, c.nf = c.nf[0], c.nf[1:]
if c.measurementCond == nil || evalExprBool(c.measurementCond, c) {
break
@ -208,18 +207,19 @@ func (c *indexSeriesCursor) Next() *seriesRow {
}
}
c.tags.Set(fieldKeyBytes, c.row.field.nb)
c.tags.Set(fieldKeyBytes, c.field.nb)
c.row.Field = c.field.n
if c.cond != nil && c.hasValueExpr {
// TODO(sgc): lazily evaluate valueCond
c.row.valueCond = influxql.Reduce(c.cond, c)
if isBooleanLiteral(c.row.valueCond) {
c.row.ValueCond = influxql.Reduce(c.cond, c)
if isTrueBooleanLiteral(c.row.ValueCond) {
// we've reduced the expression to "true"
c.row.valueCond = nil
c.row.ValueCond = nil
}
}
c.row.tags = copyTags(c.row.tags, c.tags)
c.row.Tags = copyTags(c.row.Tags, c.tags)
return &c.row
}
@ -227,11 +227,11 @@ func (c *indexSeriesCursor) Next() *seriesRow {
func (c *indexSeriesCursor) Value(key string) (interface{}, bool) {
switch key {
case "_name":
return c.row.name, true
case "_field":
return c.row.field.n, true
return c.row.Name, true
case fieldKey:
return c.field.n, true
default:
res := c.row.stags.Get([]byte(key))
res := c.row.SeriesTags.Get([]byte(key))
return res, res != nil
}
}
@ -241,18 +241,18 @@ func (c *indexSeriesCursor) Err() error {
}
type limitSeriesCursor struct {
seriesCursor
SeriesCursor
n, o, c int64
}
func newLimitSeriesCursor(ctx context.Context, cur seriesCursor, n, o int64) *limitSeriesCursor {
return &limitSeriesCursor{seriesCursor: cur, o: o, n: n}
func NewLimitSeriesCursor(ctx context.Context, cur SeriesCursor, n, o int64) SeriesCursor {
return &limitSeriesCursor{SeriesCursor: cur, o: o, n: n}
}
func (c *limitSeriesCursor) Next() *seriesRow {
func (c *limitSeriesCursor) Next() *SeriesRow {
if c.o > 0 {
for i := int64(0); i < c.o; i++ {
if c.seriesCursor.Next() == nil {
if c.SeriesCursor.Next() == nil {
break
}
}
@ -263,12 +263,15 @@ func (c *limitSeriesCursor) Next() *seriesRow {
return nil
}
c.c++
return c.seriesCursor.Next()
return c.SeriesCursor.Next()
}
func isBooleanLiteral(expr influxql.Expr) bool {
_, ok := expr.(*influxql.BooleanLiteral)
return ok
func isTrueBooleanLiteral(expr influxql.Expr) bool {
b, ok := expr.(*influxql.BooleanLiteral)
if ok {
return b.Val
}
return false
}
func toFloatIterator(iter query.Iterator) (query.FloatIterator, error) {

View File

@ -41,7 +41,7 @@ func TestPlannerCondition(t *testing.T) {
var keys []string
row := p.Next()
for row != nil {
keys = append(keys, string(models.MakeKey(row.name, row.stags))+" "+row.field.n)
keys = append(keys, string(models.MakeKey(row.Name, row.SeriesTags))+" "+row.Field)
row = p.Next()
}

View File

@ -8,7 +8,7 @@ import (
"go.uber.org/zap"
)
type StorageMetaClient interface {
type MetaClient interface {
Database(name string) *meta.DatabaseInfo
ShardGroupsByTimeRange(database, policy string, min, max time.Time) (a []meta.ShardGroupInfo, err error)
}
@ -20,9 +20,9 @@ type Service struct {
loggingEnabled bool
logger *zap.Logger
Store *Store
Store *localStore
TSDBStore *tsdb.Store
MetaClient StorageMetaClient
MetaClient MetaClient
}
// NewService returns a new instance of Service.
@ -45,10 +45,8 @@ func (s *Service) WithLogger(log *zap.Logger) {
func (s *Service) Open() error {
s.logger.Info("Starting storage service")
store := NewStore()
store.TSDBStore = s.TSDBStore
store.MetaClient = s.MetaClient
store.Logger = s.logger
store := NewStore(s.TSDBStore, s.MetaClient)
s.WithLogger(s.logger)
grpc := &grpcServer{
addr: s.addr,

File diff suppressed because it is too large Load Diff

View File

@ -3,204 +3,16 @@ package com.github.influxdata.influxdb.services.storage;
option go_package = "storage";
import "github.com/gogo/protobuf/gogoproto/gogo.proto";
import "google/protobuf/empty.proto";
import "predicate.proto";
option (gogoproto.marshaler_all) = true;
option (gogoproto.sizer_all) = true;
option (gogoproto.unmarshaler_all) = true;
option (gogoproto.goproto_getters_all) = false;
service Storage {
// Read performs a read operation using the given ReadRequest
rpc Read (ReadRequest) returns (stream ReadResponse);
// Capabilities returns a map of keys and values identifying the capabilities supported by the storage engine
rpc Capabilities (google.protobuf.Empty) returns (CapabilitiesResponse);
rpc Hints (google.protobuf.Empty) returns (HintsResponse);
// Explain describes the costs associated with executing a given Read request
// rpc Explain(google.protobuf.Empty) returns (ExplainResponse){}
}
// Request message for Storage.Read.
message ReadRequest {
enum Group {
option (gogoproto.goproto_enum_prefix) = false;
// GroupNone returns all series as a single group.
// The single GroupFrame.TagKeys will be the union of all tag keys.
GROUP_NONE = 0 [(gogoproto.enumvalue_customname) = "GroupNone"];
// GroupAll returns a unique group for each series.
// As an optimization, no GroupFrames will be generated.
GROUP_ALL = 1 [(gogoproto.enumvalue_customname) = "GroupAll"];
// GroupBy returns a group for each unique value of the specified GroupKeys.
GROUP_BY = 2 [(gogoproto.enumvalue_customname) = "GroupBy"];
// GroupExcept in not implemented.
GROUP_EXCEPT = 3 [(gogoproto.enumvalue_customname) = "GroupExcept"];
}
enum HintFlags {
option (gogoproto.goproto_enum_prefix) = false;
HINT_NONE = 0x00 [(gogoproto.enumvalue_customname) = "HintNone"];
HINT_NO_POINTS = 0x01 [(gogoproto.enumvalue_customname) = "HintNoPoints"];
HINT_NO_SERIES = 0x02 [(gogoproto.enumvalue_customname) = "HintNoSeries"];
// HintSchemaAllTime performs schema queries without using time ranges
HINT_SCHEMA_ALL_TIME = 0x04 [(gogoproto.enumvalue_customname) = "HintSchemaAllTime"];
}
// Database specifies the database name (single tenant) or bucket identifier (multi tenant).
message ReadSource {
// Database identifies which database to query.
string database = 1;
TimestampRange timestamp_range = 2 [(gogoproto.customname) = "TimestampRange", (gogoproto.nullable) = false];
// Descending indicates whether points should be returned in descending order.
bool descending = 3;
// GroupKeys specifies a list of tag keys used to order the data. It is dependent on the Group property to determine
// its behavior.
repeated string group_keys = 4 [(gogoproto.customname) = "GroupKeys"];
//
Group group = 11;
// Aggregate specifies an optional aggregate to apply to the data.
// TODO(sgc): switch to slice for multiple aggregates in a single request
Aggregate aggregate = 9;
Predicate predicate = 5;
// SeriesLimit determines the maximum number of series to be returned for the request. Specify 0 for no limit.
int64 series_limit = 6 [(gogoproto.customname) = "SeriesLimit"];
// SeriesOffset determines how many series to skip before processing the request.
int64 series_offset = 7 [(gogoproto.customname) = "SeriesOffset"];
// PointsLimit determines the maximum number of values per series to be returned for the request.
// Specify 0 for no limit. -1 to return series frames only.
int64 points_limit = 8 [(gogoproto.customname) = "PointsLimit"];
// Trace contains opaque data if a trace is active.
map<string, string> trace = 10 [(gogoproto.customname) = "Trace"];
// Hints is a bitwise OR of HintFlags to control the behavior
// of the read request.
fixed32 hints = 12 [(gogoproto.customname) = "Hints", (gogoproto.casttype) = "HintFlags"];
}
message Aggregate {
enum AggregateType {
option (gogoproto.goproto_enum_prefix) = false;
NONE = 0 [(gogoproto.enumvalue_customname) = "AggregateTypeNone"];
SUM = 1 [(gogoproto.enumvalue_customname) = "AggregateTypeSum"];
COUNT = 2 [(gogoproto.enumvalue_customname) = "AggregateTypeCount"];
}
AggregateType type = 1;
// additional arguments?
}
message Tag {
bytes key = 1;
bytes value = 2;
}
// Response message for Storage.Read.
message ReadResponse {
enum FrameType {
option (gogoproto.goproto_enum_prefix) = false;
SERIES = 0 [(gogoproto.enumvalue_customname) = "FrameTypeSeries"];
POINTS = 1 [(gogoproto.enumvalue_customname) = "FrameTypePoints"];
}
enum DataType {
option (gogoproto.goproto_enum_prefix) = false;
FLOAT = 0 [(gogoproto.enumvalue_customname) = "DataTypeFloat"];
INTEGER = 1 [(gogoproto.enumvalue_customname) = "DataTypeInteger"];
UNSIGNED = 2 [(gogoproto.enumvalue_customname) = "DataTypeUnsigned"];
BOOLEAN = 3 [(gogoproto.enumvalue_customname) = "DataTypeBoolean"];
STRING = 4 [(gogoproto.enumvalue_customname) = "DataTypeString"];
}
message Frame {
oneof data {
GroupFrame group = 7;
SeriesFrame series = 1;
FloatPointsFrame float_points = 2 [(gogoproto.customname) = "FloatPoints"];
IntegerPointsFrame integer_points = 3 [(gogoproto.customname) = "IntegerPoints"];
UnsignedPointsFrame unsigned_points = 4 [(gogoproto.customname) = "UnsignedPoints"];
BooleanPointsFrame boolean_points = 5 [(gogoproto.customname) = "BooleanPoints"];
StringPointsFrame string_points = 6 [(gogoproto.customname) = "StringPoints"];
}
}
message GroupFrame {
// TagKeys
repeated bytes tag_keys = 1 [(gogoproto.customname) = "TagKeys"];
// PartitionKeyVals is the values of the partition key for this group, order matching ReadRequest.GroupKeys
repeated bytes partition_key_vals = 2 [(gogoproto.customname) = "PartitionKeyVals"];
}
message SeriesFrame {
repeated Tag tags = 1 [(gogoproto.nullable) = false];
DataType data_type = 2;
}
message FloatPointsFrame {
repeated sfixed64 timestamps = 1;
repeated double values = 2;
}
message IntegerPointsFrame {
repeated sfixed64 timestamps = 1;
repeated int64 values = 2;
}
message UnsignedPointsFrame {
repeated sfixed64 timestamps = 1;
repeated uint64 values = 2;
}
message BooleanPointsFrame {
repeated sfixed64 timestamps = 1;
repeated bool values = 2;
}
message StringPointsFrame {
repeated sfixed64 timestamps = 1;
repeated string values = 2;
}
repeated Frame frames = 1 [(gogoproto.nullable) = false];
}
message CapabilitiesResponse {
map<string, string> caps = 1;
}
message HintsResponse {
}
// Specifies a continuous range of nanosecond timestamps.
message TimestampRange {
// Start defines the inclusive lower bound.
int64 start = 1;
// End defines the inclusive upper bound.
int64 end = 2;
}
//message ExplainRequest {
// ReadRequest read_request = 1 [(gogoproto.customname) = "ReadRequest"];
//}
//
//message ExplainResponse {}
// RetentionPolicy identifies which retention policy to query.
string retention_policy = 2 [(gogoproto.customname) = "RetentionPolicy"];
}

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,206 @@
syntax = "proto3";
package com.github.influxdata.influxdb.services.storage;
option go_package = "storage";
import "github.com/gogo/protobuf/gogoproto/gogo.proto";
import "google/protobuf/empty.proto";
import "google/protobuf/any.proto";
import "predicate.proto";
option (gogoproto.marshaler_all) = true;
option (gogoproto.sizer_all) = true;
option (gogoproto.unmarshaler_all) = true;
option (gogoproto.goproto_getters_all) = false;
service Storage {
// Read performs a read operation using the given ReadRequest
rpc Read (ReadRequest) returns (stream ReadResponse);
// Capabilities returns a map of keys and values identifying the capabilities supported by the storage engine
rpc Capabilities (google.protobuf.Empty) returns (CapabilitiesResponse);
rpc Hints (google.protobuf.Empty) returns (HintsResponse);
// Explain describes the costs associated with executing a given Read request
// rpc Explain(google.protobuf.Empty) returns (ExplainResponse){}
}
// Request message for Storage.Read.
message ReadRequest {
enum Group {
option (gogoproto.goproto_enum_prefix) = false;
// GroupNone returns all series as a single group.
// The single GroupFrame.TagKeys will be the union of all tag keys.
GROUP_NONE = 0 [(gogoproto.enumvalue_customname) = "GroupNone"];
// GroupAll returns a unique group for each series.
// As an optimization, no GroupFrames will be generated.
GROUP_ALL = 1 [(gogoproto.enumvalue_customname) = "GroupAll"];
// GroupBy returns a group for each unique value of the specified GroupKeys.
GROUP_BY = 2 [(gogoproto.enumvalue_customname) = "GroupBy"];
// GroupExcept in not implemented.
GROUP_EXCEPT = 3 [(gogoproto.enumvalue_customname) = "GroupExcept"];
}
enum HintFlags {
option (gogoproto.goproto_enum_prefix) = false;
HINT_NONE = 0x00 [(gogoproto.enumvalue_customname) = "HintNone"];
HINT_NO_POINTS = 0x01 [(gogoproto.enumvalue_customname) = "HintNoPoints"];
HINT_NO_SERIES = 0x02 [(gogoproto.enumvalue_customname) = "HintNoSeries"];
// HintSchemaAllTime performs schema queries without using time ranges
HINT_SCHEMA_ALL_TIME = 0x04 [(gogoproto.enumvalue_customname) = "HintSchemaAllTime"];
}
google.protobuf.Any read_source = 13 [(gogoproto.customname) = "ReadSource"];
TimestampRange timestamp_range = 2 [(gogoproto.customname) = "TimestampRange", (gogoproto.nullable) = false];
// Descending indicates whether points should be returned in descending order.
bool descending = 3;
// GroupKeys specifies a list of tag keys used to order the data. It is dependent on the Group property to determine
// its behavior.
repeated string group_keys = 4 [(gogoproto.customname) = "GroupKeys"];
//
Group group = 11;
// Aggregate specifies an optional aggregate to apply to the data.
// TODO(sgc): switch to slice for multiple aggregates in a single request
Aggregate aggregate = 9;
Predicate predicate = 5;
// SeriesLimit determines the maximum number of series to be returned for the request. Specify 0 for no limit.
int64 series_limit = 6 [(gogoproto.customname) = "SeriesLimit"];
// SeriesOffset determines how many series to skip before processing the request.
int64 series_offset = 7 [(gogoproto.customname) = "SeriesOffset"];
// PointsLimit determines the maximum number of values per series to be returned for the request.
// Specify 0 for no limit. -1 to return series frames only.
int64 points_limit = 8 [(gogoproto.customname) = "PointsLimit"];
// Trace contains opaque data if a trace is active.
map<string, string> trace = 10 [(gogoproto.customname) = "Trace"];
// Hints is a bitwise OR of HintFlags to control the behavior
// of the read request.
fixed32 hints = 12 [(gogoproto.customname) = "Hints", (gogoproto.casttype) = "HintFlags"];
}
message Aggregate {
enum AggregateType {
option (gogoproto.goproto_enum_prefix) = false;
NONE = 0 [(gogoproto.enumvalue_customname) = "AggregateTypeNone"];
SUM = 1 [(gogoproto.enumvalue_customname) = "AggregateTypeSum"];
COUNT = 2 [(gogoproto.enumvalue_customname) = "AggregateTypeCount"];
}
AggregateType type = 1;
// additional arguments?
}
message Tag {
bytes key = 1;
bytes value = 2;
}
// Response message for Storage.Read.
message ReadResponse {
enum FrameType {
option (gogoproto.goproto_enum_prefix) = false;
SERIES = 0 [(gogoproto.enumvalue_customname) = "FrameTypeSeries"];
POINTS = 1 [(gogoproto.enumvalue_customname) = "FrameTypePoints"];
}
enum DataType {
option (gogoproto.goproto_enum_prefix) = false;
FLOAT = 0 [(gogoproto.enumvalue_customname) = "DataTypeFloat"];
INTEGER = 1 [(gogoproto.enumvalue_customname) = "DataTypeInteger"];
UNSIGNED = 2 [(gogoproto.enumvalue_customname) = "DataTypeUnsigned"];
BOOLEAN = 3 [(gogoproto.enumvalue_customname) = "DataTypeBoolean"];
STRING = 4 [(gogoproto.enumvalue_customname) = "DataTypeString"];
}
message Frame {
oneof data {
GroupFrame group = 7;
SeriesFrame series = 1;
FloatPointsFrame float_points = 2 [(gogoproto.customname) = "FloatPoints"];
IntegerPointsFrame integer_points = 3 [(gogoproto.customname) = "IntegerPoints"];
UnsignedPointsFrame unsigned_points = 4 [(gogoproto.customname) = "UnsignedPoints"];
BooleanPointsFrame boolean_points = 5 [(gogoproto.customname) = "BooleanPoints"];
StringPointsFrame string_points = 6 [(gogoproto.customname) = "StringPoints"];
}
}
message GroupFrame {
// TagKeys
repeated bytes tag_keys = 1 [(gogoproto.customname) = "TagKeys"];
// PartitionKeyVals is the values of the partition key for this group, order matching ReadRequest.GroupKeys
repeated bytes partition_key_vals = 2 [(gogoproto.customname) = "PartitionKeyVals"];
}
message SeriesFrame {
repeated Tag tags = 1 [(gogoproto.nullable) = false];
DataType data_type = 2;
}
message FloatPointsFrame {
repeated sfixed64 timestamps = 1;
repeated double values = 2;
}
message IntegerPointsFrame {
repeated sfixed64 timestamps = 1;
repeated int64 values = 2;
}
message UnsignedPointsFrame {
repeated sfixed64 timestamps = 1;
repeated uint64 values = 2;
}
message BooleanPointsFrame {
repeated sfixed64 timestamps = 1;
repeated bool values = 2;
}
message StringPointsFrame {
repeated sfixed64 timestamps = 1;
repeated string values = 2;
}
repeated Frame frames = 1 [(gogoproto.nullable) = false];
}
message CapabilitiesResponse {
map<string, string> caps = 1;
}
message HintsResponse {
}
// Specifies a continuous range of nanosecond timestamps.
message TimestampRange {
// Start defines the inclusive lower bound.
int64 start = 1;
// End defines the inclusive upper bound.
int64 end = 2;
}
//message ExplainRequest {
// ReadRequest read_request = 1 [(gogoproto.customname) = "ReadRequest"];
//}
//
//message ExplainResponse {}

View File

@ -4,31 +4,74 @@ import (
"context"
"errors"
"sort"
"strings"
"time"
"github.com/gogo/protobuf/types"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/services/meta"
"github.com/influxdata/influxdb/tsdb"
"go.uber.org/zap"
)
type Store struct {
type ResultSet interface {
Close()
Next() bool
Cursor() tsdb.Cursor
Tags() models.Tags
}
type GroupResultSet interface {
Next() GroupCursor
Close()
}
type GroupCursor interface {
Tags() models.Tags
Keys() [][]byte
PartitionKeyVals() [][]byte
Next() bool
Cursor() tsdb.Cursor
Close()
}
type Store interface {
Read(ctx context.Context, req *ReadRequest) (ResultSet, error)
GroupRead(ctx context.Context, req *ReadRequest) (GroupResultSet, error)
WithLogger(log *zap.Logger)
}
func getReadSource(req *ReadRequest) (*ReadSource, error) {
if req.ReadSource == nil {
return nil, ErrMissingReadSource
}
var source ReadSource
if err := types.UnmarshalAny(req.ReadSource, &source); err != nil {
return nil, err
}
return &source, nil
}
type localStore struct {
TSDBStore *tsdb.Store
MetaClient StorageMetaClient
MetaClient MetaClient
Logger *zap.Logger
}
func NewStore() *Store {
return &Store{Logger: zap.NewNop()}
func NewStore(store *tsdb.Store, metaClient MetaClient) Store {
return &localStore{
TSDBStore: store,
MetaClient: metaClient,
Logger: zap.NewNop(),
}
}
// WithLogger sets the logger for the service.
func (s *Store) WithLogger(log *zap.Logger) {
func (s *localStore) WithLogger(log *zap.Logger) {
s.Logger = log.With(zap.String("service", "store"))
}
func (s *Store) findShardIDs(database, rp string, desc bool, start, end int64) ([]uint64, error) {
func (s *localStore) findShardIDs(database, rp string, desc bool, start, end int64) ([]uint64, error) {
groups, err := s.MetaClient.ShardGroupsByTimeRange(database, rp, time.Unix(0, start), time.Unix(0, end))
if err != nil {
return nil, err
@ -53,12 +96,7 @@ func (s *Store) findShardIDs(database, rp string, desc bool, start, end int64) (
return shardIDs, nil
}
func (s *Store) validateArgs(database string, start, end int64) (string, string, int64, int64, error) {
rp := ""
if p := strings.IndexByte(database, '/'); p > -1 {
database, rp = database[:p], database[p+1:]
}
func (s *localStore) validateArgs(database, rp string, start, end int64) (string, string, int64, int64, error) {
di := s.MetaClient.Database(database)
if di == nil {
return "", "", 0, 0, errors.New("no database")
@ -82,19 +120,17 @@ func (s *Store) validateArgs(database string, start, end int64) (string, string,
return database, rp, start, end, nil
}
type Results interface {
Close()
Next() bool
Cursor() tsdb.Cursor
Tags() models.Tags
}
func (s *Store) Read(ctx context.Context, req *ReadRequest) (Results, error) {
func (s *localStore) Read(ctx context.Context, req *ReadRequest) (ResultSet, error) {
if len(req.GroupKeys) > 0 {
panic("Read: len(Grouping) > 0")
}
database, rp, start, end, err := s.validateArgs(req.Database, req.TimestampRange.Start, req.TimestampRange.End)
source, err := getReadSource(req)
if err != nil {
return nil, err
}
database, rp, start, end, err := s.validateArgs(source.Database, source.RetentionPolicy, req.TimestampRange.Start, req.TimestampRange.End)
if err != nil {
return nil, err
}
@ -107,7 +143,7 @@ func (s *Store) Read(ctx context.Context, req *ReadRequest) (Results, error) {
return (*resultSet)(nil), nil
}
var cur seriesCursor
var cur SeriesCursor
if ic, err := newIndexSeriesCursor(ctx, req.Predicate, s.TSDBStore.Shards(shardIDs)); err != nil {
return nil, err
} else if ic == nil {
@ -117,31 +153,26 @@ func (s *Store) Read(ctx context.Context, req *ReadRequest) (Results, error) {
}
if req.SeriesLimit > 0 || req.SeriesOffset > 0 {
cur = newLimitSeriesCursor(ctx, cur, req.SeriesLimit, req.SeriesOffset)
cur = NewLimitSeriesCursor(ctx, cur, req.SeriesLimit, req.SeriesOffset)
}
rr := readRequest{
ctx: ctx,
start: start,
end: end,
asc: !req.Descending,
limit: req.PointsLimit,
aggregate: req.Aggregate,
}
req.TimestampRange.Start = start
req.TimestampRange.End = end
return &resultSet{
req: rr,
cur: cur,
mb: newMultiShardArrayCursors(ctx, &rr),
}, nil
return NewResultSet(ctx, req, cur), nil
}
func (s *Store) GroupRead(ctx context.Context, req *ReadRequest) (*groupResultSet, error) {
func (s *localStore) GroupRead(ctx context.Context, req *ReadRequest) (GroupResultSet, error) {
if req.SeriesLimit > 0 || req.SeriesOffset > 0 {
return nil, errors.New("GroupRead: SeriesLimit and SeriesOffset not supported when Grouping")
}
database, rp, start, end, err := s.validateArgs(req.Database, req.TimestampRange.Start, req.TimestampRange.End)
source, err := getReadSource(req)
if err != nil {
return nil, err
}
database, rp, start, end, err := s.validateArgs(source.Database, source.RetentionPolicy, req.TimestampRange.Start, req.TimestampRange.End)
if err != nil {
return nil, err
}
@ -159,7 +190,7 @@ func (s *Store) GroupRead(ctx context.Context, req *ReadRequest) (*groupResultSe
req.TimestampRange.Start = start
req.TimestampRange.End = end
newCursor := func() (seriesCursor, error) {
newCursor := func() (SeriesCursor, error) {
cur, err := newIndexSeriesCursor(ctx, req.Predicate, shards)
if cur == nil || err != nil {
return nil, err
@ -167,5 +198,10 @@ func (s *Store) GroupRead(ctx context.Context, req *ReadRequest) (*groupResultSe
return cur, nil
}
return newGroupResultSet(ctx, req, newCursor), nil
rs := NewGroupResultSet(ctx, req, newCursor)
if rs == nil {
return nil, nil
}
return rs, nil
}