diff --git a/storage/reads/array_cursor.gen.go b/storage/reads/array_cursor.gen.go index d798db93dd..32f652f022 100644 --- a/storage/reads/array_cursor.gen.go +++ b/storage/reads/array_cursor.gen.go @@ -85,13 +85,13 @@ LOOP: return c.res } -type floatMultiShardArrayCursor struct { +type floatArrayCursor struct { cursors.FloatArrayCursor cursorContext filter *floatArrayFilterCursor } -func (c *floatMultiShardArrayCursor) reset(cur cursors.FloatArrayCursor, itrs cursors.CursorIterators, cond expression) { +func (c *floatArrayCursor) reset(cur cursors.FloatArrayCursor, cursorIterator cursors.CursorIterator, cond expression) { if cond != nil { if c.filter == nil { c.filter = newFloatFilterArrayCursor(cond) @@ -101,18 +101,17 @@ func (c *floatMultiShardArrayCursor) reset(cur cursors.FloatArrayCursor, itrs cu } c.FloatArrayCursor = cur - c.itrs = itrs + c.cursorIterator = cursorIterator c.err = nil - c.count = 0 } -func (c *floatMultiShardArrayCursor) Err() error { return c.err } +func (c *floatArrayCursor) Err() error { return c.err } -func (c *floatMultiShardArrayCursor) Stats() cursors.CursorStats { +func (c *floatArrayCursor) Stats() cursors.CursorStats { return c.FloatArrayCursor.Stats() } -func (c *floatMultiShardArrayCursor) Next() *cursors.FloatArray { +func (c *floatArrayCursor) Next() *cursors.FloatArray { for { a := c.FloatArrayCursor.Next() if a.Len() == 0 { @@ -120,31 +119,19 @@ func (c *floatMultiShardArrayCursor) Next() *cursors.FloatArray { continue } } - c.count += int64(a.Len()) - if c.count > c.limit { - diff := c.count - c.limit - c.count -= diff - rem := int64(a.Len()) - diff - a.Timestamps = a.Timestamps[:rem] - a.Values = a.Values[:rem] - } return a } } -func (c *floatMultiShardArrayCursor) nextArrayCursor() bool { - if len(c.itrs) == 0 { +func (c *floatArrayCursor) nextArrayCursor() bool { + if c.cursorIterator == nil { return false } c.FloatArrayCursor.Close() - var itr cursors.CursorIterator - var cur cursors.Cursor - for cur == nil && len(c.itrs) > 0 { - itr, c.itrs = c.itrs[0], c.itrs[1:] - cur, _ = itr.Next(c.ctx, c.req) - } + cur, _ := c.cursorIterator.Next(c.ctx, c.req) + c.cursorIterator = nil var ok bool if cur != nil { @@ -153,7 +140,7 @@ func (c *floatMultiShardArrayCursor) nextArrayCursor() bool { if !ok { cur.Close() next = FloatEmptyArrayCursor - c.itrs = nil + c.cursorIterator = nil c.err = errors.New("expected float cursor") } else { if c.filter != nil { @@ -314,13 +301,13 @@ LOOP: return c.res } -type integerMultiShardArrayCursor struct { +type integerArrayCursor struct { cursors.IntegerArrayCursor cursorContext filter *integerArrayFilterCursor } -func (c *integerMultiShardArrayCursor) reset(cur cursors.IntegerArrayCursor, itrs cursors.CursorIterators, cond expression) { +func (c *integerArrayCursor) reset(cur cursors.IntegerArrayCursor, cursorIterator cursors.CursorIterator, cond expression) { if cond != nil { if c.filter == nil { c.filter = newIntegerFilterArrayCursor(cond) @@ -330,18 +317,17 @@ func (c *integerMultiShardArrayCursor) reset(cur cursors.IntegerArrayCursor, itr } c.IntegerArrayCursor = cur - c.itrs = itrs + c.cursorIterator = cursorIterator c.err = nil - c.count = 0 } -func (c *integerMultiShardArrayCursor) Err() error { return c.err } +func (c *integerArrayCursor) Err() error { return c.err } -func (c *integerMultiShardArrayCursor) Stats() cursors.CursorStats { +func (c *integerArrayCursor) Stats() cursors.CursorStats { return c.IntegerArrayCursor.Stats() } -func (c *integerMultiShardArrayCursor) Next() *cursors.IntegerArray { +func (c *integerArrayCursor) Next() *cursors.IntegerArray { for { a := c.IntegerArrayCursor.Next() if a.Len() == 0 { @@ -349,31 +335,19 @@ func (c *integerMultiShardArrayCursor) Next() *cursors.IntegerArray { continue } } - c.count += int64(a.Len()) - if c.count > c.limit { - diff := c.count - c.limit - c.count -= diff - rem := int64(a.Len()) - diff - a.Timestamps = a.Timestamps[:rem] - a.Values = a.Values[:rem] - } return a } } -func (c *integerMultiShardArrayCursor) nextArrayCursor() bool { - if len(c.itrs) == 0 { +func (c *integerArrayCursor) nextArrayCursor() bool { + if c.cursorIterator == nil { return false } c.IntegerArrayCursor.Close() - var itr cursors.CursorIterator - var cur cursors.Cursor - for cur == nil && len(c.itrs) > 0 { - itr, c.itrs = c.itrs[0], c.itrs[1:] - cur, _ = itr.Next(c.ctx, c.req) - } + cur, _ := c.cursorIterator.Next(c.ctx, c.req) + c.cursorIterator = nil var ok bool if cur != nil { @@ -382,7 +356,7 @@ func (c *integerMultiShardArrayCursor) nextArrayCursor() bool { if !ok { cur.Close() next = IntegerEmptyArrayCursor - c.itrs = nil + c.cursorIterator = nil c.err = errors.New("expected integer cursor") } else { if c.filter != nil { @@ -543,13 +517,13 @@ LOOP: return c.res } -type unsignedMultiShardArrayCursor struct { +type unsignedArrayCursor struct { cursors.UnsignedArrayCursor cursorContext filter *unsignedArrayFilterCursor } -func (c *unsignedMultiShardArrayCursor) reset(cur cursors.UnsignedArrayCursor, itrs cursors.CursorIterators, cond expression) { +func (c *unsignedArrayCursor) reset(cur cursors.UnsignedArrayCursor, cursorIterator cursors.CursorIterator, cond expression) { if cond != nil { if c.filter == nil { c.filter = newUnsignedFilterArrayCursor(cond) @@ -559,18 +533,17 @@ func (c *unsignedMultiShardArrayCursor) reset(cur cursors.UnsignedArrayCursor, i } c.UnsignedArrayCursor = cur - c.itrs = itrs + c.cursorIterator = cursorIterator c.err = nil - c.count = 0 } -func (c *unsignedMultiShardArrayCursor) Err() error { return c.err } +func (c *unsignedArrayCursor) Err() error { return c.err } -func (c *unsignedMultiShardArrayCursor) Stats() cursors.CursorStats { +func (c *unsignedArrayCursor) Stats() cursors.CursorStats { return c.UnsignedArrayCursor.Stats() } -func (c *unsignedMultiShardArrayCursor) Next() *cursors.UnsignedArray { +func (c *unsignedArrayCursor) Next() *cursors.UnsignedArray { for { a := c.UnsignedArrayCursor.Next() if a.Len() == 0 { @@ -578,31 +551,19 @@ func (c *unsignedMultiShardArrayCursor) Next() *cursors.UnsignedArray { continue } } - c.count += int64(a.Len()) - if c.count > c.limit { - diff := c.count - c.limit - c.count -= diff - rem := int64(a.Len()) - diff - a.Timestamps = a.Timestamps[:rem] - a.Values = a.Values[:rem] - } return a } } -func (c *unsignedMultiShardArrayCursor) nextArrayCursor() bool { - if len(c.itrs) == 0 { +func (c *unsignedArrayCursor) nextArrayCursor() bool { + if c.cursorIterator == nil { return false } c.UnsignedArrayCursor.Close() - var itr cursors.CursorIterator - var cur cursors.Cursor - for cur == nil && len(c.itrs) > 0 { - itr, c.itrs = c.itrs[0], c.itrs[1:] - cur, _ = itr.Next(c.ctx, c.req) - } + cur, _ := c.cursorIterator.Next(c.ctx, c.req) + c.cursorIterator = nil var ok bool if cur != nil { @@ -611,7 +572,7 @@ func (c *unsignedMultiShardArrayCursor) nextArrayCursor() bool { if !ok { cur.Close() next = UnsignedEmptyArrayCursor - c.itrs = nil + c.cursorIterator = nil c.err = errors.New("expected unsigned cursor") } else { if c.filter != nil { @@ -772,13 +733,13 @@ LOOP: return c.res } -type stringMultiShardArrayCursor struct { +type stringArrayCursor struct { cursors.StringArrayCursor cursorContext filter *stringArrayFilterCursor } -func (c *stringMultiShardArrayCursor) reset(cur cursors.StringArrayCursor, itrs cursors.CursorIterators, cond expression) { +func (c *stringArrayCursor) reset(cur cursors.StringArrayCursor, cursorIterator cursors.CursorIterator, cond expression) { if cond != nil { if c.filter == nil { c.filter = newStringFilterArrayCursor(cond) @@ -788,18 +749,17 @@ func (c *stringMultiShardArrayCursor) reset(cur cursors.StringArrayCursor, itrs } c.StringArrayCursor = cur - c.itrs = itrs + c.cursorIterator = cursorIterator c.err = nil - c.count = 0 } -func (c *stringMultiShardArrayCursor) Err() error { return c.err } +func (c *stringArrayCursor) Err() error { return c.err } -func (c *stringMultiShardArrayCursor) Stats() cursors.CursorStats { +func (c *stringArrayCursor) Stats() cursors.CursorStats { return c.StringArrayCursor.Stats() } -func (c *stringMultiShardArrayCursor) Next() *cursors.StringArray { +func (c *stringArrayCursor) Next() *cursors.StringArray { for { a := c.StringArrayCursor.Next() if a.Len() == 0 { @@ -807,31 +767,19 @@ func (c *stringMultiShardArrayCursor) Next() *cursors.StringArray { continue } } - c.count += int64(a.Len()) - if c.count > c.limit { - diff := c.count - c.limit - c.count -= diff - rem := int64(a.Len()) - diff - a.Timestamps = a.Timestamps[:rem] - a.Values = a.Values[:rem] - } return a } } -func (c *stringMultiShardArrayCursor) nextArrayCursor() bool { - if len(c.itrs) == 0 { +func (c *stringArrayCursor) nextArrayCursor() bool { + if c.cursorIterator == nil { return false } c.StringArrayCursor.Close() - var itr cursors.CursorIterator - var cur cursors.Cursor - for cur == nil && len(c.itrs) > 0 { - itr, c.itrs = c.itrs[0], c.itrs[1:] - cur, _ = itr.Next(c.ctx, c.req) - } + cur, _ := c.cursorIterator.Next(c.ctx, c.req) + c.cursorIterator = nil var ok bool if cur != nil { @@ -840,7 +788,7 @@ func (c *stringMultiShardArrayCursor) nextArrayCursor() bool { if !ok { cur.Close() next = StringEmptyArrayCursor - c.itrs = nil + c.cursorIterator = nil c.err = errors.New("expected string cursor") } else { if c.filter != nil { @@ -961,13 +909,13 @@ LOOP: return c.res } -type booleanMultiShardArrayCursor struct { +type booleanArrayCursor struct { cursors.BooleanArrayCursor cursorContext filter *booleanArrayFilterCursor } -func (c *booleanMultiShardArrayCursor) reset(cur cursors.BooleanArrayCursor, itrs cursors.CursorIterators, cond expression) { +func (c *booleanArrayCursor) reset(cur cursors.BooleanArrayCursor, cursorIterator cursors.CursorIterator, cond expression) { if cond != nil { if c.filter == nil { c.filter = newBooleanFilterArrayCursor(cond) @@ -977,18 +925,17 @@ func (c *booleanMultiShardArrayCursor) reset(cur cursors.BooleanArrayCursor, itr } c.BooleanArrayCursor = cur - c.itrs = itrs + c.cursorIterator = cursorIterator c.err = nil - c.count = 0 } -func (c *booleanMultiShardArrayCursor) Err() error { return c.err } +func (c *booleanArrayCursor) Err() error { return c.err } -func (c *booleanMultiShardArrayCursor) Stats() cursors.CursorStats { +func (c *booleanArrayCursor) Stats() cursors.CursorStats { return c.BooleanArrayCursor.Stats() } -func (c *booleanMultiShardArrayCursor) Next() *cursors.BooleanArray { +func (c *booleanArrayCursor) Next() *cursors.BooleanArray { for { a := c.BooleanArrayCursor.Next() if a.Len() == 0 { @@ -996,31 +943,19 @@ func (c *booleanMultiShardArrayCursor) Next() *cursors.BooleanArray { continue } } - c.count += int64(a.Len()) - if c.count > c.limit { - diff := c.count - c.limit - c.count -= diff - rem := int64(a.Len()) - diff - a.Timestamps = a.Timestamps[:rem] - a.Values = a.Values[:rem] - } return a } } -func (c *booleanMultiShardArrayCursor) nextArrayCursor() bool { - if len(c.itrs) == 0 { +func (c *booleanArrayCursor) nextArrayCursor() bool { + if c.cursorIterator == nil { return false } c.BooleanArrayCursor.Close() - var itr cursors.CursorIterator - var cur cursors.Cursor - for cur == nil && len(c.itrs) > 0 { - itr, c.itrs = c.itrs[0], c.itrs[1:] - cur, _ = itr.Next(c.ctx, c.req) - } + cur, _ := c.cursorIterator.Next(c.ctx, c.req) + c.cursorIterator = nil var ok bool if cur != nil { @@ -1029,7 +964,7 @@ func (c *booleanMultiShardArrayCursor) nextArrayCursor() bool { if !ok { cur.Close() next = BooleanEmptyArrayCursor - c.itrs = nil + c.cursorIterator = nil c.err = errors.New("expected boolean cursor") } else { if c.filter != nil { diff --git a/storage/reads/array_cursor.gen.go.tmpl b/storage/reads/array_cursor.gen.go.tmpl index 8bf15b0319..dbb6ca5e7a 100644 --- a/storage/reads/array_cursor.gen.go.tmpl +++ b/storage/reads/array_cursor.gen.go.tmpl @@ -84,13 +84,13 @@ LOOP: return c.res } -type {{.name}}MultiShardArrayCursor struct { +type {{.name}}ArrayCursor struct { cursors.{{.Name}}ArrayCursor cursorContext filter *{{$type}} } -func (c *{{.name}}MultiShardArrayCursor) reset(cur cursors.{{.Name}}ArrayCursor, itrs cursors.CursorIterators, cond expression) { +func (c *{{.name}}ArrayCursor) reset(cur cursors.{{.Name}}ArrayCursor, cursorIterator cursors.CursorIterator, cond expression) { if cond != nil { if c.filter == nil { c.filter = new{{.Name}}FilterArrayCursor(cond) @@ -100,19 +100,18 @@ func (c *{{.name}}MultiShardArrayCursor) reset(cur cursors.{{.Name}}ArrayCursor, } c.{{.Name}}ArrayCursor = cur - c.itrs = itrs + c.cursorIterator = cursorIterator c.err = nil - c.count = 0 } -func (c *{{.name}}MultiShardArrayCursor) Err() error { return c.err } +func (c *{{.name}}ArrayCursor) Err() error { return c.err } -func (c *{{.name}}MultiShardArrayCursor) Stats() cursors.CursorStats { +func (c *{{.name}}ArrayCursor) Stats() cursors.CursorStats { return c.{{.Name}}ArrayCursor.Stats() } -func (c *{{.name}}MultiShardArrayCursor) Next() {{$arrayType}} { +func (c *{{.name}}ArrayCursor) Next() {{$arrayType}} { for { a := c.{{.Name}}ArrayCursor.Next() if a.Len() == 0 { @@ -120,31 +119,19 @@ func (c *{{.name}}MultiShardArrayCursor) Next() {{$arrayType}} { continue } } - c.count += int64(a.Len()) - if c.count > c.limit { - diff := c.count - c.limit - c.count -= diff - rem := int64(a.Len()) - diff - a.Timestamps = a.Timestamps[:rem] - a.Values = a.Values[:rem] - } return a } } -func (c *{{.name}}MultiShardArrayCursor) nextArrayCursor() bool { - if len(c.itrs) == 0 { +func (c *{{.name}}ArrayCursor) nextArrayCursor() bool { + if c.cursorIterator == nil { return false } c.{{.Name}}ArrayCursor.Close() - var itr cursors.CursorIterator - var cur cursors.Cursor - for cur == nil && len(c.itrs) > 0 { - itr, c.itrs = c.itrs[0], c.itrs[1:] - cur, _ = itr.Next(c.ctx, c.req) - } + cur, _ := c.cursorIterator.Next(c.ctx, c.req) + c.cursorIterator = nil var ok bool if cur != nil { @@ -153,7 +140,7 @@ func (c *{{.name}}MultiShardArrayCursor) nextArrayCursor() bool { if !ok { cur.Close() next = {{.Name}}EmptyArrayCursor - c.itrs = nil + c.cursorIterator = nil c.err = errors.New("expected {{.name}} cursor") } else { if c.filter != nil { diff --git a/storage/reads/array_cursor.go b/storage/reads/array_cursor.go index 959f196321..869f33cd65 100644 --- a/storage/reads/array_cursor.go +++ b/storage/reads/array_cursor.go @@ -64,36 +64,28 @@ func newCountArrayCursor(cur cursors.Cursor) cursors.Cursor { } type cursorContext struct { - ctx context.Context - req *cursors.CursorRequest - itrs cursors.CursorIterators - limit int64 - count int64 - err error + ctx context.Context + req *cursors.CursorRequest + cursorIterator cursors.CursorIterator + err error } -type multiShardArrayCursors struct { - ctx context.Context - limit int64 - req cursors.CursorRequest +type arrayCursors struct { + ctx context.Context + req cursors.CursorRequest cursors struct { - i integerMultiShardArrayCursor - f floatMultiShardArrayCursor - u unsignedMultiShardArrayCursor - b booleanMultiShardArrayCursor - s stringMultiShardArrayCursor + i integerArrayCursor + f floatArrayCursor + u unsignedArrayCursor + b booleanArrayCursor + s stringArrayCursor } } -func newMultiShardArrayCursors(ctx context.Context, start, end int64, asc bool, limit int64) *multiShardArrayCursors { - if limit < 0 { - limit = 1 - } - - m := &multiShardArrayCursors{ - ctx: ctx, - limit: limit, +func newArrayCursors(ctx context.Context, start, end int64, asc bool) *arrayCursors { + m := &arrayCursors{ + ctx: ctx, req: cursors.CursorRequest{ Ascending: asc, StartTime: start, @@ -102,9 +94,8 @@ func newMultiShardArrayCursors(ctx context.Context, start, end int64, asc bool, } cc := cursorContext{ - ctx: ctx, - limit: limit, - req: &m.req, + ctx: ctx, + req: &m.req, } m.cursors.i.cursorContext = cc @@ -116,48 +107,42 @@ func newMultiShardArrayCursors(ctx context.Context, start, end int64, asc bool, return m } -func (m *multiShardArrayCursors) createCursor(row SeriesRow) cursors.Cursor { - m.req.Name = row.Name - m.req.Tags = row.SeriesTags - m.req.Field = row.Field +func (m *arrayCursors) createCursor(seriesRow SeriesRow) cursors.Cursor { + m.req.Name = seriesRow.Name + m.req.Tags = seriesRow.SeriesTags + m.req.Field = seriesRow.Field var cond expression - if row.ValueCond != nil { - cond = &astExpr{row.ValueCond} + if seriesRow.ValueCond != nil { + cond = &astExpr{seriesRow.ValueCond} } - var shard cursors.CursorIterator - var cur cursors.Cursor - for cur == nil && len(row.Query) > 0 { - shard, row.Query = row.Query[0], row.Query[1:] - cur, _ = shard.Next(m.ctx, &m.req) + if seriesRow.Query == nil { + return nil } - + cur, _ := seriesRow.Query.Next(m.ctx, &m.req) + seriesRow.Query = nil if cur == nil { return nil } switch c := cur.(type) { case cursors.IntegerArrayCursor: - m.cursors.i.reset(c, row.Query, cond) + m.cursors.i.reset(c, seriesRow.Query, cond) return &m.cursors.i case cursors.FloatArrayCursor: - m.cursors.f.reset(c, row.Query, cond) + m.cursors.f.reset(c, seriesRow.Query, cond) return &m.cursors.f case cursors.UnsignedArrayCursor: - m.cursors.u.reset(c, row.Query, cond) + m.cursors.u.reset(c, seriesRow.Query, cond) return &m.cursors.u case cursors.StringArrayCursor: - m.cursors.s.reset(c, row.Query, cond) + m.cursors.s.reset(c, seriesRow.Query, cond) return &m.cursors.s case cursors.BooleanArrayCursor: - m.cursors.b.reset(c, row.Query, cond) + m.cursors.b.reset(c, seriesRow.Query, cond) return &m.cursors.b default: panic(fmt.Sprintf("unreachable: %T", cur)) } } - -func (m *multiShardArrayCursors) newAggregateCursor(ctx context.Context, agg *datatypes.Aggregate, cursor cursors.Cursor) cursors.Cursor { - return newAggregateArrayCursor(ctx, agg, cursor) -} diff --git a/storage/reads/group_resultset.go b/storage/reads/group_resultset.go index 24766cff67..06366996f2 100644 --- a/storage/reads/group_resultset.go +++ b/storage/reads/group_resultset.go @@ -4,7 +4,6 @@ import ( "bytes" "context" "fmt" - "math" "sort" "github.com/influxdata/influxdb/kit/tracing" @@ -14,21 +13,19 @@ import ( ) type groupResultSet struct { - ctx context.Context - req *datatypes.ReadGroupRequest - agg *datatypes.Aggregate - mb multiShardCursors + ctx context.Context + req *datatypes.ReadGroupRequest + arrayCursors *arrayCursors - i int - rows []*SeriesRow - keys [][]byte - nilSort []byte - rgc groupByCursor - km KeyMerger + i int + seriesRows []*SeriesRow + keys [][]byte + nilSort []byte + groupByCursor groupByCursor + km KeyMerger - newCursorFn func() (SeriesCursor, error) - nextGroupFn func(c *groupResultSet) GroupCursor - sortFn func(c *groupResultSet) (int, error) + newSeriesCursorFn func() (SeriesCursor, error) + nextGroupFn func(c *groupResultSet) GroupCursor eof bool } @@ -43,21 +40,24 @@ func GroupOptionNilSortLo() GroupOption { } } -func NewGroupResultSet(ctx context.Context, req *datatypes.ReadGroupRequest, newCursorFn func() (SeriesCursor, error), opts ...GroupOption) GroupResultSet { +func NewGroupResultSet(ctx context.Context, req *datatypes.ReadGroupRequest, newSeriesCursorFn func() (SeriesCursor, error), opts ...GroupOption) GroupResultSet { + span, _ := tracing.StartSpanFromContext(ctx) + defer span.Finish() + span.LogKV("group_type", req.Group.String()) + g := &groupResultSet{ - ctx: ctx, - req: req, - agg: req.Aggregate, - keys: make([][]byte, len(req.GroupKeys)), - nilSort: NilSortHi, - newCursorFn: newCursorFn, + ctx: ctx, + req: req, + keys: make([][]byte, len(req.GroupKeys)), + nilSort: NilSortHi, + newSeriesCursorFn: newSeriesCursorFn, } for _, o := range opts { o(g) } - g.mb = newMultiShardArrayCursors(ctx, req.Range.Start, req.Range.End, true, math.MaxInt64) + g.arrayCursors = newArrayCursors(ctx, req.Range.Start, req.Range.End, true) for i, k := range req.GroupKeys { g.keys[i] = []byte(k) @@ -65,28 +65,33 @@ func NewGroupResultSet(ctx context.Context, req *datatypes.ReadGroupRequest, new switch req.Group { case datatypes.GroupBy: - g.sortFn = groupBySort g.nextGroupFn = groupByNextGroup - g.rgc = groupByCursor{ - ctx: ctx, - mb: g.mb, - agg: req.Aggregate, - vals: make([][]byte, len(req.GroupKeys)), + g.groupByCursor = groupByCursor{ + ctx: ctx, + arrayCursors: g.arrayCursors, + agg: req.Aggregate, + vals: make([][]byte, len(req.GroupKeys)), + } + + if n, err := g.groupBySort(); n == 0 || err != nil { + return nil + } else { + span.LogKV("rows", n) } case datatypes.GroupNone: - g.sortFn = groupNoneSort g.nextGroupFn = groupNoneNextGroup + if n, err := g.groupNoneSort(); n == 0 || err != nil { + return nil + } else { + span.LogKV("rows", n) + } + default: panic("not implemented") } - n, err := g.sort() - if n == 0 || err != nil { - return nil - } - return g } @@ -111,25 +116,11 @@ func (g *groupResultSet) Next() GroupCursor { return g.nextGroupFn(g) } -func (g *groupResultSet) sort() (int, error) { - span, _ := tracing.StartSpanFromContext(g.ctx) - defer span.Finish() - span.LogKV("group_type", g.req.Group.String()) - - n, err := g.sortFn(g) - - if err != nil { - span.LogKV("rows", n) - } - - return n, err -} - // seriesHasPoints reads the first block of TSM data to verify the series has points for // the time range of the query. func (g *groupResultSet) seriesHasPoints(row *SeriesRow) bool { // TODO(sgc): this is expensive. Storage engine must provide efficient time range queries of series keys. - cur := g.mb.createCursor(*row) + cur := g.arrayCursors.createCursor(*row) var ts []int64 switch c := cur.(type) { case cursors.IntegerArrayCursor: @@ -157,90 +148,90 @@ func (g *groupResultSet) seriesHasPoints(row *SeriesRow) bool { } func groupNoneNextGroup(g *groupResultSet) GroupCursor { - cur, err := g.newCursorFn() + seriesCursor, err := g.newSeriesCursorFn() if err != nil { // TODO(sgc): store error return nil - } else if cur == nil { + } else if seriesCursor == nil { return nil } g.eof = true return &groupNoneCursor{ - ctx: g.ctx, - mb: g.mb, - agg: g.agg, - cur: cur, - keys: g.km.Get(), + ctx: g.ctx, + arrayCursors: g.arrayCursors, + agg: g.req.Aggregate, + cur: seriesCursor, + keys: g.km.Get(), } } -func groupNoneSort(g *groupResultSet) (int, error) { - cur, err := g.newCursorFn() +func (g *groupResultSet) groupNoneSort() (int, error) { + seriesCursor, err := g.newSeriesCursorFn() if err != nil { return 0, err - } else if cur == nil { + } else if seriesCursor == nil { return 0, nil } allTime := g.req.Hints.HintSchemaAllTime() g.km.Clear() n := 0 - row := cur.Next() - for row != nil { - if allTime || g.seriesHasPoints(row) { + seriesRow := seriesCursor.Next() + for seriesRow != nil { + if allTime || g.seriesHasPoints(seriesRow) { n++ - g.km.MergeTagKeys(row.Tags) + g.km.MergeTagKeys(seriesRow.Tags) } - row = cur.Next() + seriesRow = seriesCursor.Next() } - cur.Close() + seriesCursor.Close() return n, nil } func groupByNextGroup(g *groupResultSet) GroupCursor { - row := g.rows[g.i] + row := g.seriesRows[g.i] for i := range g.keys { - g.rgc.vals[i] = row.Tags.Get(g.keys[i]) + g.groupByCursor.vals[i] = row.Tags.Get(g.keys[i]) } g.km.Clear() rowKey := row.SortKey j := g.i - for j < len(g.rows) && bytes.Equal(rowKey, g.rows[j].SortKey) { - g.km.MergeTagKeys(g.rows[j].Tags) + for j < len(g.seriesRows) && bytes.Equal(rowKey, g.seriesRows[j].SortKey) { + g.km.MergeTagKeys(g.seriesRows[j].Tags) j++ } - g.rgc.reset(g.rows[g.i:j]) - g.rgc.keys = g.km.Get() + g.groupByCursor.reset(g.seriesRows[g.i:j]) + g.groupByCursor.keys = g.km.Get() g.i = j - if j == len(g.rows) { + if j == len(g.seriesRows) { g.eof = true } - return &g.rgc + return &g.groupByCursor } -func groupBySort(g *groupResultSet) (int, error) { - cur, err := g.newCursorFn() +func (g *groupResultSet) groupBySort() (int, error) { + seriesCursor, err := g.newSeriesCursorFn() if err != nil { return 0, err - } else if cur == nil { + } else if seriesCursor == nil { return 0, nil } - var rows []*SeriesRow + var seriesRows []*SeriesRow vals := make([][]byte, len(g.keys)) tagsBuf := &tagsBuffer{sz: 4096} allTime := g.req.Hints.HintSchemaAllTime() - row := cur.Next() - for row != nil { - if allTime || g.seriesHasPoints(row) { - nr := *row + seriesRow := seriesCursor.Next() + for seriesRow != nil { + if allTime || g.seriesHasPoints(seriesRow) { + nr := *seriesRow nr.SeriesTags = tagsBuf.copyTags(nr.SeriesTags) nr.Tags = tagsBuf.copyTags(nr.Tags) @@ -259,28 +250,28 @@ func groupBySort(g *groupResultSet) (int, error) { nr.SortKey = append(nr.SortKey, ',') } - rows = append(rows, &nr) + seriesRows = append(seriesRows, &nr) } - row = cur.Next() + seriesRow = seriesCursor.Next() } - sort.Slice(rows, func(i, j int) bool { - return bytes.Compare(rows[i].SortKey, rows[j].SortKey) == -1 + sort.Slice(seriesRows, func(i, j int) bool { + return bytes.Compare(seriesRows[i].SortKey, seriesRows[j].SortKey) == -1 }) - g.rows = rows + g.seriesRows = seriesRows - cur.Close() - return len(rows), nil + seriesCursor.Close() + return len(seriesRows), nil } type groupNoneCursor struct { - ctx context.Context - mb multiShardCursors - agg *datatypes.Aggregate - cur SeriesCursor - row SeriesRow - keys [][]byte + ctx context.Context + arrayCursors *arrayCursors + agg *datatypes.Aggregate + cur SeriesCursor + row SeriesRow + keys [][]byte } func (c *groupNoneCursor) Err() error { return nil } @@ -302,36 +293,36 @@ func (c *groupNoneCursor) Next() bool { } func (c *groupNoneCursor) Cursor() cursors.Cursor { - cur := c.mb.createCursor(c.row) + cur := c.arrayCursors.createCursor(c.row) if c.agg != nil { - cur = c.mb.newAggregateCursor(c.ctx, c.agg, cur) + cur = newAggregateArrayCursor(c.ctx, c.agg, cur) } return cur } type groupByCursor struct { - ctx context.Context - mb multiShardCursors - agg *datatypes.Aggregate - i int - rows []*SeriesRow - keys [][]byte - vals [][]byte + ctx context.Context + arrayCursors *arrayCursors + agg *datatypes.Aggregate + i int + seriesRows []*SeriesRow + keys [][]byte + vals [][]byte } -func (c *groupByCursor) reset(rows []*SeriesRow) { +func (c *groupByCursor) reset(seriesRows []*SeriesRow) { c.i = 0 - c.rows = rows + c.seriesRows = seriesRows } func (c *groupByCursor) Err() error { return nil } func (c *groupByCursor) Keys() [][]byte { return c.keys } func (c *groupByCursor) PartitionKeyVals() [][]byte { return c.vals } -func (c *groupByCursor) Tags() models.Tags { return c.rows[c.i-1].Tags } +func (c *groupByCursor) Tags() models.Tags { return c.seriesRows[c.i-1].Tags } func (c *groupByCursor) Close() {} func (c *groupByCursor) Next() bool { - if c.i < len(c.rows) { + if c.i < len(c.seriesRows) { c.i++ return true } @@ -339,17 +330,17 @@ func (c *groupByCursor) Next() bool { } func (c *groupByCursor) Cursor() cursors.Cursor { - cur := c.mb.createCursor(*c.rows[c.i-1]) + cur := c.arrayCursors.createCursor(*c.seriesRows[c.i-1]) if c.agg != nil { - cur = c.mb.newAggregateCursor(c.ctx, c.agg, cur) + cur = newAggregateArrayCursor(c.ctx, c.agg, cur) } return cur } func (c *groupByCursor) Stats() cursors.CursorStats { var stats cursors.CursorStats - for _, row := range c.rows { - stats.Add(row.Query.Stats()) + for _, seriesRow := range c.seriesRows { + stats.Add(seriesRow.Query.Stats()) } return stats } diff --git a/storage/reads/resultset.go b/storage/reads/resultset.go index a508d8cb7f..eaaaa61f6b 100644 --- a/storage/reads/resultset.go +++ b/storage/reads/resultset.go @@ -2,31 +2,25 @@ package reads import ( "context" - "math" "github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/storage/reads/datatypes" "github.com/influxdata/influxdb/tsdb/cursors" ) -type multiShardCursors interface { - createCursor(row SeriesRow) cursors.Cursor - newAggregateCursor(ctx context.Context, agg *datatypes.Aggregate, cursor cursors.Cursor) cursors.Cursor -} - type resultSet struct { - ctx context.Context - agg *datatypes.Aggregate - cur SeriesCursor - row SeriesRow - mb multiShardCursors + ctx context.Context + agg *datatypes.Aggregate + seriesCursor SeriesCursor + seriesRow SeriesRow + arrayCursors *arrayCursors } -func NewFilteredResultSet(ctx context.Context, req *datatypes.ReadFilterRequest, cur SeriesCursor) ResultSet { +func NewFilteredResultSet(ctx context.Context, req *datatypes.ReadFilterRequest, seriesCursor SeriesCursor) ResultSet { return &resultSet{ - ctx: ctx, - cur: cur, - mb: newMultiShardArrayCursors(ctx, req.Range.Start, req.Range.End, true, math.MaxInt64), + ctx: ctx, + seriesCursor: seriesCursor, + arrayCursors: newArrayCursors(ctx, req.Range.Start, req.Range.End, true), } } @@ -37,8 +31,8 @@ func (r *resultSet) Close() { if r == nil { return // Nothing to do. } - r.row.Query = nil - r.cur.Close() + r.seriesRow.Query = nil + r.seriesCursor.Close() } // Next returns true if there are more results available. @@ -47,28 +41,33 @@ func (r *resultSet) Next() bool { return false } - row := r.cur.Next() - if row == nil { + seriesRow := r.seriesCursor.Next() + if seriesRow == nil { return false } - r.row = *row + r.seriesRow = *seriesRow return true } func (r *resultSet) Cursor() cursors.Cursor { - cur := r.mb.createCursor(r.row) + cur := r.arrayCursors.createCursor(r.seriesRow) if r.agg != nil { - cur = r.mb.newAggregateCursor(r.ctx, r.agg, cur) + cur = newAggregateArrayCursor(r.ctx, r.agg, cur) } return cur } func (r *resultSet) Tags() models.Tags { - return r.row.Tags + return r.seriesRow.Tags } // Stats returns the stats for the underlying cursors. // Available after resultset has been scanned. -func (r *resultSet) Stats() cursors.CursorStats { return r.row.Query.Stats() } +func (r *resultSet) Stats() cursors.CursorStats { + if r.seriesRow.Query == nil { + return cursors.CursorStats{} + } + return r.seriesRow.Query.Stats() +} diff --git a/storage/reads/series_cursor.go b/storage/reads/series_cursor.go index f322c42820..f4b7a839c4 100644 --- a/storage/reads/series_cursor.go +++ b/storage/reads/series_cursor.go @@ -26,7 +26,7 @@ type SeriesRow struct { SeriesTags models.Tags // unmodified series tags Tags models.Tags Field string - Query cursors.CursorIterators + Query cursors.CursorIterator ValueCond influxql.Expr } @@ -63,7 +63,7 @@ func NewIndexSeriesCursor(ctx context.Context, orgID, bucketID influxdb.ID, pred Ascending: true, Ordered: true, } - p := &indexSeriesCursor{row: SeriesRow{Query: cursors.CursorIterators{cursorIterator}}} + p := &indexSeriesCursor{row: SeriesRow{Query: cursorIterator}} if root := predicate.GetRoot(); root != nil { if p.cond, err = NodeToExpr(root, nil); err != nil {