refactor(storage): cleanup in storage/reads (#17163)
* refactor(storage): remove cursorContext.limit and .count * refactor(storage): remove one-impl interface * refactor(storage): remove one-line multiShardArrayCursors.newAggregateCursor() * refactor(storage): mostly rename fields and variables * refactor(storage): multiShardArrayCursors has one shard * refactor(storage): drop misleading 'multiShard' from namespull/17168/head
parent
9ab28376d1
commit
9e8da7c313
|
|
@ -85,13 +85,13 @@ LOOP:
|
|||
return c.res
|
||||
}
|
||||
|
||||
type floatMultiShardArrayCursor struct {
|
||||
type floatArrayCursor struct {
|
||||
cursors.FloatArrayCursor
|
||||
cursorContext
|
||||
filter *floatArrayFilterCursor
|
||||
}
|
||||
|
||||
func (c *floatMultiShardArrayCursor) reset(cur cursors.FloatArrayCursor, itrs cursors.CursorIterators, cond expression) {
|
||||
func (c *floatArrayCursor) reset(cur cursors.FloatArrayCursor, cursorIterator cursors.CursorIterator, cond expression) {
|
||||
if cond != nil {
|
||||
if c.filter == nil {
|
||||
c.filter = newFloatFilterArrayCursor(cond)
|
||||
|
|
@ -101,18 +101,17 @@ func (c *floatMultiShardArrayCursor) reset(cur cursors.FloatArrayCursor, itrs cu
|
|||
}
|
||||
|
||||
c.FloatArrayCursor = cur
|
||||
c.itrs = itrs
|
||||
c.cursorIterator = cursorIterator
|
||||
c.err = nil
|
||||
c.count = 0
|
||||
}
|
||||
|
||||
func (c *floatMultiShardArrayCursor) Err() error { return c.err }
|
||||
func (c *floatArrayCursor) Err() error { return c.err }
|
||||
|
||||
func (c *floatMultiShardArrayCursor) Stats() cursors.CursorStats {
|
||||
func (c *floatArrayCursor) Stats() cursors.CursorStats {
|
||||
return c.FloatArrayCursor.Stats()
|
||||
}
|
||||
|
||||
func (c *floatMultiShardArrayCursor) Next() *cursors.FloatArray {
|
||||
func (c *floatArrayCursor) Next() *cursors.FloatArray {
|
||||
for {
|
||||
a := c.FloatArrayCursor.Next()
|
||||
if a.Len() == 0 {
|
||||
|
|
@ -120,31 +119,19 @@ func (c *floatMultiShardArrayCursor) Next() *cursors.FloatArray {
|
|||
continue
|
||||
}
|
||||
}
|
||||
c.count += int64(a.Len())
|
||||
if c.count > c.limit {
|
||||
diff := c.count - c.limit
|
||||
c.count -= diff
|
||||
rem := int64(a.Len()) - diff
|
||||
a.Timestamps = a.Timestamps[:rem]
|
||||
a.Values = a.Values[:rem]
|
||||
}
|
||||
return a
|
||||
}
|
||||
}
|
||||
|
||||
func (c *floatMultiShardArrayCursor) nextArrayCursor() bool {
|
||||
if len(c.itrs) == 0 {
|
||||
func (c *floatArrayCursor) nextArrayCursor() bool {
|
||||
if c.cursorIterator == nil {
|
||||
return false
|
||||
}
|
||||
|
||||
c.FloatArrayCursor.Close()
|
||||
|
||||
var itr cursors.CursorIterator
|
||||
var cur cursors.Cursor
|
||||
for cur == nil && len(c.itrs) > 0 {
|
||||
itr, c.itrs = c.itrs[0], c.itrs[1:]
|
||||
cur, _ = itr.Next(c.ctx, c.req)
|
||||
}
|
||||
cur, _ := c.cursorIterator.Next(c.ctx, c.req)
|
||||
c.cursorIterator = nil
|
||||
|
||||
var ok bool
|
||||
if cur != nil {
|
||||
|
|
@ -153,7 +140,7 @@ func (c *floatMultiShardArrayCursor) nextArrayCursor() bool {
|
|||
if !ok {
|
||||
cur.Close()
|
||||
next = FloatEmptyArrayCursor
|
||||
c.itrs = nil
|
||||
c.cursorIterator = nil
|
||||
c.err = errors.New("expected float cursor")
|
||||
} else {
|
||||
if c.filter != nil {
|
||||
|
|
@ -314,13 +301,13 @@ LOOP:
|
|||
return c.res
|
||||
}
|
||||
|
||||
type integerMultiShardArrayCursor struct {
|
||||
type integerArrayCursor struct {
|
||||
cursors.IntegerArrayCursor
|
||||
cursorContext
|
||||
filter *integerArrayFilterCursor
|
||||
}
|
||||
|
||||
func (c *integerMultiShardArrayCursor) reset(cur cursors.IntegerArrayCursor, itrs cursors.CursorIterators, cond expression) {
|
||||
func (c *integerArrayCursor) reset(cur cursors.IntegerArrayCursor, cursorIterator cursors.CursorIterator, cond expression) {
|
||||
if cond != nil {
|
||||
if c.filter == nil {
|
||||
c.filter = newIntegerFilterArrayCursor(cond)
|
||||
|
|
@ -330,18 +317,17 @@ func (c *integerMultiShardArrayCursor) reset(cur cursors.IntegerArrayCursor, itr
|
|||
}
|
||||
|
||||
c.IntegerArrayCursor = cur
|
||||
c.itrs = itrs
|
||||
c.cursorIterator = cursorIterator
|
||||
c.err = nil
|
||||
c.count = 0
|
||||
}
|
||||
|
||||
func (c *integerMultiShardArrayCursor) Err() error { return c.err }
|
||||
func (c *integerArrayCursor) Err() error { return c.err }
|
||||
|
||||
func (c *integerMultiShardArrayCursor) Stats() cursors.CursorStats {
|
||||
func (c *integerArrayCursor) Stats() cursors.CursorStats {
|
||||
return c.IntegerArrayCursor.Stats()
|
||||
}
|
||||
|
||||
func (c *integerMultiShardArrayCursor) Next() *cursors.IntegerArray {
|
||||
func (c *integerArrayCursor) Next() *cursors.IntegerArray {
|
||||
for {
|
||||
a := c.IntegerArrayCursor.Next()
|
||||
if a.Len() == 0 {
|
||||
|
|
@ -349,31 +335,19 @@ func (c *integerMultiShardArrayCursor) Next() *cursors.IntegerArray {
|
|||
continue
|
||||
}
|
||||
}
|
||||
c.count += int64(a.Len())
|
||||
if c.count > c.limit {
|
||||
diff := c.count - c.limit
|
||||
c.count -= diff
|
||||
rem := int64(a.Len()) - diff
|
||||
a.Timestamps = a.Timestamps[:rem]
|
||||
a.Values = a.Values[:rem]
|
||||
}
|
||||
return a
|
||||
}
|
||||
}
|
||||
|
||||
func (c *integerMultiShardArrayCursor) nextArrayCursor() bool {
|
||||
if len(c.itrs) == 0 {
|
||||
func (c *integerArrayCursor) nextArrayCursor() bool {
|
||||
if c.cursorIterator == nil {
|
||||
return false
|
||||
}
|
||||
|
||||
c.IntegerArrayCursor.Close()
|
||||
|
||||
var itr cursors.CursorIterator
|
||||
var cur cursors.Cursor
|
||||
for cur == nil && len(c.itrs) > 0 {
|
||||
itr, c.itrs = c.itrs[0], c.itrs[1:]
|
||||
cur, _ = itr.Next(c.ctx, c.req)
|
||||
}
|
||||
cur, _ := c.cursorIterator.Next(c.ctx, c.req)
|
||||
c.cursorIterator = nil
|
||||
|
||||
var ok bool
|
||||
if cur != nil {
|
||||
|
|
@ -382,7 +356,7 @@ func (c *integerMultiShardArrayCursor) nextArrayCursor() bool {
|
|||
if !ok {
|
||||
cur.Close()
|
||||
next = IntegerEmptyArrayCursor
|
||||
c.itrs = nil
|
||||
c.cursorIterator = nil
|
||||
c.err = errors.New("expected integer cursor")
|
||||
} else {
|
||||
if c.filter != nil {
|
||||
|
|
@ -543,13 +517,13 @@ LOOP:
|
|||
return c.res
|
||||
}
|
||||
|
||||
type unsignedMultiShardArrayCursor struct {
|
||||
type unsignedArrayCursor struct {
|
||||
cursors.UnsignedArrayCursor
|
||||
cursorContext
|
||||
filter *unsignedArrayFilterCursor
|
||||
}
|
||||
|
||||
func (c *unsignedMultiShardArrayCursor) reset(cur cursors.UnsignedArrayCursor, itrs cursors.CursorIterators, cond expression) {
|
||||
func (c *unsignedArrayCursor) reset(cur cursors.UnsignedArrayCursor, cursorIterator cursors.CursorIterator, cond expression) {
|
||||
if cond != nil {
|
||||
if c.filter == nil {
|
||||
c.filter = newUnsignedFilterArrayCursor(cond)
|
||||
|
|
@ -559,18 +533,17 @@ func (c *unsignedMultiShardArrayCursor) reset(cur cursors.UnsignedArrayCursor, i
|
|||
}
|
||||
|
||||
c.UnsignedArrayCursor = cur
|
||||
c.itrs = itrs
|
||||
c.cursorIterator = cursorIterator
|
||||
c.err = nil
|
||||
c.count = 0
|
||||
}
|
||||
|
||||
func (c *unsignedMultiShardArrayCursor) Err() error { return c.err }
|
||||
func (c *unsignedArrayCursor) Err() error { return c.err }
|
||||
|
||||
func (c *unsignedMultiShardArrayCursor) Stats() cursors.CursorStats {
|
||||
func (c *unsignedArrayCursor) Stats() cursors.CursorStats {
|
||||
return c.UnsignedArrayCursor.Stats()
|
||||
}
|
||||
|
||||
func (c *unsignedMultiShardArrayCursor) Next() *cursors.UnsignedArray {
|
||||
func (c *unsignedArrayCursor) Next() *cursors.UnsignedArray {
|
||||
for {
|
||||
a := c.UnsignedArrayCursor.Next()
|
||||
if a.Len() == 0 {
|
||||
|
|
@ -578,31 +551,19 @@ func (c *unsignedMultiShardArrayCursor) Next() *cursors.UnsignedArray {
|
|||
continue
|
||||
}
|
||||
}
|
||||
c.count += int64(a.Len())
|
||||
if c.count > c.limit {
|
||||
diff := c.count - c.limit
|
||||
c.count -= diff
|
||||
rem := int64(a.Len()) - diff
|
||||
a.Timestamps = a.Timestamps[:rem]
|
||||
a.Values = a.Values[:rem]
|
||||
}
|
||||
return a
|
||||
}
|
||||
}
|
||||
|
||||
func (c *unsignedMultiShardArrayCursor) nextArrayCursor() bool {
|
||||
if len(c.itrs) == 0 {
|
||||
func (c *unsignedArrayCursor) nextArrayCursor() bool {
|
||||
if c.cursorIterator == nil {
|
||||
return false
|
||||
}
|
||||
|
||||
c.UnsignedArrayCursor.Close()
|
||||
|
||||
var itr cursors.CursorIterator
|
||||
var cur cursors.Cursor
|
||||
for cur == nil && len(c.itrs) > 0 {
|
||||
itr, c.itrs = c.itrs[0], c.itrs[1:]
|
||||
cur, _ = itr.Next(c.ctx, c.req)
|
||||
}
|
||||
cur, _ := c.cursorIterator.Next(c.ctx, c.req)
|
||||
c.cursorIterator = nil
|
||||
|
||||
var ok bool
|
||||
if cur != nil {
|
||||
|
|
@ -611,7 +572,7 @@ func (c *unsignedMultiShardArrayCursor) nextArrayCursor() bool {
|
|||
if !ok {
|
||||
cur.Close()
|
||||
next = UnsignedEmptyArrayCursor
|
||||
c.itrs = nil
|
||||
c.cursorIterator = nil
|
||||
c.err = errors.New("expected unsigned cursor")
|
||||
} else {
|
||||
if c.filter != nil {
|
||||
|
|
@ -772,13 +733,13 @@ LOOP:
|
|||
return c.res
|
||||
}
|
||||
|
||||
type stringMultiShardArrayCursor struct {
|
||||
type stringArrayCursor struct {
|
||||
cursors.StringArrayCursor
|
||||
cursorContext
|
||||
filter *stringArrayFilterCursor
|
||||
}
|
||||
|
||||
func (c *stringMultiShardArrayCursor) reset(cur cursors.StringArrayCursor, itrs cursors.CursorIterators, cond expression) {
|
||||
func (c *stringArrayCursor) reset(cur cursors.StringArrayCursor, cursorIterator cursors.CursorIterator, cond expression) {
|
||||
if cond != nil {
|
||||
if c.filter == nil {
|
||||
c.filter = newStringFilterArrayCursor(cond)
|
||||
|
|
@ -788,18 +749,17 @@ func (c *stringMultiShardArrayCursor) reset(cur cursors.StringArrayCursor, itrs
|
|||
}
|
||||
|
||||
c.StringArrayCursor = cur
|
||||
c.itrs = itrs
|
||||
c.cursorIterator = cursorIterator
|
||||
c.err = nil
|
||||
c.count = 0
|
||||
}
|
||||
|
||||
func (c *stringMultiShardArrayCursor) Err() error { return c.err }
|
||||
func (c *stringArrayCursor) Err() error { return c.err }
|
||||
|
||||
func (c *stringMultiShardArrayCursor) Stats() cursors.CursorStats {
|
||||
func (c *stringArrayCursor) Stats() cursors.CursorStats {
|
||||
return c.StringArrayCursor.Stats()
|
||||
}
|
||||
|
||||
func (c *stringMultiShardArrayCursor) Next() *cursors.StringArray {
|
||||
func (c *stringArrayCursor) Next() *cursors.StringArray {
|
||||
for {
|
||||
a := c.StringArrayCursor.Next()
|
||||
if a.Len() == 0 {
|
||||
|
|
@ -807,31 +767,19 @@ func (c *stringMultiShardArrayCursor) Next() *cursors.StringArray {
|
|||
continue
|
||||
}
|
||||
}
|
||||
c.count += int64(a.Len())
|
||||
if c.count > c.limit {
|
||||
diff := c.count - c.limit
|
||||
c.count -= diff
|
||||
rem := int64(a.Len()) - diff
|
||||
a.Timestamps = a.Timestamps[:rem]
|
||||
a.Values = a.Values[:rem]
|
||||
}
|
||||
return a
|
||||
}
|
||||
}
|
||||
|
||||
func (c *stringMultiShardArrayCursor) nextArrayCursor() bool {
|
||||
if len(c.itrs) == 0 {
|
||||
func (c *stringArrayCursor) nextArrayCursor() bool {
|
||||
if c.cursorIterator == nil {
|
||||
return false
|
||||
}
|
||||
|
||||
c.StringArrayCursor.Close()
|
||||
|
||||
var itr cursors.CursorIterator
|
||||
var cur cursors.Cursor
|
||||
for cur == nil && len(c.itrs) > 0 {
|
||||
itr, c.itrs = c.itrs[0], c.itrs[1:]
|
||||
cur, _ = itr.Next(c.ctx, c.req)
|
||||
}
|
||||
cur, _ := c.cursorIterator.Next(c.ctx, c.req)
|
||||
c.cursorIterator = nil
|
||||
|
||||
var ok bool
|
||||
if cur != nil {
|
||||
|
|
@ -840,7 +788,7 @@ func (c *stringMultiShardArrayCursor) nextArrayCursor() bool {
|
|||
if !ok {
|
||||
cur.Close()
|
||||
next = StringEmptyArrayCursor
|
||||
c.itrs = nil
|
||||
c.cursorIterator = nil
|
||||
c.err = errors.New("expected string cursor")
|
||||
} else {
|
||||
if c.filter != nil {
|
||||
|
|
@ -961,13 +909,13 @@ LOOP:
|
|||
return c.res
|
||||
}
|
||||
|
||||
type booleanMultiShardArrayCursor struct {
|
||||
type booleanArrayCursor struct {
|
||||
cursors.BooleanArrayCursor
|
||||
cursorContext
|
||||
filter *booleanArrayFilterCursor
|
||||
}
|
||||
|
||||
func (c *booleanMultiShardArrayCursor) reset(cur cursors.BooleanArrayCursor, itrs cursors.CursorIterators, cond expression) {
|
||||
func (c *booleanArrayCursor) reset(cur cursors.BooleanArrayCursor, cursorIterator cursors.CursorIterator, cond expression) {
|
||||
if cond != nil {
|
||||
if c.filter == nil {
|
||||
c.filter = newBooleanFilterArrayCursor(cond)
|
||||
|
|
@ -977,18 +925,17 @@ func (c *booleanMultiShardArrayCursor) reset(cur cursors.BooleanArrayCursor, itr
|
|||
}
|
||||
|
||||
c.BooleanArrayCursor = cur
|
||||
c.itrs = itrs
|
||||
c.cursorIterator = cursorIterator
|
||||
c.err = nil
|
||||
c.count = 0
|
||||
}
|
||||
|
||||
func (c *booleanMultiShardArrayCursor) Err() error { return c.err }
|
||||
func (c *booleanArrayCursor) Err() error { return c.err }
|
||||
|
||||
func (c *booleanMultiShardArrayCursor) Stats() cursors.CursorStats {
|
||||
func (c *booleanArrayCursor) Stats() cursors.CursorStats {
|
||||
return c.BooleanArrayCursor.Stats()
|
||||
}
|
||||
|
||||
func (c *booleanMultiShardArrayCursor) Next() *cursors.BooleanArray {
|
||||
func (c *booleanArrayCursor) Next() *cursors.BooleanArray {
|
||||
for {
|
||||
a := c.BooleanArrayCursor.Next()
|
||||
if a.Len() == 0 {
|
||||
|
|
@ -996,31 +943,19 @@ func (c *booleanMultiShardArrayCursor) Next() *cursors.BooleanArray {
|
|||
continue
|
||||
}
|
||||
}
|
||||
c.count += int64(a.Len())
|
||||
if c.count > c.limit {
|
||||
diff := c.count - c.limit
|
||||
c.count -= diff
|
||||
rem := int64(a.Len()) - diff
|
||||
a.Timestamps = a.Timestamps[:rem]
|
||||
a.Values = a.Values[:rem]
|
||||
}
|
||||
return a
|
||||
}
|
||||
}
|
||||
|
||||
func (c *booleanMultiShardArrayCursor) nextArrayCursor() bool {
|
||||
if len(c.itrs) == 0 {
|
||||
func (c *booleanArrayCursor) nextArrayCursor() bool {
|
||||
if c.cursorIterator == nil {
|
||||
return false
|
||||
}
|
||||
|
||||
c.BooleanArrayCursor.Close()
|
||||
|
||||
var itr cursors.CursorIterator
|
||||
var cur cursors.Cursor
|
||||
for cur == nil && len(c.itrs) > 0 {
|
||||
itr, c.itrs = c.itrs[0], c.itrs[1:]
|
||||
cur, _ = itr.Next(c.ctx, c.req)
|
||||
}
|
||||
cur, _ := c.cursorIterator.Next(c.ctx, c.req)
|
||||
c.cursorIterator = nil
|
||||
|
||||
var ok bool
|
||||
if cur != nil {
|
||||
|
|
@ -1029,7 +964,7 @@ func (c *booleanMultiShardArrayCursor) nextArrayCursor() bool {
|
|||
if !ok {
|
||||
cur.Close()
|
||||
next = BooleanEmptyArrayCursor
|
||||
c.itrs = nil
|
||||
c.cursorIterator = nil
|
||||
c.err = errors.New("expected boolean cursor")
|
||||
} else {
|
||||
if c.filter != nil {
|
||||
|
|
|
|||
|
|
@ -84,13 +84,13 @@ LOOP:
|
|||
return c.res
|
||||
}
|
||||
|
||||
type {{.name}}MultiShardArrayCursor struct {
|
||||
type {{.name}}ArrayCursor struct {
|
||||
cursors.{{.Name}}ArrayCursor
|
||||
cursorContext
|
||||
filter *{{$type}}
|
||||
}
|
||||
|
||||
func (c *{{.name}}MultiShardArrayCursor) reset(cur cursors.{{.Name}}ArrayCursor, itrs cursors.CursorIterators, cond expression) {
|
||||
func (c *{{.name}}ArrayCursor) reset(cur cursors.{{.Name}}ArrayCursor, cursorIterator cursors.CursorIterator, cond expression) {
|
||||
if cond != nil {
|
||||
if c.filter == nil {
|
||||
c.filter = new{{.Name}}FilterArrayCursor(cond)
|
||||
|
|
@ -100,19 +100,18 @@ func (c *{{.name}}MultiShardArrayCursor) reset(cur cursors.{{.Name}}ArrayCursor,
|
|||
}
|
||||
|
||||
c.{{.Name}}ArrayCursor = cur
|
||||
c.itrs = itrs
|
||||
c.cursorIterator = cursorIterator
|
||||
c.err = nil
|
||||
c.count = 0
|
||||
}
|
||||
|
||||
|
||||
func (c *{{.name}}MultiShardArrayCursor) Err() error { return c.err }
|
||||
func (c *{{.name}}ArrayCursor) Err() error { return c.err }
|
||||
|
||||
func (c *{{.name}}MultiShardArrayCursor) Stats() cursors.CursorStats {
|
||||
func (c *{{.name}}ArrayCursor) Stats() cursors.CursorStats {
|
||||
return c.{{.Name}}ArrayCursor.Stats()
|
||||
}
|
||||
|
||||
func (c *{{.name}}MultiShardArrayCursor) Next() {{$arrayType}} {
|
||||
func (c *{{.name}}ArrayCursor) Next() {{$arrayType}} {
|
||||
for {
|
||||
a := c.{{.Name}}ArrayCursor.Next()
|
||||
if a.Len() == 0 {
|
||||
|
|
@ -120,31 +119,19 @@ func (c *{{.name}}MultiShardArrayCursor) Next() {{$arrayType}} {
|
|||
continue
|
||||
}
|
||||
}
|
||||
c.count += int64(a.Len())
|
||||
if c.count > c.limit {
|
||||
diff := c.count - c.limit
|
||||
c.count -= diff
|
||||
rem := int64(a.Len()) - diff
|
||||
a.Timestamps = a.Timestamps[:rem]
|
||||
a.Values = a.Values[:rem]
|
||||
}
|
||||
return a
|
||||
}
|
||||
}
|
||||
|
||||
func (c *{{.name}}MultiShardArrayCursor) nextArrayCursor() bool {
|
||||
if len(c.itrs) == 0 {
|
||||
func (c *{{.name}}ArrayCursor) nextArrayCursor() bool {
|
||||
if c.cursorIterator == nil {
|
||||
return false
|
||||
}
|
||||
|
||||
c.{{.Name}}ArrayCursor.Close()
|
||||
|
||||
var itr cursors.CursorIterator
|
||||
var cur cursors.Cursor
|
||||
for cur == nil && len(c.itrs) > 0 {
|
||||
itr, c.itrs = c.itrs[0], c.itrs[1:]
|
||||
cur, _ = itr.Next(c.ctx, c.req)
|
||||
}
|
||||
cur, _ := c.cursorIterator.Next(c.ctx, c.req)
|
||||
c.cursorIterator = nil
|
||||
|
||||
var ok bool
|
||||
if cur != nil {
|
||||
|
|
@ -153,7 +140,7 @@ func (c *{{.name}}MultiShardArrayCursor) nextArrayCursor() bool {
|
|||
if !ok {
|
||||
cur.Close()
|
||||
next = {{.Name}}EmptyArrayCursor
|
||||
c.itrs = nil
|
||||
c.cursorIterator = nil
|
||||
c.err = errors.New("expected {{.name}} cursor")
|
||||
} else {
|
||||
if c.filter != nil {
|
||||
|
|
|
|||
|
|
@ -64,36 +64,28 @@ func newCountArrayCursor(cur cursors.Cursor) cursors.Cursor {
|
|||
}
|
||||
|
||||
type cursorContext struct {
|
||||
ctx context.Context
|
||||
req *cursors.CursorRequest
|
||||
itrs cursors.CursorIterators
|
||||
limit int64
|
||||
count int64
|
||||
err error
|
||||
ctx context.Context
|
||||
req *cursors.CursorRequest
|
||||
cursorIterator cursors.CursorIterator
|
||||
err error
|
||||
}
|
||||
|
||||
type multiShardArrayCursors struct {
|
||||
ctx context.Context
|
||||
limit int64
|
||||
req cursors.CursorRequest
|
||||
type arrayCursors struct {
|
||||
ctx context.Context
|
||||
req cursors.CursorRequest
|
||||
|
||||
cursors struct {
|
||||
i integerMultiShardArrayCursor
|
||||
f floatMultiShardArrayCursor
|
||||
u unsignedMultiShardArrayCursor
|
||||
b booleanMultiShardArrayCursor
|
||||
s stringMultiShardArrayCursor
|
||||
i integerArrayCursor
|
||||
f floatArrayCursor
|
||||
u unsignedArrayCursor
|
||||
b booleanArrayCursor
|
||||
s stringArrayCursor
|
||||
}
|
||||
}
|
||||
|
||||
func newMultiShardArrayCursors(ctx context.Context, start, end int64, asc bool, limit int64) *multiShardArrayCursors {
|
||||
if limit < 0 {
|
||||
limit = 1
|
||||
}
|
||||
|
||||
m := &multiShardArrayCursors{
|
||||
ctx: ctx,
|
||||
limit: limit,
|
||||
func newArrayCursors(ctx context.Context, start, end int64, asc bool) *arrayCursors {
|
||||
m := &arrayCursors{
|
||||
ctx: ctx,
|
||||
req: cursors.CursorRequest{
|
||||
Ascending: asc,
|
||||
StartTime: start,
|
||||
|
|
@ -102,9 +94,8 @@ func newMultiShardArrayCursors(ctx context.Context, start, end int64, asc bool,
|
|||
}
|
||||
|
||||
cc := cursorContext{
|
||||
ctx: ctx,
|
||||
limit: limit,
|
||||
req: &m.req,
|
||||
ctx: ctx,
|
||||
req: &m.req,
|
||||
}
|
||||
|
||||
m.cursors.i.cursorContext = cc
|
||||
|
|
@ -116,48 +107,42 @@ func newMultiShardArrayCursors(ctx context.Context, start, end int64, asc bool,
|
|||
return m
|
||||
}
|
||||
|
||||
func (m *multiShardArrayCursors) createCursor(row SeriesRow) cursors.Cursor {
|
||||
m.req.Name = row.Name
|
||||
m.req.Tags = row.SeriesTags
|
||||
m.req.Field = row.Field
|
||||
func (m *arrayCursors) createCursor(seriesRow SeriesRow) cursors.Cursor {
|
||||
m.req.Name = seriesRow.Name
|
||||
m.req.Tags = seriesRow.SeriesTags
|
||||
m.req.Field = seriesRow.Field
|
||||
|
||||
var cond expression
|
||||
if row.ValueCond != nil {
|
||||
cond = &astExpr{row.ValueCond}
|
||||
if seriesRow.ValueCond != nil {
|
||||
cond = &astExpr{seriesRow.ValueCond}
|
||||
}
|
||||
|
||||
var shard cursors.CursorIterator
|
||||
var cur cursors.Cursor
|
||||
for cur == nil && len(row.Query) > 0 {
|
||||
shard, row.Query = row.Query[0], row.Query[1:]
|
||||
cur, _ = shard.Next(m.ctx, &m.req)
|
||||
if seriesRow.Query == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
cur, _ := seriesRow.Query.Next(m.ctx, &m.req)
|
||||
seriesRow.Query = nil
|
||||
if cur == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
switch c := cur.(type) {
|
||||
case cursors.IntegerArrayCursor:
|
||||
m.cursors.i.reset(c, row.Query, cond)
|
||||
m.cursors.i.reset(c, seriesRow.Query, cond)
|
||||
return &m.cursors.i
|
||||
case cursors.FloatArrayCursor:
|
||||
m.cursors.f.reset(c, row.Query, cond)
|
||||
m.cursors.f.reset(c, seriesRow.Query, cond)
|
||||
return &m.cursors.f
|
||||
case cursors.UnsignedArrayCursor:
|
||||
m.cursors.u.reset(c, row.Query, cond)
|
||||
m.cursors.u.reset(c, seriesRow.Query, cond)
|
||||
return &m.cursors.u
|
||||
case cursors.StringArrayCursor:
|
||||
m.cursors.s.reset(c, row.Query, cond)
|
||||
m.cursors.s.reset(c, seriesRow.Query, cond)
|
||||
return &m.cursors.s
|
||||
case cursors.BooleanArrayCursor:
|
||||
m.cursors.b.reset(c, row.Query, cond)
|
||||
m.cursors.b.reset(c, seriesRow.Query, cond)
|
||||
return &m.cursors.b
|
||||
default:
|
||||
panic(fmt.Sprintf("unreachable: %T", cur))
|
||||
}
|
||||
}
|
||||
|
||||
func (m *multiShardArrayCursors) newAggregateCursor(ctx context.Context, agg *datatypes.Aggregate, cursor cursors.Cursor) cursors.Cursor {
|
||||
return newAggregateArrayCursor(ctx, agg, cursor)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -4,7 +4,6 @@ import (
|
|||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"math"
|
||||
"sort"
|
||||
|
||||
"github.com/influxdata/influxdb/kit/tracing"
|
||||
|
|
@ -14,21 +13,19 @@ import (
|
|||
)
|
||||
|
||||
type groupResultSet struct {
|
||||
ctx context.Context
|
||||
req *datatypes.ReadGroupRequest
|
||||
agg *datatypes.Aggregate
|
||||
mb multiShardCursors
|
||||
ctx context.Context
|
||||
req *datatypes.ReadGroupRequest
|
||||
arrayCursors *arrayCursors
|
||||
|
||||
i int
|
||||
rows []*SeriesRow
|
||||
keys [][]byte
|
||||
nilSort []byte
|
||||
rgc groupByCursor
|
||||
km KeyMerger
|
||||
i int
|
||||
seriesRows []*SeriesRow
|
||||
keys [][]byte
|
||||
nilSort []byte
|
||||
groupByCursor groupByCursor
|
||||
km KeyMerger
|
||||
|
||||
newCursorFn func() (SeriesCursor, error)
|
||||
nextGroupFn func(c *groupResultSet) GroupCursor
|
||||
sortFn func(c *groupResultSet) (int, error)
|
||||
newSeriesCursorFn func() (SeriesCursor, error)
|
||||
nextGroupFn func(c *groupResultSet) GroupCursor
|
||||
|
||||
eof bool
|
||||
}
|
||||
|
|
@ -43,21 +40,24 @@ func GroupOptionNilSortLo() GroupOption {
|
|||
}
|
||||
}
|
||||
|
||||
func NewGroupResultSet(ctx context.Context, req *datatypes.ReadGroupRequest, newCursorFn func() (SeriesCursor, error), opts ...GroupOption) GroupResultSet {
|
||||
func NewGroupResultSet(ctx context.Context, req *datatypes.ReadGroupRequest, newSeriesCursorFn func() (SeriesCursor, error), opts ...GroupOption) GroupResultSet {
|
||||
span, _ := tracing.StartSpanFromContext(ctx)
|
||||
defer span.Finish()
|
||||
span.LogKV("group_type", req.Group.String())
|
||||
|
||||
g := &groupResultSet{
|
||||
ctx: ctx,
|
||||
req: req,
|
||||
agg: req.Aggregate,
|
||||
keys: make([][]byte, len(req.GroupKeys)),
|
||||
nilSort: NilSortHi,
|
||||
newCursorFn: newCursorFn,
|
||||
ctx: ctx,
|
||||
req: req,
|
||||
keys: make([][]byte, len(req.GroupKeys)),
|
||||
nilSort: NilSortHi,
|
||||
newSeriesCursorFn: newSeriesCursorFn,
|
||||
}
|
||||
|
||||
for _, o := range opts {
|
||||
o(g)
|
||||
}
|
||||
|
||||
g.mb = newMultiShardArrayCursors(ctx, req.Range.Start, req.Range.End, true, math.MaxInt64)
|
||||
g.arrayCursors = newArrayCursors(ctx, req.Range.Start, req.Range.End, true)
|
||||
|
||||
for i, k := range req.GroupKeys {
|
||||
g.keys[i] = []byte(k)
|
||||
|
|
@ -65,28 +65,33 @@ func NewGroupResultSet(ctx context.Context, req *datatypes.ReadGroupRequest, new
|
|||
|
||||
switch req.Group {
|
||||
case datatypes.GroupBy:
|
||||
g.sortFn = groupBySort
|
||||
g.nextGroupFn = groupByNextGroup
|
||||
g.rgc = groupByCursor{
|
||||
ctx: ctx,
|
||||
mb: g.mb,
|
||||
agg: req.Aggregate,
|
||||
vals: make([][]byte, len(req.GroupKeys)),
|
||||
g.groupByCursor = groupByCursor{
|
||||
ctx: ctx,
|
||||
arrayCursors: g.arrayCursors,
|
||||
agg: req.Aggregate,
|
||||
vals: make([][]byte, len(req.GroupKeys)),
|
||||
}
|
||||
|
||||
if n, err := g.groupBySort(); n == 0 || err != nil {
|
||||
return nil
|
||||
} else {
|
||||
span.LogKV("rows", n)
|
||||
}
|
||||
|
||||
case datatypes.GroupNone:
|
||||
g.sortFn = groupNoneSort
|
||||
g.nextGroupFn = groupNoneNextGroup
|
||||
|
||||
if n, err := g.groupNoneSort(); n == 0 || err != nil {
|
||||
return nil
|
||||
} else {
|
||||
span.LogKV("rows", n)
|
||||
}
|
||||
|
||||
default:
|
||||
panic("not implemented")
|
||||
}
|
||||
|
||||
n, err := g.sort()
|
||||
if n == 0 || err != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
return g
|
||||
}
|
||||
|
||||
|
|
@ -111,25 +116,11 @@ func (g *groupResultSet) Next() GroupCursor {
|
|||
return g.nextGroupFn(g)
|
||||
}
|
||||
|
||||
func (g *groupResultSet) sort() (int, error) {
|
||||
span, _ := tracing.StartSpanFromContext(g.ctx)
|
||||
defer span.Finish()
|
||||
span.LogKV("group_type", g.req.Group.String())
|
||||
|
||||
n, err := g.sortFn(g)
|
||||
|
||||
if err != nil {
|
||||
span.LogKV("rows", n)
|
||||
}
|
||||
|
||||
return n, err
|
||||
}
|
||||
|
||||
// seriesHasPoints reads the first block of TSM data to verify the series has points for
|
||||
// the time range of the query.
|
||||
func (g *groupResultSet) seriesHasPoints(row *SeriesRow) bool {
|
||||
// TODO(sgc): this is expensive. Storage engine must provide efficient time range queries of series keys.
|
||||
cur := g.mb.createCursor(*row)
|
||||
cur := g.arrayCursors.createCursor(*row)
|
||||
var ts []int64
|
||||
switch c := cur.(type) {
|
||||
case cursors.IntegerArrayCursor:
|
||||
|
|
@ -157,90 +148,90 @@ func (g *groupResultSet) seriesHasPoints(row *SeriesRow) bool {
|
|||
}
|
||||
|
||||
func groupNoneNextGroup(g *groupResultSet) GroupCursor {
|
||||
cur, err := g.newCursorFn()
|
||||
seriesCursor, err := g.newSeriesCursorFn()
|
||||
if err != nil {
|
||||
// TODO(sgc): store error
|
||||
return nil
|
||||
} else if cur == nil {
|
||||
} else if seriesCursor == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
g.eof = true
|
||||
return &groupNoneCursor{
|
||||
ctx: g.ctx,
|
||||
mb: g.mb,
|
||||
agg: g.agg,
|
||||
cur: cur,
|
||||
keys: g.km.Get(),
|
||||
ctx: g.ctx,
|
||||
arrayCursors: g.arrayCursors,
|
||||
agg: g.req.Aggregate,
|
||||
cur: seriesCursor,
|
||||
keys: g.km.Get(),
|
||||
}
|
||||
}
|
||||
|
||||
func groupNoneSort(g *groupResultSet) (int, error) {
|
||||
cur, err := g.newCursorFn()
|
||||
func (g *groupResultSet) groupNoneSort() (int, error) {
|
||||
seriesCursor, err := g.newSeriesCursorFn()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
} else if cur == nil {
|
||||
} else if seriesCursor == nil {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
allTime := g.req.Hints.HintSchemaAllTime()
|
||||
g.km.Clear()
|
||||
n := 0
|
||||
row := cur.Next()
|
||||
for row != nil {
|
||||
if allTime || g.seriesHasPoints(row) {
|
||||
seriesRow := seriesCursor.Next()
|
||||
for seriesRow != nil {
|
||||
if allTime || g.seriesHasPoints(seriesRow) {
|
||||
n++
|
||||
g.km.MergeTagKeys(row.Tags)
|
||||
g.km.MergeTagKeys(seriesRow.Tags)
|
||||
}
|
||||
row = cur.Next()
|
||||
seriesRow = seriesCursor.Next()
|
||||
}
|
||||
|
||||
cur.Close()
|
||||
seriesCursor.Close()
|
||||
return n, nil
|
||||
}
|
||||
|
||||
func groupByNextGroup(g *groupResultSet) GroupCursor {
|
||||
row := g.rows[g.i]
|
||||
row := g.seriesRows[g.i]
|
||||
for i := range g.keys {
|
||||
g.rgc.vals[i] = row.Tags.Get(g.keys[i])
|
||||
g.groupByCursor.vals[i] = row.Tags.Get(g.keys[i])
|
||||
}
|
||||
|
||||
g.km.Clear()
|
||||
rowKey := row.SortKey
|
||||
j := g.i
|
||||
for j < len(g.rows) && bytes.Equal(rowKey, g.rows[j].SortKey) {
|
||||
g.km.MergeTagKeys(g.rows[j].Tags)
|
||||
for j < len(g.seriesRows) && bytes.Equal(rowKey, g.seriesRows[j].SortKey) {
|
||||
g.km.MergeTagKeys(g.seriesRows[j].Tags)
|
||||
j++
|
||||
}
|
||||
|
||||
g.rgc.reset(g.rows[g.i:j])
|
||||
g.rgc.keys = g.km.Get()
|
||||
g.groupByCursor.reset(g.seriesRows[g.i:j])
|
||||
g.groupByCursor.keys = g.km.Get()
|
||||
|
||||
g.i = j
|
||||
if j == len(g.rows) {
|
||||
if j == len(g.seriesRows) {
|
||||
g.eof = true
|
||||
}
|
||||
|
||||
return &g.rgc
|
||||
return &g.groupByCursor
|
||||
}
|
||||
|
||||
func groupBySort(g *groupResultSet) (int, error) {
|
||||
cur, err := g.newCursorFn()
|
||||
func (g *groupResultSet) groupBySort() (int, error) {
|
||||
seriesCursor, err := g.newSeriesCursorFn()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
} else if cur == nil {
|
||||
} else if seriesCursor == nil {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
var rows []*SeriesRow
|
||||
var seriesRows []*SeriesRow
|
||||
vals := make([][]byte, len(g.keys))
|
||||
tagsBuf := &tagsBuffer{sz: 4096}
|
||||
allTime := g.req.Hints.HintSchemaAllTime()
|
||||
|
||||
row := cur.Next()
|
||||
for row != nil {
|
||||
if allTime || g.seriesHasPoints(row) {
|
||||
nr := *row
|
||||
seriesRow := seriesCursor.Next()
|
||||
for seriesRow != nil {
|
||||
if allTime || g.seriesHasPoints(seriesRow) {
|
||||
nr := *seriesRow
|
||||
nr.SeriesTags = tagsBuf.copyTags(nr.SeriesTags)
|
||||
nr.Tags = tagsBuf.copyTags(nr.Tags)
|
||||
|
||||
|
|
@ -259,28 +250,28 @@ func groupBySort(g *groupResultSet) (int, error) {
|
|||
nr.SortKey = append(nr.SortKey, ',')
|
||||
}
|
||||
|
||||
rows = append(rows, &nr)
|
||||
seriesRows = append(seriesRows, &nr)
|
||||
}
|
||||
row = cur.Next()
|
||||
seriesRow = seriesCursor.Next()
|
||||
}
|
||||
|
||||
sort.Slice(rows, func(i, j int) bool {
|
||||
return bytes.Compare(rows[i].SortKey, rows[j].SortKey) == -1
|
||||
sort.Slice(seriesRows, func(i, j int) bool {
|
||||
return bytes.Compare(seriesRows[i].SortKey, seriesRows[j].SortKey) == -1
|
||||
})
|
||||
|
||||
g.rows = rows
|
||||
g.seriesRows = seriesRows
|
||||
|
||||
cur.Close()
|
||||
return len(rows), nil
|
||||
seriesCursor.Close()
|
||||
return len(seriesRows), nil
|
||||
}
|
||||
|
||||
type groupNoneCursor struct {
|
||||
ctx context.Context
|
||||
mb multiShardCursors
|
||||
agg *datatypes.Aggregate
|
||||
cur SeriesCursor
|
||||
row SeriesRow
|
||||
keys [][]byte
|
||||
ctx context.Context
|
||||
arrayCursors *arrayCursors
|
||||
agg *datatypes.Aggregate
|
||||
cur SeriesCursor
|
||||
row SeriesRow
|
||||
keys [][]byte
|
||||
}
|
||||
|
||||
func (c *groupNoneCursor) Err() error { return nil }
|
||||
|
|
@ -302,36 +293,36 @@ func (c *groupNoneCursor) Next() bool {
|
|||
}
|
||||
|
||||
func (c *groupNoneCursor) Cursor() cursors.Cursor {
|
||||
cur := c.mb.createCursor(c.row)
|
||||
cur := c.arrayCursors.createCursor(c.row)
|
||||
if c.agg != nil {
|
||||
cur = c.mb.newAggregateCursor(c.ctx, c.agg, cur)
|
||||
cur = newAggregateArrayCursor(c.ctx, c.agg, cur)
|
||||
}
|
||||
return cur
|
||||
}
|
||||
|
||||
type groupByCursor struct {
|
||||
ctx context.Context
|
||||
mb multiShardCursors
|
||||
agg *datatypes.Aggregate
|
||||
i int
|
||||
rows []*SeriesRow
|
||||
keys [][]byte
|
||||
vals [][]byte
|
||||
ctx context.Context
|
||||
arrayCursors *arrayCursors
|
||||
agg *datatypes.Aggregate
|
||||
i int
|
||||
seriesRows []*SeriesRow
|
||||
keys [][]byte
|
||||
vals [][]byte
|
||||
}
|
||||
|
||||
func (c *groupByCursor) reset(rows []*SeriesRow) {
|
||||
func (c *groupByCursor) reset(seriesRows []*SeriesRow) {
|
||||
c.i = 0
|
||||
c.rows = rows
|
||||
c.seriesRows = seriesRows
|
||||
}
|
||||
|
||||
func (c *groupByCursor) Err() error { return nil }
|
||||
func (c *groupByCursor) Keys() [][]byte { return c.keys }
|
||||
func (c *groupByCursor) PartitionKeyVals() [][]byte { return c.vals }
|
||||
func (c *groupByCursor) Tags() models.Tags { return c.rows[c.i-1].Tags }
|
||||
func (c *groupByCursor) Tags() models.Tags { return c.seriesRows[c.i-1].Tags }
|
||||
func (c *groupByCursor) Close() {}
|
||||
|
||||
func (c *groupByCursor) Next() bool {
|
||||
if c.i < len(c.rows) {
|
||||
if c.i < len(c.seriesRows) {
|
||||
c.i++
|
||||
return true
|
||||
}
|
||||
|
|
@ -339,17 +330,17 @@ func (c *groupByCursor) Next() bool {
|
|||
}
|
||||
|
||||
func (c *groupByCursor) Cursor() cursors.Cursor {
|
||||
cur := c.mb.createCursor(*c.rows[c.i-1])
|
||||
cur := c.arrayCursors.createCursor(*c.seriesRows[c.i-1])
|
||||
if c.agg != nil {
|
||||
cur = c.mb.newAggregateCursor(c.ctx, c.agg, cur)
|
||||
cur = newAggregateArrayCursor(c.ctx, c.agg, cur)
|
||||
}
|
||||
return cur
|
||||
}
|
||||
|
||||
func (c *groupByCursor) Stats() cursors.CursorStats {
|
||||
var stats cursors.CursorStats
|
||||
for _, row := range c.rows {
|
||||
stats.Add(row.Query.Stats())
|
||||
for _, seriesRow := range c.seriesRows {
|
||||
stats.Add(seriesRow.Query.Stats())
|
||||
}
|
||||
return stats
|
||||
}
|
||||
|
|
|
|||
|
|
@ -2,31 +2,25 @@ package reads
|
|||
|
||||
import (
|
||||
"context"
|
||||
"math"
|
||||
|
||||
"github.com/influxdata/influxdb/models"
|
||||
"github.com/influxdata/influxdb/storage/reads/datatypes"
|
||||
"github.com/influxdata/influxdb/tsdb/cursors"
|
||||
)
|
||||
|
||||
type multiShardCursors interface {
|
||||
createCursor(row SeriesRow) cursors.Cursor
|
||||
newAggregateCursor(ctx context.Context, agg *datatypes.Aggregate, cursor cursors.Cursor) cursors.Cursor
|
||||
}
|
||||
|
||||
type resultSet struct {
|
||||
ctx context.Context
|
||||
agg *datatypes.Aggregate
|
||||
cur SeriesCursor
|
||||
row SeriesRow
|
||||
mb multiShardCursors
|
||||
ctx context.Context
|
||||
agg *datatypes.Aggregate
|
||||
seriesCursor SeriesCursor
|
||||
seriesRow SeriesRow
|
||||
arrayCursors *arrayCursors
|
||||
}
|
||||
|
||||
func NewFilteredResultSet(ctx context.Context, req *datatypes.ReadFilterRequest, cur SeriesCursor) ResultSet {
|
||||
func NewFilteredResultSet(ctx context.Context, req *datatypes.ReadFilterRequest, seriesCursor SeriesCursor) ResultSet {
|
||||
return &resultSet{
|
||||
ctx: ctx,
|
||||
cur: cur,
|
||||
mb: newMultiShardArrayCursors(ctx, req.Range.Start, req.Range.End, true, math.MaxInt64),
|
||||
ctx: ctx,
|
||||
seriesCursor: seriesCursor,
|
||||
arrayCursors: newArrayCursors(ctx, req.Range.Start, req.Range.End, true),
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -37,8 +31,8 @@ func (r *resultSet) Close() {
|
|||
if r == nil {
|
||||
return // Nothing to do.
|
||||
}
|
||||
r.row.Query = nil
|
||||
r.cur.Close()
|
||||
r.seriesRow.Query = nil
|
||||
r.seriesCursor.Close()
|
||||
}
|
||||
|
||||
// Next returns true if there are more results available.
|
||||
|
|
@ -47,28 +41,33 @@ func (r *resultSet) Next() bool {
|
|||
return false
|
||||
}
|
||||
|
||||
row := r.cur.Next()
|
||||
if row == nil {
|
||||
seriesRow := r.seriesCursor.Next()
|
||||
if seriesRow == nil {
|
||||
return false
|
||||
}
|
||||
|
||||
r.row = *row
|
||||
r.seriesRow = *seriesRow
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
func (r *resultSet) Cursor() cursors.Cursor {
|
||||
cur := r.mb.createCursor(r.row)
|
||||
cur := r.arrayCursors.createCursor(r.seriesRow)
|
||||
if r.agg != nil {
|
||||
cur = r.mb.newAggregateCursor(r.ctx, r.agg, cur)
|
||||
cur = newAggregateArrayCursor(r.ctx, r.agg, cur)
|
||||
}
|
||||
return cur
|
||||
}
|
||||
|
||||
func (r *resultSet) Tags() models.Tags {
|
||||
return r.row.Tags
|
||||
return r.seriesRow.Tags
|
||||
}
|
||||
|
||||
// Stats returns the stats for the underlying cursors.
|
||||
// Available after resultset has been scanned.
|
||||
func (r *resultSet) Stats() cursors.CursorStats { return r.row.Query.Stats() }
|
||||
func (r *resultSet) Stats() cursors.CursorStats {
|
||||
if r.seriesRow.Query == nil {
|
||||
return cursors.CursorStats{}
|
||||
}
|
||||
return r.seriesRow.Query.Stats()
|
||||
}
|
||||
|
|
|
|||
|
|
@ -26,7 +26,7 @@ type SeriesRow struct {
|
|||
SeriesTags models.Tags // unmodified series tags
|
||||
Tags models.Tags
|
||||
Field string
|
||||
Query cursors.CursorIterators
|
||||
Query cursors.CursorIterator
|
||||
ValueCond influxql.Expr
|
||||
}
|
||||
|
||||
|
|
@ -63,7 +63,7 @@ func NewIndexSeriesCursor(ctx context.Context, orgID, bucketID influxdb.ID, pred
|
|||
Ascending: true,
|
||||
Ordered: true,
|
||||
}
|
||||
p := &indexSeriesCursor{row: SeriesRow{Query: cursors.CursorIterators{cursorIterator}}}
|
||||
p := &indexSeriesCursor{row: SeriesRow{Query: cursorIterator}}
|
||||
|
||||
if root := predicate.GetRoot(); root != nil {
|
||||
if p.cond, err = NodeToExpr(root, nil); err != nil {
|
||||
|
|
|
|||
Loading…
Reference in New Issue